From da9345dbad6154701046a029cc4c4302635c329c Mon Sep 17 00:00:00 2001 From: hyh Date: Wed, 30 Apr 2025 17:37:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20PipelineBuilder=20?= =?UTF-8?q?=E4=B8=AD=E7=9A=84=E7=B1=BB=E5=9E=8B=E8=BD=AC=E6=8D=A2=E5=92=8C?= =?UTF-8?q?=20Logger=20=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Connection/WebSocketConnectionManager.cs | 127 +++++++++++++++--- .../Middleware/WebSocketMiddleware.cs | 48 ++++++- .../Pipeline/IPipelineStep.cs | 21 +++ .../Pipeline/PipelineBuilder.cs | 121 ++++++++++++++--- .../Pipeline/Steps/MessageRoutingStep.cs | 75 ++++++++--- .../Services/WebSocketMessageService.cs | 94 +++++++++++-- 6 files changed, 422 insertions(+), 64 deletions(-) diff --git a/src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs b/src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs index e91dc99..5999a46 100644 --- a/src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs +++ b/src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs @@ -5,23 +5,37 @@ using Microsoft.Extensions.Logging; namespace CellularManagement.WebSocket.Connection; +/// +/// WebSocket 连接管理器 +/// 负责管理所有 WebSocket 连接,处理消息的入队和出队 +/// public class WebSocketConnectionManager : IDisposable { + // 存储所有活动的 WebSocket 连接 private readonly ConcurrentDictionary _connections = new(); + // 入站消息队列 private readonly Channel _incomingMessages; + // 出站消息队列 private readonly Channel _outgoingMessages; private readonly ILogger _logger; + // 心跳检测定时器 private readonly Timer _heartbeatTimer; private readonly TimeSpan _heartbeatInterval = TimeSpan.FromSeconds(30); + // 消息到达事件 + public event Func? OnMessageReceived; + public WebSocketConnectionManager(ILogger logger) { _logger = logger; + _logger.LogInformation("初始化 WebSocket 连接管理器"); + + // 创建有界通道,限制最大消息数量为 10000 _incomingMessages = Channel.CreateBounded(new BoundedChannelOptions(10000) { - FullMode = BoundedChannelFullMode.Wait, - SingleReader = false, - SingleWriter = false + FullMode = BoundedChannelFullMode.Wait, // 当队列满时等待 + SingleReader = false, // 允许多个读取者 + SingleWriter = false // 允许多个写入者 }); _outgoingMessages = Channel.CreateBounded(new BoundedChannelOptions(10000) @@ -31,9 +45,17 @@ public class WebSocketConnectionManager : IDisposable SingleWriter = false }); + _logger.LogInformation("创建消息队列完成,入站队列大小:{IncomingSize},出站队列大小:{OutgoingSize}", + 10000, 10000); + + // 启动心跳检测定时器 _heartbeatTimer = new Timer(CheckConnections, null, _heartbeatInterval, _heartbeatInterval); + _logger.LogInformation("心跳检测定时器已启动,间隔:{Interval}秒", _heartbeatInterval.TotalSeconds); } + /// + /// 添加新的 WebSocket 连接 + /// public string AddConnection(System.Net.WebSockets.WebSocket socket) { var connectionId = Guid.NewGuid().ToString(); @@ -45,91 +67,155 @@ public class WebSocketConnectionManager : IDisposable }; _connections.TryAdd(connectionId, connection); - _logger.LogInformation("New connection added: {ConnectionId}", connectionId); + _logger.LogInformation("添加新连接成功,连接ID:{ConnectionId},当前连接数:{ConnectionCount}", + connectionId, _connections.Count); return connectionId; } + /// + /// 移除 WebSocket 连接 + /// public bool RemoveConnection(string connectionId) { if (_connections.TryRemove(connectionId, out var connection)) { - _logger.LogInformation("Connection removed: {ConnectionId}", connectionId); + _logger.LogInformation("移除连接成功,连接ID:{ConnectionId},当前连接数:{ConnectionCount}", + connectionId, _connections.Count); return true; } + _logger.LogWarning("移除连接失败,连接ID:{ConnectionId} 不存在", connectionId); return false; } + /// + /// 获取指定连接 + /// public WebSocketConnection? GetConnection(string connectionId) { _connections.TryGetValue(connectionId, out var connection); + if (connection == null) + { + _logger.LogWarning("获取连接失败,连接ID:{ConnectionId} 不存在", connectionId); + } return connection; } + /// + /// 更新连接活动时间 + /// public void UpdateConnectionActivity(string connectionId) { if (_connections.TryGetValue(connectionId, out var connection)) { connection.LastActivityTime = DateTime.UtcNow; + _logger.LogDebug("更新连接活动时间,连接ID:{ConnectionId}", connectionId); } } + /// + /// 入队入站消息 + /// public async ValueTask QueueIncomingMessage(WebSocketMessage message) { + _logger.LogDebug("入队入站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节", + message.ConnectionId, message.MessageType, message.Data.Length); + await _incomingMessages.Writer.WriteAsync(message); UpdateConnectionActivity(message.ConnectionId); + + // 触发消息到达事件 + if (OnMessageReceived != null) + { + _logger.LogDebug("触发消息到达事件,连接ID:{ConnectionId}", message.ConnectionId); + await OnMessageReceived(message); + } } + /// + /// 入队出站消息 + /// public async ValueTask QueueOutgoingMessage(WebSocketMessage message) { + _logger.LogDebug("入队出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节", + message.ConnectionId, message.MessageType, message.Data.Length); + await _outgoingMessages.Writer.WriteAsync(message); UpdateConnectionActivity(message.ConnectionId); } + /// + /// 读取入站消息 + /// public IAsyncEnumerable ReadIncomingMessagesAsync(CancellationToken cancellationToken) { + _logger.LogDebug("开始读取入站消息"); return _incomingMessages.Reader.ReadAllAsync(cancellationToken); } + /// + /// 读取出站消息 + /// public IAsyncEnumerable ReadOutgoingMessagesAsync(CancellationToken cancellationToken) { + _logger.LogDebug("开始读取出站消息"); return _outgoingMessages.Reader.ReadAllAsync(cancellationToken); } + /// + /// 获取所有连接 + /// public IEnumerable GetAllConnections() { + _logger.LogDebug("获取所有连接,当前连接数:{ConnectionCount}", _connections.Count); return _connections.Values; } + /// + /// 检查连接状态 + /// private void CheckConnections(object? state) { var now = DateTime.UtcNow; var inactiveThreshold = TimeSpan.FromMinutes(1); + int inactiveCount = 0; foreach (var (connectionId, connection) in _connections) { if (connection.Status == ConnectionStatus.Connected && now - connection.LastActivityTime > inactiveThreshold) { - _logger.LogWarning("Connection {ConnectionId} is inactive, closing", connectionId); + _logger.LogWarning("检测到不活跃连接,连接ID:{ConnectionId},最后活动时间:{LastActivityTime}", + connectionId, connection.LastActivityTime); + inactiveCount++; _ = CloseConnectionAsync(connectionId); } } + + if (inactiveCount > 0) + { + _logger.LogInformation("心跳检测完成,发现 {InactiveCount} 个不活跃连接", inactiveCount); + } } + /// + /// 关闭连接 + /// private async Task CloseConnectionAsync(string connectionId) { if (_connections.TryGetValue(connectionId, out var connection)) { try { + _logger.LogInformation("正在关闭连接,连接ID:{ConnectionId}", connectionId); await connection.Socket.CloseAsync( WebSocketCloseStatus.NormalClosure, "Connection timeout", CancellationToken.None); + _logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId); } catch (Exception ex) { - _logger.LogError(ex, "Error closing connection {ConnectionId}", connectionId); + _logger.LogError(ex, "关闭连接时发生错误,连接ID:{ConnectionId}", connectionId); } finally { @@ -140,10 +226,15 @@ public class WebSocketConnectionManager : IDisposable public void Dispose() { + _logger.LogInformation("正在释放 WebSocket 连接管理器资源"); _heartbeatTimer.Dispose(); + _logger.LogInformation("WebSocket 连接管理器资源已释放"); } } +/// +/// WebSocket 连接信息 +/// public class WebSocketConnection { public System.Net.WebSockets.WebSocket Socket { get; set; } = null!; @@ -151,18 +242,24 @@ public class WebSocketConnection public ConnectionStatus Status { get; set; } } +/// +/// 连接状态枚举 +/// public enum ConnectionStatus { - Connected, - Disconnected, - Error + Connected, // 已连接 + Disconnected, // 已断开 + Error // 错误状态 } +/// +/// WebSocket 消息记录 +/// public record WebSocketMessage { - public string ConnectionId { get; init; } = string.Empty; - public byte[] Data { get; init; } = Array.Empty(); - public WebSocketMessageType MessageType { get; init; } - public DateTime Timestamp { get; init; } = DateTime.UtcNow; - public int Priority { get; init; } = 0; + public string ConnectionId { get; init; } = string.Empty; // 连接ID + public byte[] Data { get; init; } = Array.Empty(); // 消息数据 + public WebSocketMessageType MessageType { get; init; } // 消息类型 + public DateTime Timestamp { get; init; } = DateTime.UtcNow; // 时间戳 + public int Priority { get; init; } = 0; // 优先级 } \ No newline at end of file diff --git a/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs b/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs index c67d7f7..5f8e586 100644 --- a/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs +++ b/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs @@ -5,6 +5,10 @@ using Microsoft.Extensions.Logging; namespace CellularManagement.WebSocket.Middleware; +/// +/// WebSocket 中间件 +/// 负责处理 WebSocket 连接的建立和消息的接收 +/// public class WebSocketMiddleware { private readonly RequestDelegate _next; @@ -20,14 +24,21 @@ public class WebSocketMiddleware _next = next; _connectionManager = connectionManager; _logger = logger; + _logger.LogInformation("初始化 WebSocket 中间件,缓冲区大小:{BufferSize}字节", _bufferSize); } + /// + /// 处理 HTTP 请求 + /// public async Task InvokeAsync(HttpContext context) { if (context.WebSockets.IsWebSocketRequest) { + _logger.LogInformation("收到 WebSocket 连接请求,路径:{Path}", context.Request.Path); + var webSocket = await context.WebSockets.AcceptWebSocketAsync(); var connectionId = _connectionManager.AddConnection(webSocket); + _logger.LogInformation("WebSocket 连接已接受,连接ID:{ConnectionId}", connectionId); try { @@ -35,30 +46,41 @@ public class WebSocketMiddleware } catch (WebSocketException ex) { - _logger.LogError(ex, "WebSocket error for connection {ConnectionId}", connectionId); + _logger.LogError(ex, "WebSocket 连接发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}", + connectionId, ex.Message); await HandleWebSocketError(webSocket, ex); } catch (Exception ex) { - _logger.LogError(ex, "Error handling WebSocket connection {ConnectionId}", connectionId); + _logger.LogError(ex, "处理 WebSocket 连接时发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}", + connectionId, ex.Message); await HandleWebSocketError(webSocket, ex); } finally { _connectionManager.RemoveConnection(connectionId); + _logger.LogInformation("WebSocket 连接已关闭,连接ID:{ConnectionId}", connectionId); } } else { + _logger.LogDebug("非 WebSocket 请求,继续处理下一个中间件,路径:{Path}", context.Request.Path); await _next(context); } } + /// + /// 处理 WebSocket 连接 + /// private async Task HandleWebSocketConnection(System.Net.WebSockets.WebSocket webSocket, string connectionId) { + _logger.LogInformation("开始处理 WebSocket 连接,连接ID:{ConnectionId}", connectionId); + var buffer = new byte[_bufferSize]; var receiveResult = await webSocket.ReceiveAsync( new ArraySegment(buffer), CancellationToken.None); + _logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节", + connectionId, receiveResult.MessageType, receiveResult.Count); while (!receiveResult.CloseStatus.HasValue) { @@ -71,39 +93,57 @@ public class WebSocketMiddleware MessageType = receiveResult.MessageType }; + _logger.LogDebug("准备处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节", + connectionId, message.MessageType, message.Data.Length); + await _connectionManager.QueueIncomingMessage(message); + _logger.LogDebug("消息已入队,连接ID:{ConnectionId}", connectionId); receiveResult = await webSocket.ReceiveAsync( new ArraySegment(buffer), CancellationToken.None); + _logger.LogDebug("收到新消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节", + connectionId, receiveResult.MessageType, receiveResult.Count); } catch (Exception ex) { - _logger.LogError(ex, "Error processing message for connection {ConnectionId}", connectionId); + _logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}", + connectionId, ex.Message); throw; } } + _logger.LogInformation("收到关闭请求,连接ID:{ConnectionId},关闭状态:{CloseStatus},描述:{CloseDescription}", + connectionId, receiveResult.CloseStatus, receiveResult.CloseStatusDescription); + await webSocket.CloseAsync( receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None); + _logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId); } + /// + /// 处理 WebSocket 错误 + /// private async Task HandleWebSocketError(System.Net.WebSockets.WebSocket webSocket, Exception exception) { + _logger.LogError(exception, "处理 WebSocket 错误,错误信息:{ErrorMessage}", exception.Message); + try { if (webSocket.State == WebSocketState.Open) { + _logger.LogInformation("正在关闭出错的 WebSocket 连接"); await webSocket.CloseAsync( WebSocketCloseStatus.InternalServerError, "Internal server error", CancellationToken.None); + _logger.LogInformation("WebSocket 连接已关闭"); } } catch (Exception ex) { - _logger.LogError(ex, "Error closing WebSocket after error"); + _logger.LogError(ex, "关闭 WebSocket 连接时发生错误,错误信息:{ErrorMessage}", ex.Message); } } } \ No newline at end of file diff --git a/src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs b/src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs index 434047d..ae1d8df 100644 --- a/src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs +++ b/src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs @@ -1,8 +1,29 @@ +using Microsoft.Extensions.Logging; + namespace CellularManagement.WebSocket.Pipeline; +/// +/// 管道步骤接口 +/// 定义消息处理管道中的单个处理步骤 +/// +/// 输入消息类型 +/// 输出消息类型 public interface IPipelineStep { + /// + /// 处理消息 + /// + /// 输入消息 + /// 取消令牌 + /// 处理后的消息 Task ProcessAsync(TInput input, CancellationToken cancellationToken); + + /// + /// 处理消息(带错误处理) + /// + /// 输入消息 + /// 取消令牌 + /// 处理后的消息 Task ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken); } diff --git a/src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs b/src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs index 5419192..c155bed 100644 --- a/src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs +++ b/src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs @@ -3,33 +3,117 @@ using Microsoft.Extensions.Logging; namespace CellularManagement.WebSocket.Pipeline; +/// +/// 管道构建器 +/// 用于构建消息处理管道 +/// +/// 输入消息类型 +/// 输出消息类型 public class PipelineBuilder { - private readonly List> _steps = new(); private readonly ILoggerFactory _loggerFactory; + private readonly ILogger> _logger; + private readonly List> _steps = new(); public PipelineBuilder(ILoggerFactory loggerFactory) { _loggerFactory = loggerFactory; + _logger = loggerFactory.CreateLogger>(); + _logger.LogInformation("初始化管道构建器,输入类型:{InputType},输出类型:{OutputType}", + typeof(TInput).Name, typeof(TOutput).Name); } + /// + /// 添加处理步骤 + /// + /// 处理步骤 + /// 管道构建器 public PipelineBuilder AddStep(IPipelineStep step) { - if (step == null) - { - throw new ArgumentNullException(nameof(step)); - } + _logger.LogInformation("添加处理步骤,步骤类型:{StepType}", step.GetType().Name); _steps.Add(step); return this; } + /// + /// 构建处理管道 + /// + /// 处理管道 public IPipelineStep Build() { + _logger.LogInformation("开始构建处理管道,步骤数量:{StepCount}", _steps.Count); + if (_steps.Count == 0) { - throw new InvalidOperationException("Pipeline must have at least one step"); + _logger.LogWarning("处理管道没有添加任何步骤"); + throw new InvalidOperationException("处理管道必须至少包含一个步骤"); + } + + // 构建处理链 + var pipeline = _steps[0]; + for (int i = 1; i < _steps.Count; i++) + { + var currentStep = _steps[i]; + _logger.LogDebug("连接处理步骤:{CurrentStep} -> {NextStep}", + pipeline.GetType().Name, currentStep.GetType().Name); + pipeline = new ChainedPipelineStep(pipeline, currentStep, _loggerFactory); + } + + _logger.LogInformation("处理管道构建完成,总步骤数:{StepCount}", _steps.Count); + return pipeline; + } + + /// + /// 链式处理步骤 + /// 用于连接多个处理步骤 + /// + private class ChainedPipelineStep : IPipelineStep + { + private readonly IPipelineStep _first; + private readonly IPipelineStep _second; + private readonly ILogger _logger; + + public ChainedPipelineStep( + IPipelineStep first, + IPipelineStep second, + ILoggerFactory loggerFactory) + { + _first = first; + _second = second; + _logger = loggerFactory.CreateLogger(); + _logger.LogDebug("创建链式处理步骤,第一步:{FirstStep},第二步:{SecondStep}", + first.GetType().Name, second.GetType().Name); + } + + public async Task ProcessAsync(TInput input, CancellationToken cancellationToken) + { + _logger.LogDebug("开始链式处理消息"); + var intermediate = await _first.ProcessAsync(input, cancellationToken); + if (intermediate is TInput inputValue) + { + return await _second.ProcessAsync(inputValue, cancellationToken); + } + throw new InvalidOperationException($"无法将类型 {intermediate?.GetType().Name ?? "null"} 转换为 {typeof(TInput).Name}"); + } + + public async Task ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken) + { + _logger.LogDebug("开始链式处理消息(带错误处理)"); + try + { + var intermediate = await _first.ProcessWithErrorHandlingAsync(input, cancellationToken); + if (intermediate is TInput inputValue) + { + return await _second.ProcessWithErrorHandlingAsync(inputValue, cancellationToken); + } + throw new InvalidOperationException($"无法将类型 {intermediate?.GetType().Name ?? "null"} 转换为 {typeof(TInput).Name}"); + } + catch (Exception ex) + { + _logger.LogError(ex, "链式处理发生错误"); + throw; + } } - return new Pipeline(_steps, _loggerFactory.CreateLogger>()); } } @@ -40,22 +124,29 @@ public class Pipeline : IPipelineStep public Pipeline(IReadOnlyList> steps, ILogger> logger) { - _steps = steps; - _logger = logger; + _steps = steps ?? throw new ArgumentNullException(nameof(steps)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task ProcessAsync(TInput input, CancellationToken cancellationToken) { + TOutput? result = default; var current = input; - TOutput output = default!; foreach (var step in _steps) { - output = await step.ProcessAsync(current, cancellationToken); - current = (TInput)(object)output!; + result = await step.ProcessAsync(current, cancellationToken); + if (result is TInput nextInput) + { + current = nextInput; + } + else + { + break; + } } - return output; + return result ?? throw new InvalidOperationException("管道处理结果不能为 null"); } public async Task ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken) @@ -66,8 +157,8 @@ public class Pipeline : IPipelineStep } catch (Exception ex) { - _logger.LogError(ex, "Error in pipeline processing"); - throw new PipelineException("Pipeline processing failed", ex); + _logger.LogError(ex, "管道处理发生错误"); + throw new PipelineException("管道处理失败", ex); } } } \ No newline at end of file diff --git a/src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs b/src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs index f215798..e1ef525 100644 --- a/src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs +++ b/src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs @@ -4,6 +4,10 @@ using Microsoft.Extensions.Logging; namespace CellularManagement.WebSocket.Pipeline.Steps; +/// +/// 消息路由步骤 +/// 负责根据消息类型将消息路由到相应的处理器 +/// public class MessageRoutingStep : IPipelineStep { private readonly ILogger _logger; @@ -19,19 +23,35 @@ public class MessageRoutingStep : IPipelineStep + /// 注册消息处理器 + /// + /// 消息类型 + /// 消息处理器 public void RegisterHandler(string messageType, Func> handler) { + _logger.LogInformation("注册消息处理器,消息类型:{MessageType},处理器类型:{HandlerType}", + messageType, handler.Method.DeclaringType?.Name); + if (!_handlers.ContainsKey(messageType)) { _handlers[messageType] = new List>>(); } _handlers[messageType].Add(handler); + _logger.LogDebug("消息处理器注册完成,当前处理器数量:{HandlerCount}", _handlers[messageType].Count); } + /// + /// 处理消息 + /// public async Task ProcessAsync(WebSocketMessage input, CancellationToken cancellationToken) { + _logger.LogDebug("开始路由消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节", + input.ConnectionId, input.MessageType, input.Data.Length); + try { var messageText = System.Text.Encoding.UTF8.GetString(input.Data); @@ -39,48 +59,63 @@ public class MessageRoutingStep : IPipelineStep + /// 处理消息(带错误处理) + /// public async Task ProcessWithErrorHandlingAsync(WebSocketMessage input, CancellationToken cancellationToken) { + _logger.LogDebug("开始路由消息(带错误处理),连接ID:{ConnectionId}", input.ConnectionId); try { - return await ProcessAsync(input, cancellationToken); + var result = await ProcessAsync(input, cancellationToken); + _logger.LogDebug("消息路由完成(带错误处理),连接ID:{ConnectionId}", input.ConnectionId); + return result; } catch (Exception ex) { - _logger.LogError(ex, "Error in message routing for connection {ConnectionId}", input.ConnectionId); - throw new PipelineException("Message routing failed", ex); + _logger.LogError(ex, "消息路由失败,连接ID:{ConnectionId},错误信息:{ErrorMessage}", + input.ConnectionId, ex.Message); + throw; } } - - private Func> GetNextHandler(List>> handlers) - { - // 使用轮询策略选择处理器 - var index = Interlocked.Increment(ref _currentHandlerIndex) % handlers.Count; - return handlers[index]; - } } \ No newline at end of file diff --git a/src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs b/src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs index 9af6bb9..5c67e39 100644 --- a/src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs +++ b/src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs @@ -7,6 +7,10 @@ using Microsoft.Extensions.Logging; namespace CellularManagement.WebSocket.Services; +/// +/// WebSocket 消息服务 +/// 负责处理 WebSocket 消息的接收、处理和发送 +/// public class WebSocketMessageService : BackgroundService { private readonly WebSocketConnectionManager _connectionManager; @@ -23,8 +27,11 @@ public class WebSocketMessageService : BackgroundService { _connectionManager = connectionManager; _logger = logger; + _logger.LogInformation("初始化 WebSocket 消息服务"); + _routingStep = new MessageRoutingStep(loggerFactory.CreateLogger()); _processingSemaphore = new SemaphoreSlim(_maxConcurrentProcesses); + _logger.LogInformation("创建消息处理信号量,最大并发数:{MaxConcurrentProcesses}", _maxConcurrentProcesses); // 构建处理管道 var pipelineBuilder = new PipelineBuilder(loggerFactory); @@ -32,53 +39,115 @@ public class WebSocketMessageService : BackgroundService .AddStep(new MessageValidationStep(loggerFactory.CreateLogger())) .AddStep(_routingStep) .Build(); + _logger.LogInformation("消息处理管道构建完成"); + + // 订阅消息到达事件 + _connectionManager.OnMessageReceived += ProcessMessageAsync; + _logger.LogInformation("已订阅消息到达事件"); } + /// + /// 注册消息处理器 + /// public void RegisterMessageHandler(string messageType, Func> handler) { + _logger.LogInformation("注册消息处理器,消息类型:{MessageType}", messageType); _routingStep.RegisterHandler(messageType, handler); } + /// + /// 执行后台服务 + /// 负责处理出站消息的发送 + /// protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + _logger.LogInformation("WebSocket 消息服务开始运行"); try { - await foreach (var message in _connectionManager.ReadIncomingMessagesAsync(stoppingToken)) + // 处理出站消息 + await foreach (var message in _connectionManager.ReadOutgoingMessagesAsync(stoppingToken)) { - await ProcessMessageAsync(message, stoppingToken); + try + { + _logger.LogDebug("开始处理出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节", + message.ConnectionId, message.MessageType, message.Data.Length); + + var connection = _connectionManager.GetConnection(message.ConnectionId); + if (connection?.Socket.State == System.Net.WebSockets.WebSocketState.Open) + { + await connection.Socket.SendAsync( + new ArraySegment(message.Data), + message.MessageType, + true, + stoppingToken); + _logger.LogDebug("消息发送成功,连接ID:{ConnectionId}", message.ConnectionId); + } + else + { + _logger.LogWarning("消息发送失败,连接不存在或已关闭,连接ID:{ConnectionId}", message.ConnectionId); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "发送消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId); + } } } catch (OperationCanceledException) { - _logger.LogInformation("Message processing service is stopping"); + _logger.LogInformation("WebSocket 消息服务正在停止"); } catch (Exception ex) { - _logger.LogError(ex, "Error in message processing service"); + _logger.LogError(ex, "WebSocket 消息服务发生错误"); + } + finally + { + _logger.LogInformation("WebSocket 消息服务已停止"); } } - private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken) + /// + /// 处理消息 + /// + private async Task ProcessMessageAsync(WebSocketMessage message) { - await _processingSemaphore.WaitAsync(cancellationToken); + _logger.LogDebug("开始处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节", + message.ConnectionId, message.MessageType, message.Data.Length); + + await _processingSemaphore.WaitAsync(); try { - var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, cancellationToken); - await _connectionManager.QueueOutgoingMessage(processedMessage); + var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, CancellationToken.None); + _logger.LogDebug("消息处理完成,连接ID:{ConnectionId},处理结果:{Processed}", + message.ConnectionId, processedMessage != null); + + if (processedMessage != null) + { + await _connectionManager.QueueOutgoingMessage(processedMessage); + _logger.LogDebug("处理后的消息已入队,连接ID:{ConnectionId}", message.ConnectionId); + } } catch (Exception ex) { - _logger.LogError(ex, "Error processing message for connection {ConnectionId}", message.ConnectionId); + _logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId); await HandleProcessingError(message, ex); } finally { _processingSemaphore.Release(); + _logger.LogDebug("消息处理完成,释放信号量,连接ID:{ConnectionId}", message.ConnectionId); } } + /// + /// 处理错误 + /// private async Task HandleProcessingError(WebSocketMessage message, Exception exception) { + _logger.LogError(exception, "处理消息错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}", + message.ConnectionId, exception.Message); + try { var errorResponse = new WebSocketMessage @@ -93,16 +162,21 @@ public class WebSocketMessageService : BackgroundService }; await _connectionManager.QueueOutgoingMessage(errorResponse); + _logger.LogDebug("错误响应已发送,连接ID:{ConnectionId}", message.ConnectionId); } catch (Exception ex) { - _logger.LogError(ex, "Error sending error response for connection {ConnectionId}", message.ConnectionId); + _logger.LogError(ex, "发送错误响应时发生错误,连接ID:{ConnectionId}", message.ConnectionId); } } public override void Dispose() { + _logger.LogInformation("正在释放 WebSocket 消息服务资源"); + // 取消订阅事件 + _connectionManager.OnMessageReceived -= ProcessMessageAsync; _processingSemaphore.Dispose(); base.Dispose(); + _logger.LogInformation("WebSocket 消息服务资源已释放"); } } \ No newline at end of file