diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs
index 8b40e5f..ccd129c 100644
--- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs
+++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs
@@ -5,55 +5,117 @@ using CellularManagement.Application.Services;
namespace CellularManagement.Infrastructure.WebSocket;
+///
+/// WebSocket连接管理器
+/// 负责管理WebSocket连接的创建、删除和查询
+/// 使用线程安全的并发集合存储连接信息
+///
public class WebSocketConnectionManager
{
+ ///
+ /// 日志记录器,用于记录连接管理器的操作日志
+ ///
private readonly ILogger _logger;
+
+ ///
+ /// WebSocket连接字典
+ /// 键为连接ID,值为WebSocket实例
+ /// 使用线程安全的ConcurrentDictionary
+ ///
private readonly ConcurrentDictionary _sockets = new();
+
+ ///
+ /// 用户连接关联字典
+ /// 键为连接ID,值为用户ID
+ /// 使用线程安全的ConcurrentDictionary
+ ///
private readonly ConcurrentDictionary _userConnections = new();
+ ///
+ /// 构造函数
+ ///
+ /// 日志记录器,用于记录连接管理器的操作日志
public WebSocketConnectionManager(ILogger logger)
{
_logger = logger;
}
+ ///
+ /// 添加新的WebSocket连接
+ /// 生成唯一的连接ID并存储连接信息
+ ///
+ /// 要添加的WebSocket实例
+ /// 新创建的连接ID
public string AddConnection(System.Net.WebSockets.WebSocket socket)
{
var connectionId = Guid.NewGuid().ToString();
_sockets.TryAdd(connectionId, socket);
- _logger.LogInformation("WebSocket connection added: {ConnectionId}", connectionId);
+ _logger.LogInformation("WebSocket连接已添加: {ConnectionId}", connectionId);
return connectionId;
}
+ ///
+ /// 移除WebSocket连接
+ /// 清理连接相关的所有信息
+ ///
+ /// 要移除的连接ID
+ /// 是否成功移除连接
public bool RemoveConnection(string connectionId)
{
_sockets.TryRemove(connectionId, out _);
_userConnections.TryRemove(connectionId, out _);
- _logger.LogInformation("WebSocket connection removed: {ConnectionId}", connectionId);
+ _logger.LogInformation("WebSocket连接已移除: {ConnectionId}", connectionId);
return true;
}
+ ///
+ /// 获取指定ID的WebSocket连接
+ ///
+ /// 要查询的连接ID
+ /// WebSocket实例,如果不存在则返回null
public System.Net.WebSockets.WebSocket? GetConnection(string connectionId)
{
_sockets.TryGetValue(connectionId, out var socket);
return socket;
}
+ ///
+ /// 获取所有WebSocket连接
+ ///
+ /// 所有WebSocket实例的集合
public IEnumerable GetAllConnections()
{
return _sockets.Values;
}
+ ///
+ /// 将连接与用户关联
+ /// 建立连接ID和用户ID的映射关系
+ ///
+ /// 要关联的连接ID
+ /// 要关联的用户ID
public void AssociateUser(string connectionId, string userId)
{
_userConnections.TryAdd(connectionId, userId);
}
+ ///
+ /// 获取连接关联的用户ID
+ ///
+ /// 要查询的连接ID
+ /// 用户ID,如果未关联则返回null
public string? GetUserId(string connectionId)
{
_userConnections.TryGetValue(connectionId, out var userId);
return userId;
}
+ ///
+ /// 获取用户的所有WebSocket连接
+ /// 根据用户ID查找所有关联的连接
+ ///
+ /// 要查询的用户ID
+ /// 用户的所有WebSocket实例的集合
public IEnumerable GetUserConnections(string userId)
{
var connections = _userConnections
diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs
index 79c4716..0d5b8ee 100644
--- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs
+++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs
@@ -4,14 +4,45 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.Infrastructure.WebSocket;
+///
+/// WebSocket消息处理管道
+/// 负责批量处理和分发WebSocket消息
+/// 使用Channel实现消息队列,支持批量处理提高性能
+///
public class WebSocketMessagePipeline
{
+ ///
+ /// 消息通道,用于存储待处理的消息
+ /// 使用有界通道限制内存使用
+ ///
private readonly Channel _messageChannel;
+
+ ///
+ /// 日志记录器,用于记录管道的操作日志
+ ///
private readonly ILogger _logger;
+
+ ///
+ /// 内存池,用于复用内存缓冲区
+ ///
private readonly MemoryPool _memoryPool;
+
+ ///
+ /// 批处理大小,每次处理的消息数量
+ ///
private readonly int _batchSize;
+
+ ///
+ /// 最大队列大小,限制通道中可存储的消息数量
+ ///
private readonly int _maxQueueSize;
+ ///
+ /// 构造函数
+ ///
+ /// 日志记录器,用于记录管道的操作日志
+ /// 批处理大小,每次处理的消息数量,默认100
+ /// 最大队列大小,限制通道中可存储的消息数量,默认10000
public WebSocketMessagePipeline(
ILogger logger,
int batchSize = 100,
@@ -22,16 +53,23 @@ public class WebSocketMessagePipeline
_maxQueueSize = maxQueueSize;
_memoryPool = MemoryPool.Shared;
+ // 创建有界通道
var options = new BoundedChannelOptions(maxQueueSize)
{
- FullMode = BoundedChannelFullMode.Wait,
- SingleWriter = false,
- SingleReader = false
+ FullMode = BoundedChannelFullMode.Wait, // 队列满时等待
+ SingleWriter = false, // 允许多个写入者
+ SingleReader = false // 允许多个读取者
};
_messageChannel = Channel.CreateBounded(options);
}
+ ///
+ /// 写入消息到管道
+ /// 将消息添加到通道中等待处理
+ ///
+ /// 要处理的WebSocket消息
+ /// 是否成功写入消息
public async ValueTask WriteMessageAsync(WebSocketMessage message)
{
try
@@ -41,11 +79,16 @@ public class WebSocketMessagePipeline
}
catch (Exception ex)
{
- _logger.LogError(ex, "Failed to write message to pipeline");
+ _logger.LogError(ex, "写入消息到管道失败");
return false;
}
}
+ ///
+ /// 处理消息
+ /// 循环从通道中读取消息并批量处理
+ ///
+ /// 取消令牌,用于停止处理
public async Task ProcessMessagesAsync(CancellationToken cancellationToken)
{
var batch = new List(_batchSize);
@@ -74,14 +117,20 @@ public class WebSocketMessagePipeline
}
catch (OperationCanceledException)
{
- _logger.LogInformation("Message processing pipeline stopped");
+ _logger.LogInformation("消息处理管道已停止");
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error in message processing pipeline");
+ _logger.LogError(ex, "消息处理管道出错");
}
}
+ ///
+ /// 批量处理消息
+ /// 处理一批消息,逐个处理并记录错误
+ ///
+ /// 要处理的消息批次
+ /// 取消令牌,用于停止处理
private async Task ProcessBatchAsync(List batch, CancellationToken cancellationToken)
{
// 这里实现具体的消息处理逻辑
@@ -94,11 +143,17 @@ public class WebSocketMessagePipeline
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error processing message: {MessageId}", message.MessageId);
+ _logger.LogError(ex, "处理消息时出错: {MessageId}", message.MessageId);
}
}
}
+ ///
+ /// 处理单个消息
+ /// 实现具体的消息处理逻辑
+ ///
+ /// 要处理的WebSocket消息
+ /// 取消令牌,用于停止处理
private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken)
{
// 实现具体的消息处理逻辑
@@ -106,11 +161,39 @@ public class WebSocketMessagePipeline
}
}
+///
+/// WebSocket消息实体
+/// 表示一个WebSocket消息的基本信息
+///
public class WebSocketMessage
{
+ ///
+ /// 消息ID
+ /// 唯一标识一条消息
+ ///
public string MessageId { get; set; } = Guid.NewGuid().ToString();
+
+ ///
+ /// 连接ID
+ /// 标识消息来自哪个WebSocket连接
+ ///
public string ConnectionId { get; set; } = string.Empty;
+
+ ///
+ /// 消息类型
+ /// 用于标识消息的类型,可用于路由和处理
+ ///
public string MessageType { get; set; } = string.Empty;
+
+ ///
+ /// 消息内容
+ /// 消息的实际数据
+ ///
public byte[] Payload { get; set; } = Array.Empty();
+
+ ///
+ /// 时间戳
+ /// 消息创建的时间
+ ///
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
\ No newline at end of file
diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs
index 16e8c6e..14ef73f 100644
--- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs
+++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs
@@ -3,28 +3,59 @@ using CellularManagement.Application.Pipeline;
namespace CellularManagement.Infrastructure.WebSocket;
+///
+/// WebSocket消息池
+/// 负责管理WebSocket消息上下文的复用,减少内存分配
+/// 使用对象池模式提高性能
+///
public class WebSocketMessagePool
{
+ ///
+ /// 消息上下文池
+ /// 使用线程安全的ConcurrentBag存储可复用的消息上下文
+ ///
private readonly ConcurrentBag _pool = new();
+
+ ///
+ /// 最大池大小
+ /// 限制池中可存储的消息上下文数量
+ ///
private readonly int _maxPoolSize;
+ ///
+ /// 构造函数
+ ///
+ /// 最大池大小,限制池中可存储的消息上下文数量,默认1000
public WebSocketMessagePool(int maxPoolSize = 1000)
{
_maxPoolSize = maxPoolSize;
}
+ ///
+ /// 从池中获取消息上下文
+ /// 如果池中有可用的上下文则复用,否则创建新的
+ ///
+ /// 可用的消息上下文
public WebSocketMessageContext Rent()
{
+ // 尝试从池中获取可用的上下文
if (_pool.TryTake(out var context))
{
return context;
}
+ // 如果池中没有可用的上下文,创建新的
return new WebSocketMessageContext();
}
+ ///
+ /// 将消息上下文返回到池中
+ /// 重置上下文状态并放回池中,如果池已满则丢弃
+ ///
+ /// 要返回的消息上下文
public void Return(WebSocketMessageContext context)
{
+ // 如果池未满,重置上下文并放回池中
if (_pool.Count < _maxPoolSize)
{
// 重置上下文状态
diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs
index 6b24701..98f5391 100644
--- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs
+++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs
@@ -7,12 +7,34 @@ using CellularManagement.Application.Services;
namespace CellularManagement.Infrastructure.WebSocket;
+///
+/// WebSocket中间件
+/// 负责处理WebSocket请求的生命周期和消息处理
+/// 作为ASP.NET Core中间件,拦截WebSocket请求并管理连接
+///
public class WebSocketMiddleware
{
+ ///
+ /// 请求委托,用于继续处理管道中的下一个中间件
+ ///
private readonly RequestDelegate _next;
+
+ ///
+ /// 日志记录器,用于记录中间件的操作日志
+ ///
private readonly ILogger _logger;
+
+ ///
+ /// 服务提供者,用于创建服务作用域和获取所需服务
+ ///
private readonly IServiceProvider _serviceProvider;
+ ///
+ /// 构造函数
+ ///
+ /// 请求委托,用于继续处理管道中的下一个中间件
+ /// 日志记录器,用于记录中间件的操作日志
+ /// 服务提供者,用于创建服务作用域和获取所需服务
public WebSocketMiddleware(
RequestDelegate next,
ILogger logger,
@@ -23,61 +45,84 @@ public class WebSocketMiddleware
_serviceProvider = serviceProvider;
}
+ ///
+ /// 处理HTTP请求
+ /// 检查是否为WebSocket请求,如果是则处理WebSocket连接
+ ///
+ /// HTTP上下文,包含请求和响应信息
public async Task InvokeAsync(HttpContext context)
{
+ // 检查是否为WebSocket请求
if (context.WebSockets.IsWebSocketRequest)
{
+ // 创建服务作用域,确保服务实例的生命周期正确
using var scope = _serviceProvider.CreateScope();
var webSocketService = scope.ServiceProvider.GetRequiredService();
+ // 接受WebSocket连接
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var connectionId = await webSocketService.AcceptConnectionAsync(webSocket);
try
{
+ // 处理WebSocket连接
await HandleWebSocketConnection(webSocket, connectionId, webSocketService);
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error handling WebSocket connection: {ConnectionId}", connectionId);
+ _logger.LogError(ex, "处理WebSocket连接时出错: {ConnectionId}", connectionId);
}
finally
{
+ // 确保连接被正确关闭
await webSocketService.CloseConnectionAsync(connectionId);
}
}
else
{
+ // 如果不是WebSocket请求,继续处理管道
await _next(context);
}
}
+ ///
+ /// 处理WebSocket连接
+ /// 循环接收消息并处理,直到连接关闭
+ ///
+ /// WebSocket实例,用于接收和发送消息
+ /// 连接ID,用于标识连接
+ /// WebSocket服务,用于处理消息
private async Task HandleWebSocketConnection(
System.Net.WebSockets.WebSocket webSocket,
string connectionId,
IWebSocketService webSocketService)
{
+ // 创建接收缓冲区,大小为4KB
var buffer = new byte[1024 * 4];
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment(buffer), CancellationToken.None);
+ // 循环接收消息,直到收到关闭消息
while (!receiveResult.CloseStatus.HasValue)
{
try
{
+ // 处理接收到的消息
var message = buffer.Take(receiveResult.Count).ToArray();
await webSocketService.SendMessageAsync(connectionId, message);
+ // 继续接收下一条消息
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment(buffer), CancellationToken.None);
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error processing WebSocket message for connection: {ConnectionId}", connectionId);
+ _logger.LogError(ex, "处理WebSocket消息时出错: {ConnectionId}", connectionId);
break;
}
}
+ // 如果收到关闭消息,则关闭连接
if (receiveResult.CloseStatus.HasValue)
{
await webSocket.CloseAsync(
diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs
index 447dd92..a39d909 100644
--- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs
+++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs
@@ -11,19 +11,71 @@ using Microsoft.Extensions.Caching.Memory;
namespace CellularManagement.Infrastructure.WebSocket;
+///
+/// WebSocket服务实现类
+/// 负责管理WebSocket连接的生命周期、消息发送和接收
+/// 使用分布式缓存存储连接信息,本地缓存存储WebSocket实例
+///
public class WebSocketService : IWebSocketService
{
+ ///
+ /// 日志记录器,用于记录WebSocket服务的操作日志
+ ///
private readonly ILogger _logger;
+
+ ///
+ /// WebSocket指标监控,用于记录连接、消息等指标
+ ///
private readonly WebSocketMetrics _metrics;
+
+ ///
+ /// WebSocket消息池,用于复用消息上下文对象
+ ///
private readonly WebSocketMessagePool _messagePool;
+
+ ///
+ /// 分布式缓存,用于存储连接信息
+ ///
private readonly IDistributedCache _distributedCache;
+
+ ///
+ /// 本地缓存服务,用于存储WebSocket实例
+ ///
private readonly ICacheService _cacheService;
+
+ ///
+ /// 当前节点ID,用于标识服务实例
+ ///
private readonly string _nodeId;
+
+ ///
+ /// WebSocket连接信息在缓存中的键前缀
+ ///
private const string CONNECTION_PREFIX = "ws_connection_";
+
+ ///
+ /// WebSocket实例在缓存中的键前缀
+ ///
private const string WEBSOCKET_PREFIX = "ws_socket_";
+
+ ///
+ /// 用户关联信息在缓存中的键前缀
+ ///
private const string USER_PREFIX = "ws_user_";
+
+ ///
+ /// 节点信息在缓存中的键前缀
+ ///
private const string NODE_PREFIX = "ws_node_";
+ ///
+ /// 构造函数
+ ///
+ /// 日志记录器,用于记录服务操作日志
+ /// WebSocket指标监控,用于记录连接和消息指标
+ /// 消息池,用于复用消息上下文对象
+ /// 分布式缓存,用于存储连接信息
+ /// 本地缓存服务,用于存储WebSocket实例
public WebSocketService(
ILogger logger,
WebSocketMetrics metrics,
@@ -36,45 +88,62 @@ public class WebSocketService : IWebSocketService
_messagePool = messagePool;
_distributedCache = distributedCache;
_cacheService = cacheService;
- _nodeId = Guid.NewGuid().ToString();
+ _nodeId = Guid.NewGuid().ToString(); // 生成唯一的节点ID
}
+ ///
+ /// 接受新的WebSocket连接
+ /// 创建连接信息并存储到缓存中
+ ///
+ /// WebSocket实例
+ /// 新创建的连接ID
public async Task AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket)
{
var connectionId = Guid.NewGuid().ToString();
var connection = WebSocketConnection.Create(connectionId);
+ // 生成缓存键
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}";
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
+ // 将连接信息存储到分布式缓存
await _distributedCache.SetStringAsync(
connectionKey,
JsonSerializer.Serialize(connection),
new DistributedCacheEntryOptions
{
- SlidingExpiration = TimeSpan.FromMinutes(30)
+ SlidingExpiration = TimeSpan.FromMinutes(30) // 设置30分钟滑动过期
});
+ // 将WebSocket实例存储到本地缓存
_cacheService.Set(webSocketKey, webSocket, new MemoryCacheEntryOptions
{
SlidingExpiration = TimeSpan.FromMinutes(30)
});
+ // 将连接添加到当前节点
await AddConnectionToNodeAsync(connectionId);
- _metrics.ConnectionEstablished();
- _logger.LogInformation("WebSocket connection accepted: {ConnectionId} on node {NodeId}",
+ _metrics.ConnectionEstablished(); // 记录连接建立指标
+ _logger.LogInformation("WebSocket连接已建立: {ConnectionId} 在节点 {NodeId}",
connectionId, _nodeId);
return connectionId;
}
+ ///
+ /// 关闭WebSocket连接
+ /// 清理连接相关的所有缓存信息
+ ///
+ /// 要关闭的连接ID
+ /// 是否成功关闭连接
public async Task CloseConnectionAsync(string connectionId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}";
var userKey = $"{USER_PREFIX}{connectionId}";
+ // 获取连接信息
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket))
{
@@ -87,17 +156,18 @@ public class WebSocketService : IWebSocketService
{
try
{
+ // 如果WebSocket处于打开状态,则正常关闭
if (webSocket.State == WebSocketState.Open || webSocket.State == WebSocketState.CloseReceived)
{
await webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
- "Connection closed by server",
+ "服务器关闭连接",
CancellationToken.None);
}
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error closing WebSocket connection: {ConnectionId}", connectionId);
+ _logger.LogError(ex, "关闭WebSocket连接时出错: {ConnectionId}", connectionId);
}
connection.Close();
@@ -107,20 +177,27 @@ public class WebSocketService : IWebSocketService
}
catch (Exception ex)
{
- _logger.LogWarning(ex, "Failed to deserialize WebSocketConnection for connection: {ConnectionId}, continuing with cleanup", connectionId);
+ _logger.LogWarning(ex, "反序列化WebSocketConnection失败: {ConnectionId}, 继续清理", connectionId);
}
}
+ // 清理连接相关的所有缓存
await RemoveConnectionFromNodeAsync(connectionId);
await _distributedCache.RemoveAsync(connectionKey);
_cacheService.Remove(webSocketKey);
await _distributedCache.RemoveAsync(userKey);
- _metrics.ConnectionClosed();
- _logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId);
+ _metrics.ConnectionClosed(); // 记录连接关闭指标
+ _logger.LogInformation("WebSocket连接已关闭: {ConnectionId}", connectionId);
return true;
}
+ ///
+ /// 向指定连接发送消息
+ ///
+ /// 目标连接ID
+ /// 要发送的消息内容
+ /// 是否成功发送消息
public async Task SendMessageAsync(string connectionId, byte[] message)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
@@ -144,30 +221,37 @@ public class WebSocketService : IWebSocketService
true,
CancellationToken.None);
- _metrics.MessageProcessed(TimeSpan.Zero);
+ _metrics.MessageProcessed(TimeSpan.Zero); // 记录消息处理指标
return true;
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error sending message to connection: {ConnectionId}", connectionId);
- _metrics.ErrorOccurred("SendMessage");
+ _logger.LogError(ex, "发送消息到连接时出错: {ConnectionId}", connectionId);
+ _metrics.ErrorOccurred("SendMessage"); // 记录错误指标
}
}
}
}
catch (Exception ex)
{
- _logger.LogWarning(ex, "Failed to deserialize WebSocketConnection for connection: {ConnectionId}, skipping message send", connectionId);
+ _logger.LogWarning(ex, "反序列化WebSocketConnection失败: {ConnectionId}, 跳过消息发送", connectionId);
}
}
return false;
}
+ ///
+ /// 广播消息到所有连接
+ /// 遍历所有节点上的所有连接并发送消息
+ ///
+ /// 要广播的消息内容
+ /// 是否所有消息都发送成功
public async Task BroadcastMessageAsync(byte[] message)
{
var nodes = await GetAllNodesAsync();
var success = true;
+ // 遍历所有节点上的所有连接
foreach (var node in nodes)
{
var nodeKey = $"{NODE_PREFIX}{node}";
@@ -188,11 +272,18 @@ public class WebSocketService : IWebSocketService
return success;
}
+ ///
+ /// 向指定用户的所有连接发送消息
+ ///
+ /// 目标用户ID
+ /// 要发送的消息内容
+ /// 是否所有消息都发送成功
public async Task SendMessageToUserAsync(string userId, byte[] message)
{
var userConnections = await GetUserConnectionsAsync(userId);
var success = true;
+ // 向用户的所有连接发送消息
foreach (var connection in userConnections)
{
if (!await SendMessageAsync(connection.ConnectionId, message))
@@ -204,6 +295,12 @@ public class WebSocketService : IWebSocketService
return success;
}
+ ///
+ /// 将连接与用户关联
+ /// 更新连接信息和用户关联缓存
+ ///
+ /// 连接ID
+ /// 用户ID
public async Task AssociateUserAsync(string connectionId, string userId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
@@ -222,6 +319,11 @@ public class WebSocketService : IWebSocketService
}
}
+ ///
+ /// 获取指定连接的信息
+ ///
+ /// 连接ID
+ /// 连接信息,如果不存在则返回null
public async Task GetConnectionAsync(string connectionId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
@@ -231,11 +333,17 @@ public class WebSocketService : IWebSocketService
: null;
}
+ ///
+ /// 获取指定用户的所有连接
+ ///
+ /// 用户ID
+ /// 用户的所有连接信息
public async Task> GetUserConnectionsAsync(string userId)
{
var connections = new List();
var nodes = await GetAllNodesAsync();
+ // 遍历所有节点,查找用户的连接
foreach (var node in nodes)
{
var nodeKey = $"{NODE_PREFIX}{node}";
@@ -257,6 +365,10 @@ public class WebSocketService : IWebSocketService
return connections;
}
+ ///
+ /// 将连接添加到当前节点
+ ///
+ /// 要添加的连接ID
private async Task AddConnectionToNodeAsync(string connectionId)
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
@@ -267,6 +379,10 @@ public class WebSocketService : IWebSocketService
JsonSerializer.Serialize(connections));
}
+ ///
+ /// 从当前节点移除连接
+ ///
+ /// 要移除的连接ID
private async Task RemoveConnectionFromNodeAsync(string connectionId)
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
@@ -277,6 +393,10 @@ public class WebSocketService : IWebSocketService
JsonSerializer.Serialize(connections));
}
+ ///
+ /// 获取当前节点的所有连接
+ ///
+ /// 当前节点的所有连接ID列表
private async Task> GetNodeConnectionsAsync()
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
@@ -286,6 +406,11 @@ public class WebSocketService : IWebSocketService
: new List();
}
+ ///
+ /// 获取所有节点ID
+ /// 目前仅返回当前节点ID,后续需要实现服务发现机制
+ ///
+ /// 所有节点ID列表
private async Task> GetAllNodesAsync()
{
// 这里需要实现服务发现机制