Browse Source

优化了IncomingMessageProcessor类的注释,使其更加清晰和准确

norm
hyh 2 months ago
parent
commit
b7982d530e
  1. 99
      src/CellularManagement.WebSocket/Connection/ConnectionManagerCoordinator.cs
  2. 24
      src/CellularManagement.WebSocket/Connection/ConnectionStatus.cs
  3. 36
      src/CellularManagement.WebSocket/Connection/IWebSocketConnectionManager.cs
  4. 35
      src/CellularManagement.WebSocket/Connection/IWebSocketMessageQueueManager.cs
  5. 16
      src/CellularManagement.WebSocket/Connection/WebSocketConnection.cs
  6. 99
      src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs
  7. 137
      src/CellularManagement.WebSocket/Connection/WebSocketMessageQueueManager.cs
  8. 23
      src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs
  9. 2
      src/CellularManagement.WebSocket/Handlers/WebSocketMessageHandlerAdapter.cs
  10. 9
      src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
  11. 146
      src/CellularManagement.WebSocket/Services/ConnectionHealthCheckService.cs
  12. 33
      src/CellularManagement.WebSocket/Services/HandlerRegistrar.cs
  13. 307
      src/CellularManagement.WebSocket/Services/IncomingMessageProcessor.cs
  14. 159
      src/CellularManagement.WebSocket/Services/OutgoingMessageProcessor.cs
  15. 244
      src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

99
src/CellularManagement.WebSocket/Connection/ConnectionManagerCoordinator.cs

@ -0,0 +1,99 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// 连接管理协调器
/// 负责协调连接管理器和消息队列管理器的工作
/// </summary>
public class ConnectionManagerCoordinator
{
private readonly IWebSocketConnectionManager _connectionManager;
private readonly IWebSocketMessageQueueManager _messageQueueManager;
private readonly ILogger<ConnectionManagerCoordinator> _logger;
private readonly ConcurrentDictionary<string, bool> _processingConnections;
private readonly SemaphoreSlim _processingSemaphore;
private readonly int _maxConcurrentProcessing;
public ConnectionManagerCoordinator(
IWebSocketConnectionManager connectionManager,
IWebSocketMessageQueueManager messageQueueManager,
ILogger<ConnectionManagerCoordinator> logger,
int maxConcurrentProcessing = 100)
{
_connectionManager = connectionManager;
_messageQueueManager = messageQueueManager;
_logger = logger;
_maxConcurrentProcessing = maxConcurrentProcessing;
_processingConnections = new ConcurrentDictionary<string, bool>();
_processingSemaphore = new SemaphoreSlim(maxConcurrentProcessing);
_logger.LogInformation("初始化连接管理协调器,最大并发处理数:{MaxConcurrentProcessing}", maxConcurrentProcessing);
}
/// <summary>
/// 获取连接管理器
/// </summary>
public IWebSocketConnectionManager GetConnectionManager() => _connectionManager;
/// <summary>
/// 获取消息队列管理器
/// </summary>
public IWebSocketMessageQueueManager GetMessageQueueManager() => _messageQueueManager;
/// <summary>
/// 开始连接处理
/// </summary>
public async Task<bool> BeginConnectionProcessingAsync(string connectionId)
{
if (string.IsNullOrEmpty(connectionId))
{
throw new ArgumentException("连接ID不能为空", nameof(connectionId));
}
if (_processingConnections.TryGetValue(connectionId, out var isProcessing) && isProcessing)
{
_logger.LogWarning("连接正在处理中,连接ID:{ConnectionId}", connectionId);
return false;
}
await _processingSemaphore.WaitAsync();
_processingConnections[connectionId] = true;
_logger.LogDebug("开始处理连接,连接ID:{ConnectionId}", connectionId);
return true;
}
/// <summary>
/// 结束连接处理
/// </summary>
public async Task EndConnectionProcessingAsync(string connectionId)
{
if (string.IsNullOrEmpty(connectionId))
{
throw new ArgumentException("连接ID不能为空", nameof(connectionId));
}
if (_processingConnections.TryRemove(connectionId, out _))
{
_processingSemaphore.Release();
_logger.LogDebug("结束处理连接,连接ID:{ConnectionId}", connectionId);
}
}
/// <summary>
/// 检查连接是否正在处理
/// </summary>
public bool IsConnectionProcessing(string connectionId)
{
return _processingConnections.TryGetValue(connectionId, out var isProcessing) && isProcessing;
}
/// <summary>
/// 获取当前处理中的连接数
/// </summary>
public int GetProcessingConnectionCount()
{
return _processingConnections.Count;
}
}

24
src/CellularManagement.WebSocket/Connection/ConnectionStatus.cs

@ -1,11 +1,27 @@
namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// 连接状态枚举
/// WebSocket 连接状态
/// </summary>
public enum ConnectionStatus
{
Connected, // 已连接
Disconnected, // 已断开
Error // 错误状态
/// <summary>
/// 已连接
/// </summary>
Connected,
/// <summary>
/// 已断开
/// </summary>
Disconnected,
/// <summary>
/// 错误状态
/// </summary>
Error,
/// <summary>
/// 正在重连
/// </summary>
Reconnecting
}

36
src/CellularManagement.WebSocket/Connection/IWebSocketConnectionManager.cs

@ -0,0 +1,36 @@
using System.Net.WebSockets;
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// WebSocket 连接管理器接口
/// 定义连接管理的基本操作
/// </summary>
public interface IWebSocketConnectionManager : IDisposable
{
/// <summary>
/// 添加新连接
/// </summary>
Task<string> AddConnectionAsync(System.Net.WebSockets.WebSocket socket);
/// <summary>
/// 移除连接
/// </summary>
Task<bool> RemoveConnectionAsync(string connectionId);
/// <summary>
/// 获取连接
/// </summary>
WebSocketConnection? GetConnection(string connectionId);
/// <summary>
/// 更新连接活动时间
/// </summary>
void UpdateConnectionActivity(string connectionId);
/// <summary>
/// 获取所有连接
/// </summary>
IEnumerable<WebSocketConnection> GetAllConnections();
}

35
src/CellularManagement.WebSocket/Connection/IWebSocketMessageQueueManager.cs

@ -0,0 +1,35 @@
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// WebSocket 消息队列管理器接口
/// 定义消息队列管理的基本操作
/// </summary>
public interface IWebSocketMessageQueueManager
{
/// <summary>
/// 消息到达事件
/// </summary>
event Func<WebSocketMessage, Task>? OnMessageReceived;
/// <summary>
/// 入队入站消息
/// </summary>
ValueTask QueueIncomingMessage(WebSocketMessage message);
/// <summary>
/// 入队出站消息
/// </summary>
ValueTask QueueOutgoingMessage(WebSocketMessage message);
/// <summary>
/// 读取入站消息
/// </summary>
IAsyncEnumerable<WebSocketMessage> ReadIncomingMessagesAsync(CancellationToken cancellationToken);
/// <summary>
/// 读取出站消息
/// </summary>
IAsyncEnumerable<WebSocketMessage> ReadOutgoingMessagesAsync(CancellationToken cancellationToken);
}

16
src/CellularManagement.WebSocket/Connection/WebSocketConnection.cs

@ -7,7 +7,23 @@ namespace CellularManagement.WebSocket.Connection;
/// </summary>
public class WebSocketConnection
{
/// <summary>
/// 连接ID
/// </summary>
public string Id { get; set; } = string.Empty;
/// <summary>
/// WebSocket 实例
/// </summary>
public System.Net.WebSockets.WebSocket Socket { get; set; } = null!;
/// <summary>
/// 最后活动时间
/// </summary>
public DateTime LastActivityTime { get; set; }
/// <summary>
/// 连接状态
/// </summary>
public ConnectionStatus Status { get; set; }
}

99
src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs

@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using CellularManagement.WebSocket.Extensions;
@ -12,48 +11,22 @@ namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// WebSocket 连接管理器
/// 负责管理所有 WebSocket 连接,处理消息的入队和出队
/// 负责管理所有 WebSocket 连接
/// </summary>
public class WebSocketConnectionManager : IDisposable
public class WebSocketConnectionManager : IWebSocketConnectionManager
{
private readonly ConcurrentDictionary<string, WebSocketConnection> _connections = new();
private readonly Channel<WebSocketMessage> _incomingMessages;
private readonly Channel<WebSocketMessage> _outgoingMessages;
private readonly ILogger<WebSocketConnectionManager> _logger;
private readonly Timer _heartbeatTimer;
private readonly WebSocketOptions _options;
private bool _disposed;
public event Func<WebSocketMessage, Task>? OnMessageReceived;
public WebSocketConnectionManager(
ILogger<WebSocketConnectionManager> logger,
IOptions<WebSocketOptions> options)
{
_logger = logger;
_options = options.Value;
_logger.LogInformation("初始化 WebSocket 连接管理器");
_incomingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(_options.MessageQueueSize)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
});
_outgoingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(_options.MessageQueueSize)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
});
_logger.LogInformation("创建消息队列完成,入站队列大小:{IncomingSize},出站队列大小:{OutgoingSize}",
_options.MessageQueueSize, _options.MessageQueueSize);
//_heartbeatTimer = new Timer(CheckConnections, null, _options.HeartbeatInterval, _options.HeartbeatInterval);
_logger.LogInformation("心跳检测定时器已启动,间隔:{Interval}秒", _options.HeartbeatInterval.TotalSeconds);
}
public async Task<string> AddConnectionAsync(System.Net.WebSockets.WebSocket socket)
@ -77,6 +50,7 @@ public class WebSocketConnectionManager : IDisposable
var connectionId = await GenerateUniqueConnectionIdAsync();
var connection = new WebSocketConnection
{
Id = connectionId,
Socket = socket,
LastActivityTime = DateTime.UtcNow,
Status = ConnectionStatus.Connected
@ -182,84 +156,17 @@ public class WebSocketConnectionManager : IDisposable
}
}
public async ValueTask QueueIncomingMessage(WebSocketMessage message)
{
_logger.LogDebug("入队入站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
await _incomingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId);
if (OnMessageReceived != null)
{
_logger.LogDebug("触发消息到达事件,连接ID:{ConnectionId}", message.ConnectionId);
await OnMessageReceived(message);
}
}
public async ValueTask QueueOutgoingMessage(WebSocketMessage message)
{
_logger.LogDebug("入队出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
await _outgoingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId);
}
public IAsyncEnumerable<WebSocketMessage> ReadIncomingMessagesAsync(CancellationToken cancellationToken)
{
_logger.LogDebug("开始读取入站消息");
return _incomingMessages.Reader.ReadAllAsync(cancellationToken);
}
public IAsyncEnumerable<WebSocketMessage> ReadOutgoingMessagesAsync(CancellationToken cancellationToken)
{
_logger.LogDebug("开始读取出站消息");
return _outgoingMessages.Reader.ReadAllAsync(cancellationToken);
}
public IEnumerable<WebSocketConnection> GetAllConnections()
{
_logger.LogDebug("获取所有连接,当前连接数:{ConnectionCount}", _connections.Count);
return _connections.Values;
}
private async void CheckConnections(object? state)
{
if (_disposed) return;
var now = DateTime.UtcNow;
var inactiveCount = 0;
foreach (var kvp in _connections)
{
var connection = kvp.Value;
if (now - connection.LastActivityTime > _options.ConnectionTimeout)
{
try
{
await RemoveConnectionAsync(kvp.Key);
inactiveCount++;
}
catch (Exception ex)
{
_logger.LogError(ex, "移除不活跃连接时发生错误,连接ID:{ConnectionId}", kvp.Key);
}
}
}
if (inactiveCount > 0)
{
_logger.LogInformation("清理了 {InactiveCount} 个不活跃连接", inactiveCount);
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_heartbeatTimer.Dispose();
_logger.LogInformation("WebSocket 连接管理器已释放");
}
}

137
src/CellularManagement.WebSocket/Connection/WebSocketMessageQueueManager.cs

@ -0,0 +1,137 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// WebSocket 消息队列管理器
/// 负责管理消息的入队和出队,是消息处理系统的核心组件
/// 主要功能:
/// 1. 管理入站消息队列(接收客户端消息)
/// 2. 管理出站消息队列(发送消息到客户端)
/// 3. 提供消息到达事件通知机制
/// 4. 支持异步消息读写操作
/// </summary>
public class WebSocketMessageQueueManager : IWebSocketMessageQueueManager
{
/// <summary>
/// 入站消息通道,用于存储从客户端接收到的消息
/// 使用有界通道,防止内存溢出
/// </summary>
private readonly Channel<WebSocketMessage> _incomingMessages;
/// <summary>
/// 出站消息通道,用于存储待发送到客户端的消息
/// 使用有界通道,防止内存溢出
/// </summary>
private readonly Channel<WebSocketMessage> _outgoingMessages;
/// <summary>
/// 日志记录器
/// </summary>
private readonly ILogger<WebSocketMessageQueueManager> _logger;
/// <summary>
/// WebSocket 配置选项
/// </summary>
private readonly WebSocketOptions _options;
/// <summary>
/// 消息到达事件
/// 当新消息入队时触发,用于通知消息处理器处理新消息
/// </summary>
public event Func<WebSocketMessage, Task>? OnMessageReceived;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志记录器</param>
/// <param name="options">WebSocket 配置选项</param>
public WebSocketMessageQueueManager(
ILogger<WebSocketMessageQueueManager> logger,
IOptions<WebSocketOptions> options)
{
_logger = logger;
_options = options.Value;
// 创建入站消息通道
_incomingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(_options.MessageQueueSize)
{
FullMode = BoundedChannelFullMode.Wait, // 队列满时等待
SingleReader = false, // 允许多个读取器
SingleWriter = false // 允许多个写入器
});
// 创建出站消息通道
_outgoingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(_options.MessageQueueSize)
{
FullMode = BoundedChannelFullMode.Wait, // 队列满时等待
SingleReader = false, // 允许多个读取器
SingleWriter = false // 允许多个写入器
});
_logger.LogInformation("创建消息队列完成,入站队列大小:{IncomingSize},出站队列大小:{OutgoingSize}",
_options.MessageQueueSize, _options.MessageQueueSize);
}
/// <summary>
/// 入队入站消息
/// 将接收到的客户端消息放入入站消息队列,并触发消息到达事件
/// </summary>
/// <param name="message">WebSocket 消息</param>
public async ValueTask QueueIncomingMessage(WebSocketMessage message)
{
_logger.LogDebug("入队入站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
// 将消息写入入站消息通道
await _incomingMessages.Writer.WriteAsync(message);
// 触发消息到达事件
if (OnMessageReceived != null)
{
_logger.LogDebug("触发消息到达事件,连接ID:{ConnectionId}", message.ConnectionId);
await OnMessageReceived(message);
}
}
/// <summary>
/// 入队出站消息
/// 将待发送的消息放入出站消息队列
/// </summary>
/// <param name="message">WebSocket 消息</param>
public async ValueTask QueueOutgoingMessage(WebSocketMessage message)
{
_logger.LogDebug("入队出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
// 将消息写入出站消息通道
await _outgoingMessages.Writer.WriteAsync(message);
}
/// <summary>
/// 读取入站消息
/// 从入站消息队列中异步读取消息
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>入站消息的异步枚举器</returns>
public IAsyncEnumerable<WebSocketMessage> ReadIncomingMessagesAsync(CancellationToken cancellationToken)
{
_logger.LogDebug("开始读取入站消息");
return _incomingMessages.Reader.ReadAllAsync(cancellationToken);
}
/// <summary>
/// 读取出站消息
/// 从出站消息队列中异步读取消息
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>出站消息的异步枚举器</returns>
public IAsyncEnumerable<WebSocketMessage> ReadOutgoingMessagesAsync(CancellationToken cancellationToken)
{
_logger.LogDebug("开始读取出站消息");
return _outgoingMessages.Reader.ReadAllAsync(cancellationToken);
}
}

23
src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs

@ -11,16 +11,29 @@ public static class ServiceCollectionExtensions
{
public static IServiceCollection AddWebSocketServices(this IServiceCollection services, Action<CellularManagement.WebSocket.Models.WebSocketOptions>? configureOptions = null)
{
services.AddSingleton<WebSocketConnectionManager>();
// 注册 WebSocket 消息处理器为 Singleton
services.AddSingleton<IWebSocketMessageHandler, ChatMessageHandler>();
services.AddSingleton<IWebSocketMessageHandler, NotificationMessageHandler>();
services.AddHostedService<WebSocketMessageService>();
// 注册 WebSocket 选项
if (configureOptions != null)
{
services.Configure(configureOptions);
}
// 注册连接管理器
services.AddSingleton<IWebSocketConnectionManager, WebSocketConnectionManager>();
// 注册消息队列管理器
services.AddSingleton<IWebSocketMessageQueueManager, WebSocketMessageQueueManager>();
// 注册连接管理协调器
services.AddSingleton<ConnectionManagerCoordinator>();
// 注册 WebSocket 消息处理器
services.AddSingleton<IWebSocketMessageHandler, ChatMessageHandler>();
services.AddSingleton<IWebSocketMessageHandler, NotificationMessageHandler>();
// 注册后台服务
services.AddHostedService<WebSocketMessageService>();
services.AddHostedService<ConnectionHealthCheckService>();
return services;
}

2
src/CellularManagement.WebSocket/Handlers/WebSocketMessageHandlerAdapter.cs

@ -14,7 +14,7 @@ namespace CellularManagement.WebSocket.Handlers
public WebSocketMessageHandlerAdapter(
IWebSocketMessageHandler handler,
ILogger<WebSocketMessageService> logger)
ILogger<WebSocketMessageHandlerAdapter> logger)
: base(logger)
{
_handler = handler;

9
src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs

@ -13,17 +13,20 @@ namespace CellularManagement.WebSocket.Middleware;
public class WebSocketMiddleware
{
private readonly RequestDelegate _next;
private readonly WebSocketConnectionManager _connectionManager;
private readonly IWebSocketConnectionManager _connectionManager;
private readonly IWebSocketMessageQueueManager _messageQueueManager;
private readonly ILogger<WebSocketMiddleware> _logger;
private readonly int _bufferSize = 1024 * 4;
public WebSocketMiddleware(
RequestDelegate next,
WebSocketConnectionManager connectionManager,
IWebSocketConnectionManager connectionManager,
IWebSocketMessageQueueManager messageQueueManager,
ILogger<WebSocketMiddleware> logger)
{
_next = next;
_connectionManager = connectionManager;
_messageQueueManager = messageQueueManager;
_logger = logger;
_logger.LogInformation("初始化 WebSocket 中间件,缓冲区大小:{BufferSize}字节", _bufferSize);
}
@ -97,7 +100,7 @@ public class WebSocketMiddleware
_logger.LogDebug("准备处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
connectionId, message.MessageType, message.Data.Length);
await _connectionManager.QueueIncomingMessage(message);
await _messageQueueManager.QueueIncomingMessage(message);
_logger.LogDebug("消息已入队,连接ID:{ConnectionId}", connectionId);
receiveResult = await webSocket.ReceiveAsync(

146
src/CellularManagement.WebSocket/Services/ConnectionHealthCheckService.cs

@ -0,0 +1,146 @@
using CellularManagement.WebSocket.Connection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Services;
/// <summary>
/// 连接健康检查服务
/// 负责定期检查连接状态,清理不活跃的连接
/// </summary>
public class ConnectionHealthCheckService : BackgroundService
{
private readonly ConnectionManagerCoordinator _coordinator;
private readonly ILogger<ConnectionHealthCheckService> _logger;
private readonly WebSocketOptions _options;
private readonly TimeSpan _checkInterval;
private readonly TimeSpan _inactivityTimeout;
private bool _disposed;
public ConnectionHealthCheckService(
ConnectionManagerCoordinator coordinator,
ILogger<ConnectionHealthCheckService> logger,
IOptions<WebSocketOptions> options)
{
_coordinator = coordinator;
_logger = logger;
_options = options.Value;
_checkInterval = _options.HeartbeatInterval;
_inactivityTimeout = _options.ConnectionTimeout;
_logger.LogInformation("初始化连接健康检查服务,检查间隔:{CheckInterval}秒,超时时间:{Timeout}秒",
_checkInterval.TotalSeconds, _inactivityTimeout.TotalSeconds);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("连接健康检查服务开始运行");
while (!stoppingToken.IsCancellationRequested)
{
try
{
//await CheckConnectionsAsync();
await Task.Delay(_checkInterval, stoppingToken);
}
catch (OperationCanceledException)
{
_logger.LogInformation("连接健康检查服务正在停止");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "连接健康检查服务发生错误");
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
_logger.LogInformation("连接健康检查服务已停止");
}
private async Task CheckConnectionsAsync()
{
var connectionManager = _coordinator.GetConnectionManager();
var connections = connectionManager.GetAllConnections();
var now = DateTime.UtcNow;
var inactiveConnections = new List<string>();
foreach (var connection in connections)
{
try
{
// 检查连接是否超时
if (now - connection.LastActivityTime > _inactivityTimeout)
{
_logger.LogWarning("连接超时,连接ID:{ConnectionId},最后活动时间:{LastActivityTime}",
connection.Id, connection.LastActivityTime);
inactiveConnections.Add(connection.Id);
continue;
}
// 检查连接是否正在处理
if (_coordinator.IsConnectionProcessing(connection.Id))
{
_logger.LogDebug("连接正在处理中,跳过检查,连接ID:{ConnectionId}", connection.Id);
continue;
}
// 检查连接状态
if (connection.Socket.State != System.Net.WebSockets.WebSocketState.Open)
{
_logger.LogWarning("连接状态异常,连接ID:{ConnectionId},状态:{State}",
connection.Id, connection.Socket.State);
inactiveConnections.Add(connection.Id);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "检查连接时发生错误,连接ID:{ConnectionId}", connection.Id);
inactiveConnections.Add(connection.Id);
}
}
// 清理不活跃的连接
foreach (var connectionId in inactiveConnections)
{
try
{
await connectionManager.RemoveConnectionAsync(connectionId);
_logger.LogInformation("已清理不活跃连接,连接ID:{ConnectionId}", connectionId);
}
catch (Exception ex)
{
_logger.LogError(ex, "清理连接时发生错误,连接ID:{ConnectionId}", connectionId);
}
}
_logger.LogInformation("连接健康检查完成,检查连接数:{TotalConnections},清理连接数:{CleanedConnections}",
connections.Count(), inactiveConnections.Count);
}
public override void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_logger.LogInformation("正在释放连接健康检查服务资源");
try
{
// 执行清理操作
_logger.LogInformation("连接健康检查服务资源已释放");
}
catch (Exception ex)
{
_logger.LogError(ex, "释放连接健康检查服务资源时发生错误");
}
finally
{
base.Dispose();
}
}
}

33
src/CellularManagement.WebSocket/Services/HandlerRegistrar.cs

@ -0,0 +1,33 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using CellularManagement.WebSocket.Handlers;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services;
/// <summary>
/// 负责注册 WebSocket 消息处理器到 HandlerManager
/// </summary>
public class HandlerRegistrar
{
private readonly HandlerManager _handlerManager;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<HandlerRegistrar> _logger;
public HandlerRegistrar(HandlerManager handlerManager, ILoggerFactory loggerFactory)
{
_handlerManager = handlerManager;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<HandlerRegistrar>();
}
public async Task RegisterHandlersAsync(IEnumerable<IWebSocketMessageHandler> messageHandlers)
{
foreach (var handler in messageHandlers)
{
_logger.LogInformation("注册消息处理器,消息类型:{MessageType}", handler.MessageType);
var adapter = new WebSocketMessageHandlerAdapter(handler, _loggerFactory.CreateLogger<WebSocketMessageHandlerAdapter>());
await _handlerManager.RegisterHandlerAsync(handler.MessageType, adapter);
}
}
}

307
src/CellularManagement.WebSocket/Services/IncomingMessageProcessor.cs

@ -0,0 +1,307 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using CellularManagement.WebSocket.Pipeline;
using CellularManagement.WebSocket.Pipeline.Steps;
using Microsoft.Extensions.Logging;
using System.Threading.Channels;
using CellularManagement.WebSocket.Handlers;
namespace CellularManagement.WebSocket.Services;
/// <summary>
/// 入站消息处理器
/// 负责处理从客户端接收到的消息,是消息处理系统的核心组件之一
///
/// 处理流程:
/// 1. 接收消息:通过消息队列管理器接收客户端消息
/// 2. 消息验证:验证消息格式和内容
/// 3. 消息路由:根据消息类型路由到对应的处理器
/// 4. 消息处理:执行具体的业务逻辑
/// 5. 响应处理:将处理结果发送回客户端
///
/// 特点:
/// - 支持并发处理:通过信号量控制并发数量
/// - 消息顺序保证:使用单一读取器确保消息顺序处理
/// - 错误处理:完整的错误处理和恢复机制
/// - 资源管理:自动释放资源,防止内存泄漏
/// </summary>
public class IncomingMessageProcessor : IDisposable
{
/// <summary>
/// 连接管理协调器
/// 用于协调连接状态和消息处理,确保连接状态的一致性
/// </summary>
private readonly ConnectionManagerCoordinator _coordinator;
/// <summary>
/// 消息队列管理器
/// 用于接收客户端消息和发送响应消息
/// </summary>
private readonly IWebSocketMessageQueueManager _messageQueueManager;
/// <summary>
/// 日志记录器
/// 用于记录处理过程中的关键信息和错误
/// </summary>
private readonly ILogger<IncomingMessageProcessor> _logger;
/// <summary>
/// 消息处理管道
/// 包含消息验证和路由等处理步骤,按顺序执行
/// </summary>
private readonly IPipelineStep<WebSocketMessage, WebSocketMessage> _pipeline;
/// <summary>
/// 处理并发控制信号量
/// 限制同时处理的消息数量,防止系统过载
/// </summary>
private readonly SemaphoreSlim _processingSemaphore;
/// <summary>
/// 最大并发处理数
/// 同时处理的最大消息数量,超过此数量将等待
/// </summary>
private readonly int _maxConcurrentProcesses;
/// <summary>
/// 消息通道
/// 用于存储待处理的消息,确保消息按顺序处理
/// 使用单一读取器模式,保证消息处理的顺序性
/// </summary>
private readonly Channel<WebSocketMessage> _messageChannel;
/// <summary>
/// 停止令牌源
/// 用于控制处理器的优雅停止
/// </summary>
private readonly CancellationTokenSource _stoppingCts;
/// <summary>
/// 消息处理器管理器
/// 管理和分发消息到对应的业务处理器
/// </summary>
private readonly HandlerManager _handlerManager;
/// <summary>
/// 日志工厂
/// 用于创建其他组件的日志记录器
/// </summary>
private readonly ILoggerFactory _loggerFactory;
/// <summary>
/// 资源释放标志
/// 防止重复释放资源
/// </summary>
private bool _disposed;
/// <summary>
/// 构造函数
/// 初始化入站消息处理器及其依赖组件
/// </summary>
/// <param name="coordinator">连接管理协调器,用于协调连接状态</param>
/// <param name="messageQueueManager">消息队列管理器,用于消息收发</param>
/// <param name="logger">日志记录器,用于记录处理过程</param>
/// <param name="loggerFactory">日志工厂,用于创建其他组件的日志记录器</param>
/// <param name="handlerManager">消息处理器管理器,用于管理和分发消息</param>
/// <param name="maxConcurrentProcesses">最大并发处理数,默认10</param>
public IncomingMessageProcessor(
ConnectionManagerCoordinator coordinator,
IWebSocketMessageQueueManager messageQueueManager,
ILogger<IncomingMessageProcessor> logger,
ILoggerFactory loggerFactory,
HandlerManager handlerManager,
int maxConcurrentProcesses = 10)
{
_coordinator = coordinator;
_messageQueueManager = messageQueueManager;
_logger = logger;
_loggerFactory = loggerFactory;
_maxConcurrentProcesses = maxConcurrentProcesses;
_processingSemaphore = new SemaphoreSlim(_maxConcurrentProcesses);
_messageChannel = Channel.CreateUnbounded<WebSocketMessage>(new UnboundedChannelOptions
{
SingleReader = true, // 单一读取器,确保消息顺序处理
SingleWriter = false // 允许多个写入器,支持并发写入
});
_stoppingCts = new CancellationTokenSource();
_handlerManager = handlerManager;
// 构建消息处理管道
var pipelineBuilder = new PipelineBuilder<WebSocketMessage, WebSocketMessage>(loggerFactory);
_pipeline = pipelineBuilder
.AddStep(new MessageValidationStep(loggerFactory.CreateLogger<MessageValidationStep>())) // 添加消息验证步骤
.AddStep(new MessageRoutingStep(loggerFactory.CreateLogger<MessageRoutingStep>(), _handlerManager)) // 添加消息路由步骤
.Build();
// 订阅消息到达事件
_messageQueueManager.OnMessageReceived += OnMessageReceived;
_logger.LogInformation("初始化入站消息处理器,最大并发处理数:{MaxConcurrentProcesses}", _maxConcurrentProcesses);
}
/// <summary>
/// 消息到达事件处理
/// 当新消息到达时,将其写入消息通道等待处理
/// </summary>
/// <param name="message">接收到的 WebSocket 消息</param>
/// <returns>处理任务</returns>
private Task OnMessageReceived(WebSocketMessage message)
{
try
{
if (!_messageChannel.Writer.TryWrite(message))
{
_logger.LogWarning("消息通道已满,无法处理新消息,连接ID:{ConnectionId}", message.ConnectionId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
}
return Task.CompletedTask;
}
/// <summary>
/// 处理入站消息
/// 从消息通道中读取消息并进行处理
/// </summary>
/// <param name="stoppingToken">停止令牌,用于控制处理器的停止</param>
/// <returns>处理任务</returns>
public async Task ProcessIncomingMessagesAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("入站消息处理服务开始运行");
try
{
await foreach (var message in _messageChannel.Reader.ReadAllAsync(stoppingToken))
{
await ProcessMessageAsync(message);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("入站消息处理服务正在停止");
}
catch (Exception ex)
{
_logger.LogError(ex, "入站消息处理服务发生错误");
}
finally
{
_logger.LogInformation("入站消息处理服务已停止");
}
}
/// <summary>
/// 处理单个消息
/// 包括消息验证、路由和处理
/// </summary>
/// <param name="message">待处理的 WebSocket 消息</param>
/// <returns>处理任务</returns>
private async Task ProcessMessageAsync(WebSocketMessage message)
{
_logger.LogDebug("开始处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
// 标记连接正在处理
if (!await _coordinator.BeginConnectionProcessingAsync(message.ConnectionId))
{
_logger.LogWarning("连接正在处理中,跳过消息处理,连接ID:{ConnectionId}", message.ConnectionId);
return;
}
await _processingSemaphore.WaitAsync();
try
{
// 通过管道处理消息
var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, _stoppingCts.Token);
_logger.LogDebug("消息处理完成,连接ID:{ConnectionId},处理结果:{Processed}",
message.ConnectionId, processedMessage != null);
if (processedMessage != null)
{
// 将处理后的消息入队
await _messageQueueManager.QueueOutgoingMessage(processedMessage);
_logger.LogDebug("处理后的消息已入队,连接ID:{ConnectionId}", message.ConnectionId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
await HandleProcessingError(message, ex);
}
finally
{
_processingSemaphore.Release();
await _coordinator.EndConnectionProcessingAsync(message.ConnectionId);
_logger.LogDebug("消息处理完成,释放信号量,连接ID:{ConnectionId}", message.ConnectionId);
}
}
/// <summary>
/// 处理错误
/// 当消息处理发生错误时,生成错误响应并发送给客户端
/// </summary>
/// <param name="message">原始消息</param>
/// <param name="exception">异常信息</param>
/// <returns>处理任务</returns>
private async Task HandleProcessingError(WebSocketMessage message, Exception exception)
{
_logger.LogError(exception, "处理消息错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
message.ConnectionId, exception.Message);
try
{
// 创建错误响应消息
var errorResponse = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "error",
error = exception.Message
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
// 将错误响应入队
await _messageQueueManager.QueueOutgoingMessage(errorResponse);
_logger.LogDebug("错误响应已发送,连接ID:{ConnectionId}", message.ConnectionId);
}
catch (Exception ex)
{
_logger.LogError(ex, "发送错误响应时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
}
}
/// <summary>
/// 释放资源
/// 释放所有占用的资源,包括信号量、取消令牌等
/// </summary>
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_logger.LogInformation("正在释放入站消息处理器资源");
try
{
// 取消订阅事件
_messageQueueManager.OnMessageReceived -= OnMessageReceived;
// 停止所有正在进行的操作
_stoppingCts.Cancel();
// 释放资源
_processingSemaphore.Dispose();
_stoppingCts.Dispose();
_logger.LogInformation("入站消息处理器资源已释放");
}
catch (Exception ex)
{
_logger.LogError(ex, "释放入站消息处理器资源时发生错误");
}
}
}

159
src/CellularManagement.WebSocket/Services/OutgoingMessageProcessor.cs

@ -9,22 +9,83 @@ namespace CellularManagement.WebSocket.Services;
/// <summary>
/// 出站消息处理器
/// 负责处理消息的发送、重试、超时控制和监控
/// 负责处理消息的发送、重试、超时控制和监控,是消息处理系统的核心组件之一
/// 主要功能:
/// 1. 消息发送和重试
/// 2. 并发控制和限流
/// 3. 性能监控和统计
/// 4. 错误处理和恢复
/// </summary>
public class OutgoingMessageProcessor : IDisposable
{
private readonly WebSocketConnectionManager _connectionManager;
/// <summary>
/// 连接管理器
/// 负责管理 WebSocket 连接
/// </summary>
private readonly IWebSocketConnectionManager _connectionManager;
/// <summary>
/// 消息队列管理器
/// 负责消息的入队和出队操作
/// </summary>
private readonly IWebSocketMessageQueueManager _messageQueueManager;
/// <summary>
/// 日志记录器
/// </summary>
private readonly ILogger<OutgoingMessageProcessor> _logger;
/// <summary>
/// 消息统计信息
/// 用于记录每个连接的消息处理情况
/// </summary>
private readonly ConcurrentDictionary<string, MessageStats> _messageStats;
/// <summary>
/// 背压控制信号量
/// 用于限制同时发送的消息数量
/// </summary>
private readonly SemaphoreSlim _backpressureSemaphore;
/// <summary>
/// 最大并发发送数
/// </summary>
private readonly int _maxConcurrentSends;
/// <summary>
/// 最大重试次数
/// </summary>
private readonly int _maxRetryAttempts;
/// <summary>
/// 发送超时时间
/// </summary>
private readonly TimeSpan _sendTimeout;
/// <summary>
/// 重试延迟时间
/// </summary>
private readonly TimeSpan _retryDelay;
/// <summary>
/// 停止令牌源
/// 用于控制处理器的停止
/// </summary>
private readonly CancellationTokenSource _stoppingCts;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="connectionManager">连接管理器</param>
/// <param name="messageQueueManager">消息队列管理器</param>
/// <param name="logger">日志记录器</param>
/// <param name="maxConcurrentSends">最大并发发送数,默认100</param>
/// <param name="maxRetryAttempts">最大重试次数,默认3</param>
/// <param name="sendTimeoutSeconds">发送超时时间(秒),默认30</param>
/// <param name="retryDelaySeconds">重试延迟时间(秒),默认5</param>
public OutgoingMessageProcessor(
WebSocketConnectionManager connectionManager,
IWebSocketConnectionManager connectionManager,
IWebSocketMessageQueueManager messageQueueManager,
ILogger<OutgoingMessageProcessor> logger,
int maxConcurrentSends = 100,
int maxRetryAttempts = 3,
@ -32,6 +93,7 @@ public class OutgoingMessageProcessor : IDisposable
int retryDelaySeconds = 5)
{
_connectionManager = connectionManager;
_messageQueueManager = messageQueueManager;
_logger = logger;
_maxConcurrentSends = maxConcurrentSends;
_maxRetryAttempts = maxRetryAttempts;
@ -44,12 +106,14 @@ public class OutgoingMessageProcessor : IDisposable
/// <summary>
/// 处理出站消息
/// 从消息队列中读取消息并发送
/// </summary>
/// <param name="stoppingToken">停止令牌</param>
public async Task ProcessOutgoingMessagesAsync(CancellationToken stoppingToken)
{
try
{
await foreach (var message in _connectionManager.ReadOutgoingMessagesAsync(stoppingToken))
await foreach (var message in _messageQueueManager.ReadOutgoingMessagesAsync(stoppingToken))
{
await ProcessMessageAsync(message, stoppingToken);
}
@ -60,18 +124,27 @@ public class OutgoingMessageProcessor : IDisposable
}
}
/// <summary>
/// 处理单个消息
/// 包括消息发送、重试和统计
/// </summary>
/// <param name="message">WebSocket 消息</param>
/// <param name="stoppingToken">停止令牌</param>
private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken stoppingToken)
{
await _backpressureSemaphore.WaitAsync(stoppingToken);
try
{
// 获取或创建消息统计信息
var stats = _messageStats.GetOrAdd(message.ConnectionId, _ => new MessageStats());
stats.TotalMessages++;
// 记录发送开始时间
var stopwatch = Stopwatch.StartNew();
var success = await SendMessageWithRetryAsync(message, stoppingToken);
stopwatch.Stop();
// 更新统计信息
if (success)
{
stats.SuccessfulMessages++;
@ -88,49 +161,41 @@ public class OutgoingMessageProcessor : IDisposable
}
}
/// <summary>
/// 发送消息并重试
/// 在发送失败时进行重试
/// </summary>
/// <param name="message">WebSocket 消息</param>
/// <param name="stoppingToken">停止令牌</param>
/// <returns>发送是否成功</returns>
private async Task<bool> SendMessageWithRetryAsync(WebSocketMessage message, CancellationToken stoppingToken)
{
for (int attempt = 1; attempt <= _maxRetryAttempts; attempt++)
var attempts = 0;
while (attempts < _maxRetryAttempts)
{
try
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, _stoppingCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
cts.CancelAfter(_sendTimeout);
var connection = _connectionManager.GetConnection(message.ConnectionId);
if (connection?.Socket.State != System.Net.WebSockets.WebSocketState.Open)
{
_logger.LogWarning("连接不存在或已关闭,连接ID:{ConnectionId}", message.ConnectionId);
return false;
}
await connection.Socket.SendAsync(
new ArraySegment<byte>(message.Data),
message.MessageType,
true,
cts.Token);
_logger.LogDebug("消息发送成功,连接ID:{ConnectionId},尝试次数:{Attempt}",
message.ConnectionId, attempt);
// 发送消息
await _connectionManager.SendMessageAsync(message.ConnectionId, message.Data, message.MessageType, cts.Token);
return true;
}
catch (OperationCanceledException)
{
_logger.LogWarning("消息发送超时,连接ID:{ConnectionId},尝试次数:{Attempt}",
message.ConnectionId, attempt);
}
catch (Exception ex)
{
_logger.LogError(ex, "发送消息时发生错误,连接ID:{ConnectionId},尝试次数:{Attempt}",
message.ConnectionId, attempt);
}
attempts++;
_logger.LogWarning(ex, "发送消息失败,连接ID:{ConnectionId},重试次数:{Attempts}/{MaxRetries}",
message.ConnectionId, attempts, _maxRetryAttempts);
if (attempt < _maxRetryAttempts)
{
await Task.Delay(_retryDelay, stoppingToken);
if (attempts < _maxRetryAttempts)
{
await Task.Delay(_retryDelay, stoppingToken);
}
}
}
_logger.LogError("发送消息失败,已达到最大重试次数,连接ID:{ConnectionId}", message.ConnectionId);
return false;
}
@ -142,6 +207,9 @@ public class OutgoingMessageProcessor : IDisposable
return _messageStats;
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
_stoppingCts.Cancel();
@ -150,3 +218,30 @@ public class OutgoingMessageProcessor : IDisposable
}
}
/// <summary>
/// 消息统计信息
/// 记录消息处理的统计数据
/// </summary>
public class MessageStats
{
/// <summary>
/// 总消息数
/// </summary>
public int TotalMessages { get; set; }
/// <summary>
/// 成功发送的消息数
/// </summary>
public int SuccessfulMessages { get; set; }
/// <summary>
/// 发送失败的消息数
/// </summary>
public int FailedMessages { get; set; }
/// <summary>
/// 平均发送时间(毫秒)
/// </summary>
public double AverageSendTime { get; set; }
}

244
src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

@ -12,104 +12,120 @@ namespace CellularManagement.WebSocket.Services;
/// <summary>
/// WebSocket 消息服务
/// 负责处理 WebSocket 消息的接收、处理和发送
/// 负责协调入站和出站消息的处理,是消息处理系统的核心组件
/// 主要功能:
/// 1. 协调入站和出站消息的处理
/// 2. 管理消息处理器的生命周期
/// 3. 提供消息处理的统一入口
/// 4. 处理服务的启动和停止
/// </summary>
public class WebSocketMessageService : BackgroundService
{
private readonly WebSocketConnectionManager _connectionManager;
/// <summary>
/// 连接管理协调器
/// 负责协调连接管理和消息处理
/// </summary>
private readonly ConnectionManagerCoordinator _coordinator;
/// <summary>
/// 消息队列管理器
/// 负责消息的入队和出队操作
/// </summary>
private readonly IWebSocketMessageQueueManager _messageQueueManager;
/// <summary>
/// 日志记录器
/// </summary>
private readonly ILogger<WebSocketMessageService> _logger;
private readonly IPipelineStep<WebSocketMessage, WebSocketMessage> _pipeline;
private readonly MessageRoutingStep _routingStep;
private readonly HandlerManager _handlerManager;
private readonly SemaphoreSlim _processingSemaphore;
private readonly int _maxConcurrentProcesses = 10;
private readonly IEnumerable<IWebSocketMessageHandler> _messageHandlers;
private readonly Channel<WebSocketMessage> _messageChannel;
private readonly CancellationTokenSource _stoppingCts;
/// <summary>
/// 入站消息处理器
/// 负责处理接收到的消息
/// </summary>
private readonly IncomingMessageProcessor _incomingMessageProcessor;
/// <summary>
/// 出站消息处理器
/// 负责处理待发送的消息
/// </summary>
private readonly OutgoingMessageProcessor _outgoingMessageProcessor;
/// <summary>
/// 停止任务完成源
/// 用于控制服务的停止
/// </summary>
private readonly TaskCompletionSource _stopTcs;
/// <summary>
/// 资源释放标志
/// </summary>
private bool _disposed;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="coordinator">连接管理协调器</param>
/// <param name="messageQueueManager">消息队列管理器</param>
/// <param name="logger">日志记录器</param>
/// <param name="loggerFactory">日志工厂</param>
/// <param name="messageHandlers">消息处理器集合</param>
public WebSocketMessageService(
WebSocketConnectionManager connectionManager,
ConnectionManagerCoordinator coordinator,
IWebSocketMessageQueueManager messageQueueManager,
ILogger<WebSocketMessageService> logger,
ILoggerFactory loggerFactory,
IEnumerable<IWebSocketMessageHandler> messageHandlers)
{
_connectionManager = connectionManager;
_coordinator = coordinator;
_messageQueueManager = messageQueueManager;
_logger = logger;
_messageHandlers = messageHandlers;
_stopTcs = new TaskCompletionSource();
_logger.LogInformation("初始化 WebSocket 消息服务");
_handlerManager = new HandlerManager(loggerFactory.CreateLogger<HandlerManager>());
_routingStep = new MessageRoutingStep(loggerFactory.CreateLogger<MessageRoutingStep>(), _handlerManager);
_processingSemaphore = new SemaphoreSlim(_maxConcurrentProcesses);
_messageChannel = Channel.CreateUnbounded<WebSocketMessage>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false
});
_stoppingCts = new CancellationTokenSource();
_outgoingMessageProcessor = new OutgoingMessageProcessor(
connectionManager,
loggerFactory.CreateLogger<OutgoingMessageProcessor>());
_logger.LogInformation("创建消息处理信号量,最大并发数:{MaxConcurrentProcesses}", _maxConcurrentProcesses);
// 构建处理管道
var pipelineBuilder = new PipelineBuilder<WebSocketMessage, WebSocketMessage>(loggerFactory);
_pipeline = pipelineBuilder
.AddStep(new MessageValidationStep(loggerFactory.CreateLogger<MessageValidationStep>()))
.AddStep(_routingStep)
.Build();
_logger.LogInformation("消息处理管道构建完成");
// 创建 HandlerManager
var handlerManager = new HandlerManager(loggerFactory.CreateLogger<HandlerManager>());
// 注册消息处理器
RegisterMessageHandlers();
// 订阅消息到达事件
_connectionManager.OnMessageReceived += OnMessageReceived;
_logger.LogInformation("已订阅消息到达事件");
}
private Task OnMessageReceived(WebSocketMessage message)
{
try
{
if (!_messageChannel.Writer.TryWrite(message))
{
_logger.LogWarning("消息通道已满,无法处理新消息,连接ID:{ConnectionId}", message.ConnectionId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
}
return Task.CompletedTask;
}
var handlerRegistrar = new HandlerRegistrar(handlerManager, loggerFactory);
_ = handlerRegistrar.RegisterHandlersAsync(messageHandlers);
// 创建入站消息处理器
_incomingMessageProcessor = new IncomingMessageProcessor(
coordinator,
messageQueueManager,
loggerFactory.CreateLogger<IncomingMessageProcessor>(),
loggerFactory,
handlerManager);
// 创建出站消息处理器
_outgoingMessageProcessor = new OutgoingMessageProcessor(
coordinator.GetConnectionManager(),
messageQueueManager,
loggerFactory.CreateLogger<OutgoingMessageProcessor>());
private async void RegisterMessageHandlers()
{
foreach (var handler in _messageHandlers)
{
_logger.LogInformation("注册消息处理器,消息类型:{MessageType}", handler.MessageType);
var adapter = new WebSocketMessageHandlerAdapter(handler, _logger);
await _handlerManager.RegisterHandlerAsync(handler.MessageType, adapter);
}
_logger.LogInformation("WebSocket 消息服务初始化完成");
}
/// <summary>
/// 执行后台服务
/// 负责处理消息的接收和处理
/// 负责协调入站和出站消息的处理
/// </summary>
/// <param name="stoppingToken">停止令牌</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("WebSocket 消息服务开始运行");
try
{
// 处理入站消息
await ProcessIncomingMessagesAsync(stoppingToken);
// 启动入站和出站消息处理任务
var incomingTask = _incomingMessageProcessor.ProcessIncomingMessagesAsync(stoppingToken);
var outgoingTask = _outgoingMessageProcessor.ProcessOutgoingMessagesAsync(stoppingToken);
// 处理出站消息
await _outgoingMessageProcessor.ProcessOutgoingMessagesAsync(stoppingToken);
// 注册停止令牌的回调
stoppingToken.Register(() => _stopTcs.TrySetResult());
// 等待停止信号
await _stopTcs.Task;
}
catch (OperationCanceledException)
{
@ -125,96 +141,32 @@ public class WebSocketMessageService : BackgroundService
}
}
private async Task ProcessIncomingMessagesAsync(CancellationToken stoppingToken)
/// <summary>
/// 释放资源
/// </summary>
public override void Dispose()
{
try
if (_disposed)
{
await foreach (var message in _messageChannel.Reader.ReadAllAsync(stoppingToken))
{
await ProcessMessageAsync(message);
}
return;
}
catch (OperationCanceledException)
{
_logger.LogInformation("入站消息处理已停止");
}
}
/// <summary>
/// 处理消息
/// </summary>
private async Task ProcessMessageAsync(WebSocketMessage message)
{
_logger.LogDebug("开始处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
_disposed = true;
_logger.LogInformation("正在释放 WebSocket 消息服务资源");
await _processingSemaphore.WaitAsync();
try
{
var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, _stoppingCts.Token);
_logger.LogDebug("消息处理完成,连接ID:{ConnectionId},处理结果:{Processed}",
message.ConnectionId, processedMessage != null);
if (processedMessage != null)
{
await _connectionManager.QueueOutgoingMessage(processedMessage);
_logger.LogDebug("处理后的消息已入队,连接ID:{ConnectionId}", message.ConnectionId);
}
_incomingMessageProcessor.Dispose();
_outgoingMessageProcessor.Dispose();
_logger.LogInformation("WebSocket 消息服务资源已释放");
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
await HandleProcessingError(message, ex);
_logger.LogError(ex, "释放 WebSocket 消息服务资源时发生错误");
}
finally
{
_processingSemaphore.Release();
_logger.LogDebug("消息处理完成,释放信号量,连接ID:{ConnectionId}", message.ConnectionId);
base.Dispose();
}
}
/// <summary>
/// 处理错误
/// </summary>
private async Task HandleProcessingError(WebSocketMessage message, Exception exception)
{
_logger.LogError(exception, "处理消息错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
message.ConnectionId, exception.Message);
try
{
var errorResponse = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "error",
error = exception.Message
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
await _connectionManager.QueueOutgoingMessage(errorResponse);
_logger.LogDebug("错误响应已发送,连接ID:{ConnectionId}", message.ConnectionId);
}
catch (Exception ex)
{
_logger.LogError(ex, "发送错误响应时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
}
}
public override void Dispose()
{
_logger.LogInformation("正在释放 WebSocket 消息服务资源");
// 取消订阅事件
_connectionManager.OnMessageReceived -= OnMessageReceived;
// 停止所有正在进行的操作
_stoppingCts.Cancel();
// 释放资源
_processingSemaphore.Dispose();
_stoppingCts.Dispose();
_outgoingMessageProcessor.Dispose();
base.Dispose();
_logger.LogInformation("WebSocket 消息服务资源已释放");
}
}
Loading…
Cancel
Save