Browse Source

为WebSocket相关文件添加详细的中文注释

test
hyh 3 months ago
parent
commit
f57dc0e001
  1. 66
      src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs
  2. 97
      src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs
  3. 31
      src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs
  4. 49
      src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs
  5. 151
      src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs

66
src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs

@ -5,55 +5,117 @@ using CellularManagement.Application.Services;
namespace CellularManagement.Infrastructure.WebSocket; namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket连接管理器
/// 负责管理WebSocket连接的创建、删除和查询
/// 使用线程安全的并发集合存储连接信息
/// </summary>
public class WebSocketConnectionManager public class WebSocketConnectionManager
{ {
/// <summary>
/// 日志记录器,用于记录连接管理器的操作日志
/// </summary>
private readonly ILogger<WebSocketConnectionManager> _logger; private readonly ILogger<WebSocketConnectionManager> _logger;
/// <summary>
/// WebSocket连接字典
/// 键为连接ID,值为WebSocket实例
/// 使用线程安全的ConcurrentDictionary
/// </summary>
private readonly ConcurrentDictionary<string, System.Net.WebSockets.WebSocket> _sockets = new(); private readonly ConcurrentDictionary<string, System.Net.WebSockets.WebSocket> _sockets = new();
/// <summary>
/// 用户连接关联字典
/// 键为连接ID,值为用户ID
/// 使用线程安全的ConcurrentDictionary
/// </summary>
private readonly ConcurrentDictionary<string, string> _userConnections = new(); private readonly ConcurrentDictionary<string, string> _userConnections = new();
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志记录器,用于记录连接管理器的操作日志</param>
public WebSocketConnectionManager(ILogger<WebSocketConnectionManager> logger) public WebSocketConnectionManager(ILogger<WebSocketConnectionManager> logger)
{ {
_logger = logger; _logger = logger;
} }
/// <summary>
/// 添加新的WebSocket连接
/// 生成唯一的连接ID并存储连接信息
/// </summary>
/// <param name="socket">要添加的WebSocket实例</param>
/// <returns>新创建的连接ID</returns>
public string AddConnection(System.Net.WebSockets.WebSocket socket) public string AddConnection(System.Net.WebSockets.WebSocket socket)
{ {
var connectionId = Guid.NewGuid().ToString(); var connectionId = Guid.NewGuid().ToString();
_sockets.TryAdd(connectionId, socket); _sockets.TryAdd(connectionId, socket);
_logger.LogInformation("WebSocket connection added: {ConnectionId}", connectionId); _logger.LogInformation("WebSocket连接已添加: {ConnectionId}", connectionId);
return connectionId; return connectionId;
} }
/// <summary>
/// 移除WebSocket连接
/// 清理连接相关的所有信息
/// </summary>
/// <param name="connectionId">要移除的连接ID</param>
/// <returns>是否成功移除连接</returns>
public bool RemoveConnection(string connectionId) public bool RemoveConnection(string connectionId)
{ {
_sockets.TryRemove(connectionId, out _); _sockets.TryRemove(connectionId, out _);
_userConnections.TryRemove(connectionId, out _); _userConnections.TryRemove(connectionId, out _);
_logger.LogInformation("WebSocket connection removed: {ConnectionId}", connectionId); _logger.LogInformation("WebSocket连接已移除: {ConnectionId}", connectionId);
return true; return true;
} }
/// <summary>
/// 获取指定ID的WebSocket连接
/// </summary>
/// <param name="connectionId">要查询的连接ID</param>
/// <returns>WebSocket实例,如果不存在则返回null</returns>
public System.Net.WebSockets.WebSocket? GetConnection(string connectionId) public System.Net.WebSockets.WebSocket? GetConnection(string connectionId)
{ {
_sockets.TryGetValue(connectionId, out var socket); _sockets.TryGetValue(connectionId, out var socket);
return socket; return socket;
} }
/// <summary>
/// 获取所有WebSocket连接
/// </summary>
/// <returns>所有WebSocket实例的集合</returns>
public IEnumerable<System.Net.WebSockets.WebSocket> GetAllConnections() public IEnumerable<System.Net.WebSockets.WebSocket> GetAllConnections()
{ {
return _sockets.Values; return _sockets.Values;
} }
/// <summary>
/// 将连接与用户关联
/// 建立连接ID和用户ID的映射关系
/// </summary>
/// <param name="connectionId">要关联的连接ID</param>
/// <param name="userId">要关联的用户ID</param>
public void AssociateUser(string connectionId, string userId) public void AssociateUser(string connectionId, string userId)
{ {
_userConnections.TryAdd(connectionId, userId); _userConnections.TryAdd(connectionId, userId);
} }
/// <summary>
/// 获取连接关联的用户ID
/// </summary>
/// <param name="connectionId">要查询的连接ID</param>
/// <returns>用户ID,如果未关联则返回null</returns>
public string? GetUserId(string connectionId) public string? GetUserId(string connectionId)
{ {
_userConnections.TryGetValue(connectionId, out var userId); _userConnections.TryGetValue(connectionId, out var userId);
return userId; return userId;
} }
/// <summary>
/// 获取用户的所有WebSocket连接
/// 根据用户ID查找所有关联的连接
/// </summary>
/// <param name="userId">要查询的用户ID</param>
/// <returns>用户的所有WebSocket实例的集合</returns>
public IEnumerable<System.Net.WebSockets.WebSocket> GetUserConnections(string userId) public IEnumerable<System.Net.WebSockets.WebSocket> GetUserConnections(string userId)
{ {
var connections = _userConnections var connections = _userConnections

97
src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs

@ -4,14 +4,45 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.Infrastructure.WebSocket; namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket消息处理管道
/// 负责批量处理和分发WebSocket消息
/// 使用Channel实现消息队列,支持批量处理提高性能
/// </summary>
public class WebSocketMessagePipeline public class WebSocketMessagePipeline
{ {
/// <summary>
/// 消息通道,用于存储待处理的消息
/// 使用有界通道限制内存使用
/// </summary>
private readonly Channel<WebSocketMessage> _messageChannel; private readonly Channel<WebSocketMessage> _messageChannel;
/// <summary>
/// 日志记录器,用于记录管道的操作日志
/// </summary>
private readonly ILogger<WebSocketMessagePipeline> _logger; private readonly ILogger<WebSocketMessagePipeline> _logger;
/// <summary>
/// 内存池,用于复用内存缓冲区
/// </summary>
private readonly MemoryPool<byte> _memoryPool; private readonly MemoryPool<byte> _memoryPool;
/// <summary>
/// 批处理大小,每次处理的消息数量
/// </summary>
private readonly int _batchSize; private readonly int _batchSize;
/// <summary>
/// 最大队列大小,限制通道中可存储的消息数量
/// </summary>
private readonly int _maxQueueSize; private readonly int _maxQueueSize;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志记录器,用于记录管道的操作日志</param>
/// <param name="batchSize">批处理大小,每次处理的消息数量,默认100</param>
/// <param name="maxQueueSize">最大队列大小,限制通道中可存储的消息数量,默认10000</param>
public WebSocketMessagePipeline( public WebSocketMessagePipeline(
ILogger<WebSocketMessagePipeline> logger, ILogger<WebSocketMessagePipeline> logger,
int batchSize = 100, int batchSize = 100,
@ -22,16 +53,23 @@ public class WebSocketMessagePipeline
_maxQueueSize = maxQueueSize; _maxQueueSize = maxQueueSize;
_memoryPool = MemoryPool<byte>.Shared; _memoryPool = MemoryPool<byte>.Shared;
// 创建有界通道
var options = new BoundedChannelOptions(maxQueueSize) var options = new BoundedChannelOptions(maxQueueSize)
{ {
FullMode = BoundedChannelFullMode.Wait, FullMode = BoundedChannelFullMode.Wait, // 队列满时等待
SingleWriter = false, SingleWriter = false, // 允许多个写入者
SingleReader = false SingleReader = false // 允许多个读取者
}; };
_messageChannel = Channel.CreateBounded<WebSocketMessage>(options); _messageChannel = Channel.CreateBounded<WebSocketMessage>(options);
} }
/// <summary>
/// 写入消息到管道
/// 将消息添加到通道中等待处理
/// </summary>
/// <param name="message">要处理的WebSocket消息</param>
/// <returns>是否成功写入消息</returns>
public async ValueTask<bool> WriteMessageAsync(WebSocketMessage message) public async ValueTask<bool> WriteMessageAsync(WebSocketMessage message)
{ {
try try
@ -41,11 +79,16 @@ public class WebSocketMessagePipeline
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Failed to write message to pipeline"); _logger.LogError(ex, "写入消息到管道失败");
return false; return false;
} }
} }
/// <summary>
/// 处理消息
/// 循环从通道中读取消息并批量处理
/// </summary>
/// <param name="cancellationToken">取消令牌,用于停止处理</param>
public async Task ProcessMessagesAsync(CancellationToken cancellationToken) public async Task ProcessMessagesAsync(CancellationToken cancellationToken)
{ {
var batch = new List<WebSocketMessage>(_batchSize); var batch = new List<WebSocketMessage>(_batchSize);
@ -74,14 +117,20 @@ public class WebSocketMessagePipeline
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
_logger.LogInformation("Message processing pipeline stopped"); _logger.LogInformation("消息处理管道已停止");
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error in message processing pipeline"); _logger.LogError(ex, "消息处理管道出错");
} }
} }
/// <summary>
/// 批量处理消息
/// 处理一批消息,逐个处理并记录错误
/// </summary>
/// <param name="batch">要处理的消息批次</param>
/// <param name="cancellationToken">取消令牌,用于停止处理</param>
private async Task ProcessBatchAsync(List<WebSocketMessage> batch, CancellationToken cancellationToken) private async Task ProcessBatchAsync(List<WebSocketMessage> batch, CancellationToken cancellationToken)
{ {
// 这里实现具体的消息处理逻辑 // 这里实现具体的消息处理逻辑
@ -94,11 +143,17 @@ public class WebSocketMessagePipeline
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error processing message: {MessageId}", message.MessageId); _logger.LogError(ex, "处理消息时出错: {MessageId}", message.MessageId);
} }
} }
} }
/// <summary>
/// 处理单个消息
/// 实现具体的消息处理逻辑
/// </summary>
/// <param name="message">要处理的WebSocket消息</param>
/// <param name="cancellationToken">取消令牌,用于停止处理</param>
private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken) private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken)
{ {
// 实现具体的消息处理逻辑 // 实现具体的消息处理逻辑
@ -106,11 +161,39 @@ public class WebSocketMessagePipeline
} }
} }
/// <summary>
/// WebSocket消息实体
/// 表示一个WebSocket消息的基本信息
/// </summary>
public class WebSocketMessage public class WebSocketMessage
{ {
/// <summary>
/// 消息ID
/// 唯一标识一条消息
/// </summary>
public string MessageId { get; set; } = Guid.NewGuid().ToString(); public string MessageId { get; set; } = Guid.NewGuid().ToString();
/// <summary>
/// 连接ID
/// 标识消息来自哪个WebSocket连接
/// </summary>
public string ConnectionId { get; set; } = string.Empty; public string ConnectionId { get; set; } = string.Empty;
/// <summary>
/// 消息类型
/// 用于标识消息的类型,可用于路由和处理
/// </summary>
public string MessageType { get; set; } = string.Empty; public string MessageType { get; set; } = string.Empty;
/// <summary>
/// 消息内容
/// 消息的实际数据
/// </summary>
public byte[] Payload { get; set; } = Array.Empty<byte>(); public byte[] Payload { get; set; } = Array.Empty<byte>();
/// <summary>
/// 时间戳
/// 消息创建的时间
/// </summary>
public DateTime Timestamp { get; set; } = DateTime.UtcNow; public DateTime Timestamp { get; set; } = DateTime.UtcNow;
} }

31
src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs

@ -3,28 +3,59 @@ using CellularManagement.Application.Pipeline;
namespace CellularManagement.Infrastructure.WebSocket; namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket消息池
/// 负责管理WebSocket消息上下文的复用,减少内存分配
/// 使用对象池模式提高性能
/// </summary>
public class WebSocketMessagePool public class WebSocketMessagePool
{ {
/// <summary>
/// 消息上下文池
/// 使用线程安全的ConcurrentBag存储可复用的消息上下文
/// </summary>
private readonly ConcurrentBag<WebSocketMessageContext> _pool = new(); private readonly ConcurrentBag<WebSocketMessageContext> _pool = new();
/// <summary>
/// 最大池大小
/// 限制池中可存储的消息上下文数量
/// </summary>
private readonly int _maxPoolSize; private readonly int _maxPoolSize;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="maxPoolSize">最大池大小,限制池中可存储的消息上下文数量,默认1000</param>
public WebSocketMessagePool(int maxPoolSize = 1000) public WebSocketMessagePool(int maxPoolSize = 1000)
{ {
_maxPoolSize = maxPoolSize; _maxPoolSize = maxPoolSize;
} }
/// <summary>
/// 从池中获取消息上下文
/// 如果池中有可用的上下文则复用,否则创建新的
/// </summary>
/// <returns>可用的消息上下文</returns>
public WebSocketMessageContext Rent() public WebSocketMessageContext Rent()
{ {
// 尝试从池中获取可用的上下文
if (_pool.TryTake(out var context)) if (_pool.TryTake(out var context))
{ {
return context; return context;
} }
// 如果池中没有可用的上下文,创建新的
return new WebSocketMessageContext(); return new WebSocketMessageContext();
} }
/// <summary>
/// 将消息上下文返回到池中
/// 重置上下文状态并放回池中,如果池已满则丢弃
/// </summary>
/// <param name="context">要返回的消息上下文</param>
public void Return(WebSocketMessageContext context) public void Return(WebSocketMessageContext context)
{ {
// 如果池未满,重置上下文并放回池中
if (_pool.Count < _maxPoolSize) if (_pool.Count < _maxPoolSize)
{ {
// 重置上下文状态 // 重置上下文状态

49
src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs

@ -7,12 +7,34 @@ using CellularManagement.Application.Services;
namespace CellularManagement.Infrastructure.WebSocket; namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket中间件
/// 负责处理WebSocket请求的生命周期和消息处理
/// 作为ASP.NET Core中间件,拦截WebSocket请求并管理连接
/// </summary>
public class WebSocketMiddleware public class WebSocketMiddleware
{ {
/// <summary>
/// 请求委托,用于继续处理管道中的下一个中间件
/// </summary>
private readonly RequestDelegate _next; private readonly RequestDelegate _next;
/// <summary>
/// 日志记录器,用于记录中间件的操作日志
/// </summary>
private readonly ILogger<WebSocketMiddleware> _logger; private readonly ILogger<WebSocketMiddleware> _logger;
/// <summary>
/// 服务提供者,用于创建服务作用域和获取所需服务
/// </summary>
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="next">请求委托,用于继续处理管道中的下一个中间件</param>
/// <param name="logger">日志记录器,用于记录中间件的操作日志</param>
/// <param name="serviceProvider">服务提供者,用于创建服务作用域和获取所需服务</param>
public WebSocketMiddleware( public WebSocketMiddleware(
RequestDelegate next, RequestDelegate next,
ILogger<WebSocketMiddleware> logger, ILogger<WebSocketMiddleware> logger,
@ -23,61 +45,84 @@ public class WebSocketMiddleware
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
} }
/// <summary>
/// 处理HTTP请求
/// 检查是否为WebSocket请求,如果是则处理WebSocket连接
/// </summary>
/// <param name="context">HTTP上下文,包含请求和响应信息</param>
public async Task InvokeAsync(HttpContext context) public async Task InvokeAsync(HttpContext context)
{ {
// 检查是否为WebSocket请求
if (context.WebSockets.IsWebSocketRequest) if (context.WebSockets.IsWebSocketRequest)
{ {
// 创建服务作用域,确保服务实例的生命周期正确
using var scope = _serviceProvider.CreateScope(); using var scope = _serviceProvider.CreateScope();
var webSocketService = scope.ServiceProvider.GetRequiredService<IWebSocketService>(); var webSocketService = scope.ServiceProvider.GetRequiredService<IWebSocketService>();
// 接受WebSocket连接
var webSocket = await context.WebSockets.AcceptWebSocketAsync(); var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var connectionId = await webSocketService.AcceptConnectionAsync(webSocket); var connectionId = await webSocketService.AcceptConnectionAsync(webSocket);
try try
{ {
// 处理WebSocket连接
await HandleWebSocketConnection(webSocket, connectionId, webSocketService); await HandleWebSocketConnection(webSocket, connectionId, webSocketService);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error handling WebSocket connection: {ConnectionId}", connectionId); _logger.LogError(ex, "处理WebSocket连接时出错: {ConnectionId}", connectionId);
} }
finally finally
{ {
// 确保连接被正确关闭
await webSocketService.CloseConnectionAsync(connectionId); await webSocketService.CloseConnectionAsync(connectionId);
} }
} }
else else
{ {
// 如果不是WebSocket请求,继续处理管道
await _next(context); await _next(context);
} }
} }
/// <summary>
/// 处理WebSocket连接
/// 循环接收消息并处理,直到连接关闭
/// </summary>
/// <param name="webSocket">WebSocket实例,用于接收和发送消息</param>
/// <param name="connectionId">连接ID,用于标识连接</param>
/// <param name="webSocketService">WebSocket服务,用于处理消息</param>
private async Task HandleWebSocketConnection( private async Task HandleWebSocketConnection(
System.Net.WebSockets.WebSocket webSocket, System.Net.WebSockets.WebSocket webSocket,
string connectionId, string connectionId,
IWebSocketService webSocketService) IWebSocketService webSocketService)
{ {
// 创建接收缓冲区,大小为4KB
var buffer = new byte[1024 * 4]; var buffer = new byte[1024 * 4];
var receiveResult = await webSocket.ReceiveAsync( var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None); new ArraySegment<byte>(buffer), CancellationToken.None);
// 循环接收消息,直到收到关闭消息
while (!receiveResult.CloseStatus.HasValue) while (!receiveResult.CloseStatus.HasValue)
{ {
try try
{ {
// 处理接收到的消息
var message = buffer.Take(receiveResult.Count).ToArray(); var message = buffer.Take(receiveResult.Count).ToArray();
await webSocketService.SendMessageAsync(connectionId, message); await webSocketService.SendMessageAsync(connectionId, message);
// 继续接收下一条消息
receiveResult = await webSocket.ReceiveAsync( receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None); new ArraySegment<byte>(buffer), CancellationToken.None);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error processing WebSocket message for connection: {ConnectionId}", connectionId); _logger.LogError(ex, "处理WebSocket消息时出错: {ConnectionId}", connectionId);
break; break;
} }
} }
// 如果收到关闭消息,则关闭连接
if (receiveResult.CloseStatus.HasValue) if (receiveResult.CloseStatus.HasValue)
{ {
await webSocket.CloseAsync( await webSocket.CloseAsync(

151
src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs

@ -11,19 +11,71 @@ using Microsoft.Extensions.Caching.Memory;
namespace CellularManagement.Infrastructure.WebSocket; namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket服务实现类
/// 负责管理WebSocket连接的生命周期、消息发送和接收
/// 使用分布式缓存存储连接信息,本地缓存存储WebSocket实例
/// </summary>
public class WebSocketService : IWebSocketService public class WebSocketService : IWebSocketService
{ {
/// <summary>
/// 日志记录器,用于记录WebSocket服务的操作日志
/// </summary>
private readonly ILogger<WebSocketService> _logger; private readonly ILogger<WebSocketService> _logger;
/// <summary>
/// WebSocket指标监控,用于记录连接、消息等指标
/// </summary>
private readonly WebSocketMetrics _metrics; private readonly WebSocketMetrics _metrics;
/// <summary>
/// WebSocket消息池,用于复用消息上下文对象
/// </summary>
private readonly WebSocketMessagePool _messagePool; private readonly WebSocketMessagePool _messagePool;
/// <summary>
/// 分布式缓存,用于存储连接信息
/// </summary>
private readonly IDistributedCache _distributedCache; private readonly IDistributedCache _distributedCache;
/// <summary>
/// 本地缓存服务,用于存储WebSocket实例
/// </summary>
private readonly ICacheService _cacheService; private readonly ICacheService _cacheService;
/// <summary>
/// 当前节点ID,用于标识服务实例
/// </summary>
private readonly string _nodeId; private readonly string _nodeId;
/// <summary>
/// WebSocket连接信息在缓存中的键前缀
/// </summary>
private const string CONNECTION_PREFIX = "ws_connection_"; private const string CONNECTION_PREFIX = "ws_connection_";
/// <summary>
/// WebSocket实例在缓存中的键前缀
/// </summary>
private const string WEBSOCKET_PREFIX = "ws_socket_"; private const string WEBSOCKET_PREFIX = "ws_socket_";
/// <summary>
/// 用户关联信息在缓存中的键前缀
/// </summary>
private const string USER_PREFIX = "ws_user_"; private const string USER_PREFIX = "ws_user_";
/// <summary>
/// 节点信息在缓存中的键前缀
/// </summary>
private const string NODE_PREFIX = "ws_node_"; private const string NODE_PREFIX = "ws_node_";
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志记录器,用于记录服务操作日志</param>
/// <param name="metrics">WebSocket指标监控,用于记录连接和消息指标</param>
/// <param name="messagePool">消息池,用于复用消息上下文对象</param>
/// <param name="distributedCache">分布式缓存,用于存储连接信息</param>
/// <param name="cacheService">本地缓存服务,用于存储WebSocket实例</param>
public WebSocketService( public WebSocketService(
ILogger<WebSocketService> logger, ILogger<WebSocketService> logger,
WebSocketMetrics metrics, WebSocketMetrics metrics,
@ -36,45 +88,62 @@ public class WebSocketService : IWebSocketService
_messagePool = messagePool; _messagePool = messagePool;
_distributedCache = distributedCache; _distributedCache = distributedCache;
_cacheService = cacheService; _cacheService = cacheService;
_nodeId = Guid.NewGuid().ToString(); _nodeId = Guid.NewGuid().ToString(); // 生成唯一的节点ID
} }
/// <summary>
/// 接受新的WebSocket连接
/// 创建连接信息并存储到缓存中
/// </summary>
/// <param name="webSocket">WebSocket实例</param>
/// <returns>新创建的连接ID</returns>
public async Task<string> AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket) public async Task<string> AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket)
{ {
var connectionId = Guid.NewGuid().ToString(); var connectionId = Guid.NewGuid().ToString();
var connection = WebSocketConnection.Create(connectionId); var connection = WebSocketConnection.Create(connectionId);
// 生成缓存键
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}"; var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}";
var nodeKey = $"{NODE_PREFIX}{_nodeId}"; var nodeKey = $"{NODE_PREFIX}{_nodeId}";
// 将连接信息存储到分布式缓存
await _distributedCache.SetStringAsync( await _distributedCache.SetStringAsync(
connectionKey, connectionKey,
JsonSerializer.Serialize(connection), JsonSerializer.Serialize(connection),
new DistributedCacheEntryOptions new DistributedCacheEntryOptions
{ {
SlidingExpiration = TimeSpan.FromMinutes(30) SlidingExpiration = TimeSpan.FromMinutes(30) // 设置30分钟滑动过期
}); });
// 将WebSocket实例存储到本地缓存
_cacheService.Set(webSocketKey, webSocket, new MemoryCacheEntryOptions _cacheService.Set(webSocketKey, webSocket, new MemoryCacheEntryOptions
{ {
SlidingExpiration = TimeSpan.FromMinutes(30) SlidingExpiration = TimeSpan.FromMinutes(30)
}); });
// 将连接添加到当前节点
await AddConnectionToNodeAsync(connectionId); await AddConnectionToNodeAsync(connectionId);
_metrics.ConnectionEstablished(); _metrics.ConnectionEstablished(); // 记录连接建立指标
_logger.LogInformation("WebSocket connection accepted: {ConnectionId} on node {NodeId}", _logger.LogInformation("WebSocket连接已建立: {ConnectionId} 在节点 {NodeId}",
connectionId, _nodeId); connectionId, _nodeId);
return connectionId; return connectionId;
} }
/// <summary>
/// 关闭WebSocket连接
/// 清理连接相关的所有缓存信息
/// </summary>
/// <param name="connectionId">要关闭的连接ID</param>
/// <returns>是否成功关闭连接</returns>
public async Task<bool> CloseConnectionAsync(string connectionId) public async Task<bool> CloseConnectionAsync(string connectionId)
{ {
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}"; var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}";
var userKey = $"{USER_PREFIX}{connectionId}"; var userKey = $"{USER_PREFIX}{connectionId}";
// 获取连接信息
var connectionJson = await _distributedCache.GetStringAsync(connectionKey); var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket)) if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket))
{ {
@ -87,17 +156,18 @@ public class WebSocketService : IWebSocketService
{ {
try try
{ {
// 如果WebSocket处于打开状态,则正常关闭
if (webSocket.State == WebSocketState.Open || webSocket.State == WebSocketState.CloseReceived) if (webSocket.State == WebSocketState.Open || webSocket.State == WebSocketState.CloseReceived)
{ {
await webSocket.CloseAsync( await webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure, WebSocketCloseStatus.NormalClosure,
"Connection closed by server", "服务器关闭连接",
CancellationToken.None); CancellationToken.None);
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error closing WebSocket connection: {ConnectionId}", connectionId); _logger.LogError(ex, "关闭WebSocket连接时出错: {ConnectionId}", connectionId);
} }
connection.Close(); connection.Close();
@ -107,20 +177,27 @@ public class WebSocketService : IWebSocketService
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogWarning(ex, "Failed to deserialize WebSocketConnection for connection: {ConnectionId}, continuing with cleanup", connectionId); _logger.LogWarning(ex, "反序列化WebSocketConnection失败: {ConnectionId}, 继续清理", connectionId);
} }
} }
// 清理连接相关的所有缓存
await RemoveConnectionFromNodeAsync(connectionId); await RemoveConnectionFromNodeAsync(connectionId);
await _distributedCache.RemoveAsync(connectionKey); await _distributedCache.RemoveAsync(connectionKey);
_cacheService.Remove(webSocketKey); _cacheService.Remove(webSocketKey);
await _distributedCache.RemoveAsync(userKey); await _distributedCache.RemoveAsync(userKey);
_metrics.ConnectionClosed(); _metrics.ConnectionClosed(); // 记录连接关闭指标
_logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId); _logger.LogInformation("WebSocket连接已关闭: {ConnectionId}", connectionId);
return true; return true;
} }
/// <summary>
/// 向指定连接发送消息
/// </summary>
/// <param name="connectionId">目标连接ID</param>
/// <param name="message">要发送的消息内容</param>
/// <returns>是否成功发送消息</returns>
public async Task<bool> SendMessageAsync(string connectionId, byte[] message) public async Task<bool> SendMessageAsync(string connectionId, byte[] message)
{ {
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
@ -144,30 +221,37 @@ public class WebSocketService : IWebSocketService
true, true,
CancellationToken.None); CancellationToken.None);
_metrics.MessageProcessed(TimeSpan.Zero); _metrics.MessageProcessed(TimeSpan.Zero); // 记录消息处理指标
return true; return true;
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error sending message to connection: {ConnectionId}", connectionId); _logger.LogError(ex, "发送消息到连接时出错: {ConnectionId}", connectionId);
_metrics.ErrorOccurred("SendMessage"); _metrics.ErrorOccurred("SendMessage"); // 记录错误指标
} }
} }
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogWarning(ex, "Failed to deserialize WebSocketConnection for connection: {ConnectionId}, skipping message send", connectionId); _logger.LogWarning(ex, "反序列化WebSocketConnection失败: {ConnectionId}, 跳过消息发送", connectionId);
} }
} }
return false; return false;
} }
/// <summary>
/// 广播消息到所有连接
/// 遍历所有节点上的所有连接并发送消息
/// </summary>
/// <param name="message">要广播的消息内容</param>
/// <returns>是否所有消息都发送成功</returns>
public async Task<bool> BroadcastMessageAsync(byte[] message) public async Task<bool> BroadcastMessageAsync(byte[] message)
{ {
var nodes = await GetAllNodesAsync(); var nodes = await GetAllNodesAsync();
var success = true; var success = true;
// 遍历所有节点上的所有连接
foreach (var node in nodes) foreach (var node in nodes)
{ {
var nodeKey = $"{NODE_PREFIX}{node}"; var nodeKey = $"{NODE_PREFIX}{node}";
@ -188,11 +272,18 @@ public class WebSocketService : IWebSocketService
return success; return success;
} }
/// <summary>
/// 向指定用户的所有连接发送消息
/// </summary>
/// <param name="userId">目标用户ID</param>
/// <param name="message">要发送的消息内容</param>
/// <returns>是否所有消息都发送成功</returns>
public async Task<bool> SendMessageToUserAsync(string userId, byte[] message) public async Task<bool> SendMessageToUserAsync(string userId, byte[] message)
{ {
var userConnections = await GetUserConnectionsAsync(userId); var userConnections = await GetUserConnectionsAsync(userId);
var success = true; var success = true;
// 向用户的所有连接发送消息
foreach (var connection in userConnections) foreach (var connection in userConnections)
{ {
if (!await SendMessageAsync(connection.ConnectionId, message)) if (!await SendMessageAsync(connection.ConnectionId, message))
@ -204,6 +295,12 @@ public class WebSocketService : IWebSocketService
return success; return success;
} }
/// <summary>
/// 将连接与用户关联
/// 更新连接信息和用户关联缓存
/// </summary>
/// <param name="connectionId">连接ID</param>
/// <param name="userId">用户ID</param>
public async Task AssociateUserAsync(string connectionId, string userId) public async Task AssociateUserAsync(string connectionId, string userId)
{ {
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
@ -222,6 +319,11 @@ public class WebSocketService : IWebSocketService
} }
} }
/// <summary>
/// 获取指定连接的信息
/// </summary>
/// <param name="connectionId">连接ID</param>
/// <returns>连接信息,如果不存在则返回null</returns>
public async Task<WebSocketConnection?> GetConnectionAsync(string connectionId) public async Task<WebSocketConnection?> GetConnectionAsync(string connectionId)
{ {
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
@ -231,11 +333,17 @@ public class WebSocketService : IWebSocketService
: null; : null;
} }
/// <summary>
/// 获取指定用户的所有连接
/// </summary>
/// <param name="userId">用户ID</param>
/// <returns>用户的所有连接信息</returns>
public async Task<IEnumerable<WebSocketConnection>> GetUserConnectionsAsync(string userId) public async Task<IEnumerable<WebSocketConnection>> GetUserConnectionsAsync(string userId)
{ {
var connections = new List<WebSocketConnection>(); var connections = new List<WebSocketConnection>();
var nodes = await GetAllNodesAsync(); var nodes = await GetAllNodesAsync();
// 遍历所有节点,查找用户的连接
foreach (var node in nodes) foreach (var node in nodes)
{ {
var nodeKey = $"{NODE_PREFIX}{node}"; var nodeKey = $"{NODE_PREFIX}{node}";
@ -257,6 +365,10 @@ public class WebSocketService : IWebSocketService
return connections; return connections;
} }
/// <summary>
/// 将连接添加到当前节点
/// </summary>
/// <param name="connectionId">要添加的连接ID</param>
private async Task AddConnectionToNodeAsync(string connectionId) private async Task AddConnectionToNodeAsync(string connectionId)
{ {
var nodeKey = $"{NODE_PREFIX}{_nodeId}"; var nodeKey = $"{NODE_PREFIX}{_nodeId}";
@ -267,6 +379,10 @@ public class WebSocketService : IWebSocketService
JsonSerializer.Serialize(connections)); JsonSerializer.Serialize(connections));
} }
/// <summary>
/// 从当前节点移除连接
/// </summary>
/// <param name="connectionId">要移除的连接ID</param>
private async Task RemoveConnectionFromNodeAsync(string connectionId) private async Task RemoveConnectionFromNodeAsync(string connectionId)
{ {
var nodeKey = $"{NODE_PREFIX}{_nodeId}"; var nodeKey = $"{NODE_PREFIX}{_nodeId}";
@ -277,6 +393,10 @@ public class WebSocketService : IWebSocketService
JsonSerializer.Serialize(connections)); JsonSerializer.Serialize(connections));
} }
/// <summary>
/// 获取当前节点的所有连接
/// </summary>
/// <returns>当前节点的所有连接ID列表</returns>
private async Task<List<string>> GetNodeConnectionsAsync() private async Task<List<string>> GetNodeConnectionsAsync()
{ {
var nodeKey = $"{NODE_PREFIX}{_nodeId}"; var nodeKey = $"{NODE_PREFIX}{_nodeId}";
@ -286,6 +406,11 @@ public class WebSocketService : IWebSocketService
: new List<string>(); : new List<string>();
} }
/// <summary>
/// 获取所有节点ID
/// 目前仅返回当前节点ID,后续需要实现服务发现机制
/// </summary>
/// <returns>所有节点ID列表</returns>
private async Task<List<string>> GetAllNodesAsync() private async Task<List<string>> GetAllNodesAsync()
{ {
// 这里需要实现服务发现机制 // 这里需要实现服务发现机制

Loading…
Cancel
Save