diff --git a/src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs b/src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs
index e91dc99..5999a46 100644
--- a/src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs
+++ b/src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs
@@ -5,23 +5,37 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Connection;
+///
+/// WebSocket 连接管理器
+/// 负责管理所有 WebSocket 连接,处理消息的入队和出队
+///
public class WebSocketConnectionManager : IDisposable
{
+ // 存储所有活动的 WebSocket 连接
private readonly ConcurrentDictionary _connections = new();
+ // 入站消息队列
private readonly Channel _incomingMessages;
+ // 出站消息队列
private readonly Channel _outgoingMessages;
private readonly ILogger _logger;
+ // 心跳检测定时器
private readonly Timer _heartbeatTimer;
private readonly TimeSpan _heartbeatInterval = TimeSpan.FromSeconds(30);
+ // 消息到达事件
+ public event Func? OnMessageReceived;
+
public WebSocketConnectionManager(ILogger logger)
{
_logger = logger;
+ _logger.LogInformation("初始化 WebSocket 连接管理器");
+
+ // 创建有界通道,限制最大消息数量为 10000
_incomingMessages = Channel.CreateBounded(new BoundedChannelOptions(10000)
{
- FullMode = BoundedChannelFullMode.Wait,
- SingleReader = false,
- SingleWriter = false
+ FullMode = BoundedChannelFullMode.Wait, // 当队列满时等待
+ SingleReader = false, // 允许多个读取者
+ SingleWriter = false // 允许多个写入者
});
_outgoingMessages = Channel.CreateBounded(new BoundedChannelOptions(10000)
@@ -31,9 +45,17 @@ public class WebSocketConnectionManager : IDisposable
SingleWriter = false
});
+ _logger.LogInformation("创建消息队列完成,入站队列大小:{IncomingSize},出站队列大小:{OutgoingSize}",
+ 10000, 10000);
+
+ // 启动心跳检测定时器
_heartbeatTimer = new Timer(CheckConnections, null, _heartbeatInterval, _heartbeatInterval);
+ _logger.LogInformation("心跳检测定时器已启动,间隔:{Interval}秒", _heartbeatInterval.TotalSeconds);
}
+ ///
+ /// 添加新的 WebSocket 连接
+ ///
public string AddConnection(System.Net.WebSockets.WebSocket socket)
{
var connectionId = Guid.NewGuid().ToString();
@@ -45,91 +67,155 @@ public class WebSocketConnectionManager : IDisposable
};
_connections.TryAdd(connectionId, connection);
- _logger.LogInformation("New connection added: {ConnectionId}", connectionId);
+ _logger.LogInformation("添加新连接成功,连接ID:{ConnectionId},当前连接数:{ConnectionCount}",
+ connectionId, _connections.Count);
return connectionId;
}
+ ///
+ /// 移除 WebSocket 连接
+ ///
public bool RemoveConnection(string connectionId)
{
if (_connections.TryRemove(connectionId, out var connection))
{
- _logger.LogInformation("Connection removed: {ConnectionId}", connectionId);
+ _logger.LogInformation("移除连接成功,连接ID:{ConnectionId},当前连接数:{ConnectionCount}",
+ connectionId, _connections.Count);
return true;
}
+ _logger.LogWarning("移除连接失败,连接ID:{ConnectionId} 不存在", connectionId);
return false;
}
+ ///
+ /// 获取指定连接
+ ///
public WebSocketConnection? GetConnection(string connectionId)
{
_connections.TryGetValue(connectionId, out var connection);
+ if (connection == null)
+ {
+ _logger.LogWarning("获取连接失败,连接ID:{ConnectionId} 不存在", connectionId);
+ }
return connection;
}
+ ///
+ /// 更新连接活动时间
+ ///
public void UpdateConnectionActivity(string connectionId)
{
if (_connections.TryGetValue(connectionId, out var connection))
{
connection.LastActivityTime = DateTime.UtcNow;
+ _logger.LogDebug("更新连接活动时间,连接ID:{ConnectionId}", connectionId);
}
}
+ ///
+ /// 入队入站消息
+ ///
public async ValueTask QueueIncomingMessage(WebSocketMessage message)
{
+ _logger.LogDebug("入队入站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
+ message.ConnectionId, message.MessageType, message.Data.Length);
+
await _incomingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId);
+
+ // 触发消息到达事件
+ if (OnMessageReceived != null)
+ {
+ _logger.LogDebug("触发消息到达事件,连接ID:{ConnectionId}", message.ConnectionId);
+ await OnMessageReceived(message);
+ }
}
+ ///
+ /// 入队出站消息
+ ///
public async ValueTask QueueOutgoingMessage(WebSocketMessage message)
{
+ _logger.LogDebug("入队出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
+ message.ConnectionId, message.MessageType, message.Data.Length);
+
await _outgoingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId);
}
+ ///
+ /// 读取入站消息
+ ///
public IAsyncEnumerable ReadIncomingMessagesAsync(CancellationToken cancellationToken)
{
+ _logger.LogDebug("开始读取入站消息");
return _incomingMessages.Reader.ReadAllAsync(cancellationToken);
}
+ ///
+ /// 读取出站消息
+ ///
public IAsyncEnumerable ReadOutgoingMessagesAsync(CancellationToken cancellationToken)
{
+ _logger.LogDebug("开始读取出站消息");
return _outgoingMessages.Reader.ReadAllAsync(cancellationToken);
}
+ ///
+ /// 获取所有连接
+ ///
public IEnumerable GetAllConnections()
{
+ _logger.LogDebug("获取所有连接,当前连接数:{ConnectionCount}", _connections.Count);
return _connections.Values;
}
+ ///
+ /// 检查连接状态
+ ///
private void CheckConnections(object? state)
{
var now = DateTime.UtcNow;
var inactiveThreshold = TimeSpan.FromMinutes(1);
+ int inactiveCount = 0;
foreach (var (connectionId, connection) in _connections)
{
if (connection.Status == ConnectionStatus.Connected &&
now - connection.LastActivityTime > inactiveThreshold)
{
- _logger.LogWarning("Connection {ConnectionId} is inactive, closing", connectionId);
+ _logger.LogWarning("检测到不活跃连接,连接ID:{ConnectionId},最后活动时间:{LastActivityTime}",
+ connectionId, connection.LastActivityTime);
+ inactiveCount++;
_ = CloseConnectionAsync(connectionId);
}
}
+
+ if (inactiveCount > 0)
+ {
+ _logger.LogInformation("心跳检测完成,发现 {InactiveCount} 个不活跃连接", inactiveCount);
+ }
}
+ ///
+ /// 关闭连接
+ ///
private async Task CloseConnectionAsync(string connectionId)
{
if (_connections.TryGetValue(connectionId, out var connection))
{
try
{
+ _logger.LogInformation("正在关闭连接,连接ID:{ConnectionId}", connectionId);
await connection.Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Connection timeout",
CancellationToken.None);
+ _logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId);
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error closing connection {ConnectionId}", connectionId);
+ _logger.LogError(ex, "关闭连接时发生错误,连接ID:{ConnectionId}", connectionId);
}
finally
{
@@ -140,10 +226,15 @@ public class WebSocketConnectionManager : IDisposable
public void Dispose()
{
+ _logger.LogInformation("正在释放 WebSocket 连接管理器资源");
_heartbeatTimer.Dispose();
+ _logger.LogInformation("WebSocket 连接管理器资源已释放");
}
}
+///
+/// WebSocket 连接信息
+///
public class WebSocketConnection
{
public System.Net.WebSockets.WebSocket Socket { get; set; } = null!;
@@ -151,18 +242,24 @@ public class WebSocketConnection
public ConnectionStatus Status { get; set; }
}
+///
+/// 连接状态枚举
+///
public enum ConnectionStatus
{
- Connected,
- Disconnected,
- Error
+ Connected, // 已连接
+ Disconnected, // 已断开
+ Error // 错误状态
}
+///
+/// WebSocket 消息记录
+///
public record WebSocketMessage
{
- public string ConnectionId { get; init; } = string.Empty;
- public byte[] Data { get; init; } = Array.Empty();
- public WebSocketMessageType MessageType { get; init; }
- public DateTime Timestamp { get; init; } = DateTime.UtcNow;
- public int Priority { get; init; } = 0;
+ public string ConnectionId { get; init; } = string.Empty; // 连接ID
+ public byte[] Data { get; init; } = Array.Empty(); // 消息数据
+ public WebSocketMessageType MessageType { get; init; } // 消息类型
+ public DateTime Timestamp { get; init; } = DateTime.UtcNow; // 时间戳
+ public int Priority { get; init; } = 0; // 优先级
}
\ No newline at end of file
diff --git a/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs b/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
index c67d7f7..5f8e586 100644
--- a/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
+++ b/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
@@ -5,6 +5,10 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Middleware;
+///
+/// WebSocket 中间件
+/// 负责处理 WebSocket 连接的建立和消息的接收
+///
public class WebSocketMiddleware
{
private readonly RequestDelegate _next;
@@ -20,14 +24,21 @@ public class WebSocketMiddleware
_next = next;
_connectionManager = connectionManager;
_logger = logger;
+ _logger.LogInformation("初始化 WebSocket 中间件,缓冲区大小:{BufferSize}字节", _bufferSize);
}
+ ///
+ /// 处理 HTTP 请求
+ ///
public async Task InvokeAsync(HttpContext context)
{
if (context.WebSockets.IsWebSocketRequest)
{
+ _logger.LogInformation("收到 WebSocket 连接请求,路径:{Path}", context.Request.Path);
+
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var connectionId = _connectionManager.AddConnection(webSocket);
+ _logger.LogInformation("WebSocket 连接已接受,连接ID:{ConnectionId}", connectionId);
try
{
@@ -35,30 +46,41 @@ public class WebSocketMiddleware
}
catch (WebSocketException ex)
{
- _logger.LogError(ex, "WebSocket error for connection {ConnectionId}", connectionId);
+ _logger.LogError(ex, "WebSocket 连接发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
+ connectionId, ex.Message);
await HandleWebSocketError(webSocket, ex);
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error handling WebSocket connection {ConnectionId}", connectionId);
+ _logger.LogError(ex, "处理 WebSocket 连接时发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
+ connectionId, ex.Message);
await HandleWebSocketError(webSocket, ex);
}
finally
{
_connectionManager.RemoveConnection(connectionId);
+ _logger.LogInformation("WebSocket 连接已关闭,连接ID:{ConnectionId}", connectionId);
}
}
else
{
+ _logger.LogDebug("非 WebSocket 请求,继续处理下一个中间件,路径:{Path}", context.Request.Path);
await _next(context);
}
}
+ ///
+ /// 处理 WebSocket 连接
+ ///
private async Task HandleWebSocketConnection(System.Net.WebSockets.WebSocket webSocket, string connectionId)
{
+ _logger.LogInformation("开始处理 WebSocket 连接,连接ID:{ConnectionId}", connectionId);
+
var buffer = new byte[_bufferSize];
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment(buffer), CancellationToken.None);
+ _logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
+ connectionId, receiveResult.MessageType, receiveResult.Count);
while (!receiveResult.CloseStatus.HasValue)
{
@@ -71,39 +93,57 @@ public class WebSocketMiddleware
MessageType = receiveResult.MessageType
};
+ _logger.LogDebug("准备处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
+ connectionId, message.MessageType, message.Data.Length);
+
await _connectionManager.QueueIncomingMessage(message);
+ _logger.LogDebug("消息已入队,连接ID:{ConnectionId}", connectionId);
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment(buffer), CancellationToken.None);
+ _logger.LogDebug("收到新消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
+ connectionId, receiveResult.MessageType, receiveResult.Count);
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error processing message for connection {ConnectionId}", connectionId);
+ _logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
+ connectionId, ex.Message);
throw;
}
}
+ _logger.LogInformation("收到关闭请求,连接ID:{ConnectionId},关闭状态:{CloseStatus},描述:{CloseDescription}",
+ connectionId, receiveResult.CloseStatus, receiveResult.CloseStatusDescription);
+
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
+ _logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId);
}
+ ///
+ /// 处理 WebSocket 错误
+ ///
private async Task HandleWebSocketError(System.Net.WebSockets.WebSocket webSocket, Exception exception)
{
+ _logger.LogError(exception, "处理 WebSocket 错误,错误信息:{ErrorMessage}", exception.Message);
+
try
{
if (webSocket.State == WebSocketState.Open)
{
+ _logger.LogInformation("正在关闭出错的 WebSocket 连接");
await webSocket.CloseAsync(
WebSocketCloseStatus.InternalServerError,
"Internal server error",
CancellationToken.None);
+ _logger.LogInformation("WebSocket 连接已关闭");
}
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error closing WebSocket after error");
+ _logger.LogError(ex, "关闭 WebSocket 连接时发生错误,错误信息:{ErrorMessage}", ex.Message);
}
}
}
\ No newline at end of file
diff --git a/src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs b/src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs
index 434047d..ae1d8df 100644
--- a/src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs
+++ b/src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs
@@ -1,8 +1,29 @@
+using Microsoft.Extensions.Logging;
+
namespace CellularManagement.WebSocket.Pipeline;
+///
+/// 管道步骤接口
+/// 定义消息处理管道中的单个处理步骤
+///
+/// 输入消息类型
+/// 输出消息类型
public interface IPipelineStep
{
+ ///
+ /// 处理消息
+ ///
+ /// 输入消息
+ /// 取消令牌
+ /// 处理后的消息
Task ProcessAsync(TInput input, CancellationToken cancellationToken);
+
+ ///
+ /// 处理消息(带错误处理)
+ ///
+ /// 输入消息
+ /// 取消令牌
+ /// 处理后的消息
Task ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken);
}
diff --git a/src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs b/src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs
index 5419192..c155bed 100644
--- a/src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs
+++ b/src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs
@@ -3,33 +3,117 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline;
+///
+/// 管道构建器
+/// 用于构建消息处理管道
+///
+/// 输入消息类型
+/// 输出消息类型
public class PipelineBuilder
{
- private readonly List> _steps = new();
private readonly ILoggerFactory _loggerFactory;
+ private readonly ILogger> _logger;
+ private readonly List> _steps = new();
public PipelineBuilder(ILoggerFactory loggerFactory)
{
_loggerFactory = loggerFactory;
+ _logger = loggerFactory.CreateLogger>();
+ _logger.LogInformation("初始化管道构建器,输入类型:{InputType},输出类型:{OutputType}",
+ typeof(TInput).Name, typeof(TOutput).Name);
}
+ ///
+ /// 添加处理步骤
+ ///
+ /// 处理步骤
+ /// 管道构建器
public PipelineBuilder AddStep(IPipelineStep step)
{
- if (step == null)
- {
- throw new ArgumentNullException(nameof(step));
- }
+ _logger.LogInformation("添加处理步骤,步骤类型:{StepType}", step.GetType().Name);
_steps.Add(step);
return this;
}
+ ///
+ /// 构建处理管道
+ ///
+ /// 处理管道
public IPipelineStep Build()
{
+ _logger.LogInformation("开始构建处理管道,步骤数量:{StepCount}", _steps.Count);
+
if (_steps.Count == 0)
{
- throw new InvalidOperationException("Pipeline must have at least one step");
+ _logger.LogWarning("处理管道没有添加任何步骤");
+ throw new InvalidOperationException("处理管道必须至少包含一个步骤");
+ }
+
+ // 构建处理链
+ var pipeline = _steps[0];
+ for (int i = 1; i < _steps.Count; i++)
+ {
+ var currentStep = _steps[i];
+ _logger.LogDebug("连接处理步骤:{CurrentStep} -> {NextStep}",
+ pipeline.GetType().Name, currentStep.GetType().Name);
+ pipeline = new ChainedPipelineStep(pipeline, currentStep, _loggerFactory);
+ }
+
+ _logger.LogInformation("处理管道构建完成,总步骤数:{StepCount}", _steps.Count);
+ return pipeline;
+ }
+
+ ///
+ /// 链式处理步骤
+ /// 用于连接多个处理步骤
+ ///
+ private class ChainedPipelineStep : IPipelineStep
+ {
+ private readonly IPipelineStep _first;
+ private readonly IPipelineStep _second;
+ private readonly ILogger _logger;
+
+ public ChainedPipelineStep(
+ IPipelineStep first,
+ IPipelineStep second,
+ ILoggerFactory loggerFactory)
+ {
+ _first = first;
+ _second = second;
+ _logger = loggerFactory.CreateLogger();
+ _logger.LogDebug("创建链式处理步骤,第一步:{FirstStep},第二步:{SecondStep}",
+ first.GetType().Name, second.GetType().Name);
+ }
+
+ public async Task ProcessAsync(TInput input, CancellationToken cancellationToken)
+ {
+ _logger.LogDebug("开始链式处理消息");
+ var intermediate = await _first.ProcessAsync(input, cancellationToken);
+ if (intermediate is TInput inputValue)
+ {
+ return await _second.ProcessAsync(inputValue, cancellationToken);
+ }
+ throw new InvalidOperationException($"无法将类型 {intermediate?.GetType().Name ?? "null"} 转换为 {typeof(TInput).Name}");
+ }
+
+ public async Task ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken)
+ {
+ _logger.LogDebug("开始链式处理消息(带错误处理)");
+ try
+ {
+ var intermediate = await _first.ProcessWithErrorHandlingAsync(input, cancellationToken);
+ if (intermediate is TInput inputValue)
+ {
+ return await _second.ProcessWithErrorHandlingAsync(inputValue, cancellationToken);
+ }
+ throw new InvalidOperationException($"无法将类型 {intermediate?.GetType().Name ?? "null"} 转换为 {typeof(TInput).Name}");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "链式处理发生错误");
+ throw;
+ }
}
- return new Pipeline(_steps, _loggerFactory.CreateLogger>());
}
}
@@ -40,22 +124,29 @@ public class Pipeline : IPipelineStep
public Pipeline(IReadOnlyList> steps, ILogger> logger)
{
- _steps = steps;
- _logger = logger;
+ _steps = steps ?? throw new ArgumentNullException(nameof(steps));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task ProcessAsync(TInput input, CancellationToken cancellationToken)
{
+ TOutput? result = default;
var current = input;
- TOutput output = default!;
foreach (var step in _steps)
{
- output = await step.ProcessAsync(current, cancellationToken);
- current = (TInput)(object)output!;
+ result = await step.ProcessAsync(current, cancellationToken);
+ if (result is TInput nextInput)
+ {
+ current = nextInput;
+ }
+ else
+ {
+ break;
+ }
}
- return output;
+ return result ?? throw new InvalidOperationException("管道处理结果不能为 null");
}
public async Task ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken)
@@ -66,8 +157,8 @@ public class Pipeline : IPipelineStep
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error in pipeline processing");
- throw new PipelineException("Pipeline processing failed", ex);
+ _logger.LogError(ex, "管道处理发生错误");
+ throw new PipelineException("管道处理失败", ex);
}
}
}
\ No newline at end of file
diff --git a/src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs b/src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs
index f215798..e1ef525 100644
--- a/src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs
+++ b/src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs
@@ -4,6 +4,10 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline.Steps;
+///
+/// 消息路由步骤
+/// 负责根据消息类型将消息路由到相应的处理器
+///
public class MessageRoutingStep : IPipelineStep
{
private readonly ILogger _logger;
@@ -19,19 +23,35 @@ public class MessageRoutingStep : IPipelineStep
+ /// 注册消息处理器
+ ///
+ /// 消息类型
+ /// 消息处理器
public void RegisterHandler(string messageType, Func> handler)
{
+ _logger.LogInformation("注册消息处理器,消息类型:{MessageType},处理器类型:{HandlerType}",
+ messageType, handler.Method.DeclaringType?.Name);
+
if (!_handlers.ContainsKey(messageType))
{
_handlers[messageType] = new List>>();
}
_handlers[messageType].Add(handler);
+ _logger.LogDebug("消息处理器注册完成,当前处理器数量:{HandlerCount}", _handlers[messageType].Count);
}
+ ///
+ /// 处理消息
+ ///
public async Task ProcessAsync(WebSocketMessage input, CancellationToken cancellationToken)
{
+ _logger.LogDebug("开始路由消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
+ input.ConnectionId, input.MessageType, input.Data.Length);
+
try
{
var messageText = System.Text.Encoding.UTF8.GetString(input.Data);
@@ -39,48 +59,63 @@ public class MessageRoutingStep : IPipelineStep
+ /// 处理消息(带错误处理)
+ ///
public async Task ProcessWithErrorHandlingAsync(WebSocketMessage input, CancellationToken cancellationToken)
{
+ _logger.LogDebug("开始路由消息(带错误处理),连接ID:{ConnectionId}", input.ConnectionId);
try
{
- return await ProcessAsync(input, cancellationToken);
+ var result = await ProcessAsync(input, cancellationToken);
+ _logger.LogDebug("消息路由完成(带错误处理),连接ID:{ConnectionId}", input.ConnectionId);
+ return result;
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error in message routing for connection {ConnectionId}", input.ConnectionId);
- throw new PipelineException("Message routing failed", ex);
+ _logger.LogError(ex, "消息路由失败,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
+ input.ConnectionId, ex.Message);
+ throw;
}
}
-
- private Func> GetNextHandler(List>> handlers)
- {
- // 使用轮询策略选择处理器
- var index = Interlocked.Increment(ref _currentHandlerIndex) % handlers.Count;
- return handlers[index];
- }
}
\ No newline at end of file
diff --git a/src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs b/src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs
index 9af6bb9..5c67e39 100644
--- a/src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs
+++ b/src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs
@@ -7,6 +7,10 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services;
+///
+/// WebSocket 消息服务
+/// 负责处理 WebSocket 消息的接收、处理和发送
+///
public class WebSocketMessageService : BackgroundService
{
private readonly WebSocketConnectionManager _connectionManager;
@@ -23,8 +27,11 @@ public class WebSocketMessageService : BackgroundService
{
_connectionManager = connectionManager;
_logger = logger;
+ _logger.LogInformation("初始化 WebSocket 消息服务");
+
_routingStep = new MessageRoutingStep(loggerFactory.CreateLogger());
_processingSemaphore = new SemaphoreSlim(_maxConcurrentProcesses);
+ _logger.LogInformation("创建消息处理信号量,最大并发数:{MaxConcurrentProcesses}", _maxConcurrentProcesses);
// 构建处理管道
var pipelineBuilder = new PipelineBuilder(loggerFactory);
@@ -32,53 +39,115 @@ public class WebSocketMessageService : BackgroundService
.AddStep(new MessageValidationStep(loggerFactory.CreateLogger()))
.AddStep(_routingStep)
.Build();
+ _logger.LogInformation("消息处理管道构建完成");
+
+ // 订阅消息到达事件
+ _connectionManager.OnMessageReceived += ProcessMessageAsync;
+ _logger.LogInformation("已订阅消息到达事件");
}
+ ///
+ /// 注册消息处理器
+ ///
public void RegisterMessageHandler(string messageType, Func> handler)
{
+ _logger.LogInformation("注册消息处理器,消息类型:{MessageType}", messageType);
_routingStep.RegisterHandler(messageType, handler);
}
+ ///
+ /// 执行后台服务
+ /// 负责处理出站消息的发送
+ ///
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
+ _logger.LogInformation("WebSocket 消息服务开始运行");
try
{
- await foreach (var message in _connectionManager.ReadIncomingMessagesAsync(stoppingToken))
+ // 处理出站消息
+ await foreach (var message in _connectionManager.ReadOutgoingMessagesAsync(stoppingToken))
{
- await ProcessMessageAsync(message, stoppingToken);
+ try
+ {
+ _logger.LogDebug("开始处理出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
+ message.ConnectionId, message.MessageType, message.Data.Length);
+
+ var connection = _connectionManager.GetConnection(message.ConnectionId);
+ if (connection?.Socket.State == System.Net.WebSockets.WebSocketState.Open)
+ {
+ await connection.Socket.SendAsync(
+ new ArraySegment(message.Data),
+ message.MessageType,
+ true,
+ stoppingToken);
+ _logger.LogDebug("消息发送成功,连接ID:{ConnectionId}", message.ConnectionId);
+ }
+ else
+ {
+ _logger.LogWarning("消息发送失败,连接不存在或已关闭,连接ID:{ConnectionId}", message.ConnectionId);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "发送消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
+ }
}
}
catch (OperationCanceledException)
{
- _logger.LogInformation("Message processing service is stopping");
+ _logger.LogInformation("WebSocket 消息服务正在停止");
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error in message processing service");
+ _logger.LogError(ex, "WebSocket 消息服务发生错误");
+ }
+ finally
+ {
+ _logger.LogInformation("WebSocket 消息服务已停止");
}
}
- private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken)
+ ///
+ /// 处理消息
+ ///
+ private async Task ProcessMessageAsync(WebSocketMessage message)
{
- await _processingSemaphore.WaitAsync(cancellationToken);
+ _logger.LogDebug("开始处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
+ message.ConnectionId, message.MessageType, message.Data.Length);
+
+ await _processingSemaphore.WaitAsync();
try
{
- var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, cancellationToken);
- await _connectionManager.QueueOutgoingMessage(processedMessage);
+ var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, CancellationToken.None);
+ _logger.LogDebug("消息处理完成,连接ID:{ConnectionId},处理结果:{Processed}",
+ message.ConnectionId, processedMessage != null);
+
+ if (processedMessage != null)
+ {
+ await _connectionManager.QueueOutgoingMessage(processedMessage);
+ _logger.LogDebug("处理后的消息已入队,连接ID:{ConnectionId}", message.ConnectionId);
+ }
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error processing message for connection {ConnectionId}", message.ConnectionId);
+ _logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
await HandleProcessingError(message, ex);
}
finally
{
_processingSemaphore.Release();
+ _logger.LogDebug("消息处理完成,释放信号量,连接ID:{ConnectionId}", message.ConnectionId);
}
}
+ ///
+ /// 处理错误
+ ///
private async Task HandleProcessingError(WebSocketMessage message, Exception exception)
{
+ _logger.LogError(exception, "处理消息错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
+ message.ConnectionId, exception.Message);
+
try
{
var errorResponse = new WebSocketMessage
@@ -93,16 +162,21 @@ public class WebSocketMessageService : BackgroundService
};
await _connectionManager.QueueOutgoingMessage(errorResponse);
+ _logger.LogDebug("错误响应已发送,连接ID:{ConnectionId}", message.ConnectionId);
}
catch (Exception ex)
{
- _logger.LogError(ex, "Error sending error response for connection {ConnectionId}", message.ConnectionId);
+ _logger.LogError(ex, "发送错误响应时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
}
}
public override void Dispose()
{
+ _logger.LogInformation("正在释放 WebSocket 消息服务资源");
+ // 取消订阅事件
+ _connectionManager.OnMessageReceived -= ProcessMessageAsync;
_processingSemaphore.Dispose();
base.Dispose();
+ _logger.LogInformation("WebSocket 消息服务资源已释放");
}
}
\ No newline at end of file