Browse Source

修复 PipelineBuilder 中的类型转换和 Logger 问题

ws
hyh 3 months ago
parent
commit
da9345dbad
  1. 127
      src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs
  2. 48
      src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
  3. 21
      src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs
  4. 121
      src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs
  5. 75
      src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs
  6. 92
      src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

127
src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs

@ -5,23 +5,37 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Connection; namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// WebSocket 连接管理器
/// 负责管理所有 WebSocket 连接,处理消息的入队和出队
/// </summary>
public class WebSocketConnectionManager : IDisposable public class WebSocketConnectionManager : IDisposable
{ {
// 存储所有活动的 WebSocket 连接
private readonly ConcurrentDictionary<string, WebSocketConnection> _connections = new(); private readonly ConcurrentDictionary<string, WebSocketConnection> _connections = new();
// 入站消息队列
private readonly Channel<WebSocketMessage> _incomingMessages; private readonly Channel<WebSocketMessage> _incomingMessages;
// 出站消息队列
private readonly Channel<WebSocketMessage> _outgoingMessages; private readonly Channel<WebSocketMessage> _outgoingMessages;
private readonly ILogger<WebSocketConnectionManager> _logger; private readonly ILogger<WebSocketConnectionManager> _logger;
// 心跳检测定时器
private readonly Timer _heartbeatTimer; private readonly Timer _heartbeatTimer;
private readonly TimeSpan _heartbeatInterval = TimeSpan.FromSeconds(30); private readonly TimeSpan _heartbeatInterval = TimeSpan.FromSeconds(30);
// 消息到达事件
public event Func<WebSocketMessage, Task>? OnMessageReceived;
public WebSocketConnectionManager(ILogger<WebSocketConnectionManager> logger) public WebSocketConnectionManager(ILogger<WebSocketConnectionManager> logger)
{ {
_logger = logger; _logger = logger;
_logger.LogInformation("初始化 WebSocket 连接管理器");
// 创建有界通道,限制最大消息数量为 10000
_incomingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(10000) _incomingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(10000)
{ {
FullMode = BoundedChannelFullMode.Wait, FullMode = BoundedChannelFullMode.Wait, // 当队列满时等待
SingleReader = false, SingleReader = false, // 允许多个读取者
SingleWriter = false SingleWriter = false // 允许多个写入者
}); });
_outgoingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(10000) _outgoingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(10000)
@ -31,9 +45,17 @@ public class WebSocketConnectionManager : IDisposable
SingleWriter = false SingleWriter = false
}); });
_logger.LogInformation("创建消息队列完成,入站队列大小:{IncomingSize},出站队列大小:{OutgoingSize}",
10000, 10000);
// 启动心跳检测定时器
_heartbeatTimer = new Timer(CheckConnections, null, _heartbeatInterval, _heartbeatInterval); _heartbeatTimer = new Timer(CheckConnections, null, _heartbeatInterval, _heartbeatInterval);
_logger.LogInformation("心跳检测定时器已启动,间隔:{Interval}秒", _heartbeatInterval.TotalSeconds);
} }
/// <summary>
/// 添加新的 WebSocket 连接
/// </summary>
public string AddConnection(System.Net.WebSockets.WebSocket socket) public string AddConnection(System.Net.WebSockets.WebSocket socket)
{ {
var connectionId = Guid.NewGuid().ToString(); var connectionId = Guid.NewGuid().ToString();
@ -45,91 +67,155 @@ public class WebSocketConnectionManager : IDisposable
}; };
_connections.TryAdd(connectionId, connection); _connections.TryAdd(connectionId, connection);
_logger.LogInformation("New connection added: {ConnectionId}", connectionId); _logger.LogInformation("添加新连接成功,连接ID:{ConnectionId},当前连接数:{ConnectionCount}",
connectionId, _connections.Count);
return connectionId; return connectionId;
} }
/// <summary>
/// 移除 WebSocket 连接
/// </summary>
public bool RemoveConnection(string connectionId) public bool RemoveConnection(string connectionId)
{ {
if (_connections.TryRemove(connectionId, out var connection)) if (_connections.TryRemove(connectionId, out var connection))
{ {
_logger.LogInformation("Connection removed: {ConnectionId}", connectionId); _logger.LogInformation("移除连接成功,连接ID:{ConnectionId},当前连接数:{ConnectionCount}",
connectionId, _connections.Count);
return true; return true;
} }
_logger.LogWarning("移除连接失败,连接ID:{ConnectionId} 不存在", connectionId);
return false; return false;
} }
/// <summary>
/// 获取指定连接
/// </summary>
public WebSocketConnection? GetConnection(string connectionId) public WebSocketConnection? GetConnection(string connectionId)
{ {
_connections.TryGetValue(connectionId, out var connection); _connections.TryGetValue(connectionId, out var connection);
if (connection == null)
{
_logger.LogWarning("获取连接失败,连接ID:{ConnectionId} 不存在", connectionId);
}
return connection; return connection;
} }
/// <summary>
/// 更新连接活动时间
/// </summary>
public void UpdateConnectionActivity(string connectionId) public void UpdateConnectionActivity(string connectionId)
{ {
if (_connections.TryGetValue(connectionId, out var connection)) if (_connections.TryGetValue(connectionId, out var connection))
{ {
connection.LastActivityTime = DateTime.UtcNow; connection.LastActivityTime = DateTime.UtcNow;
_logger.LogDebug("更新连接活动时间,连接ID:{ConnectionId}", connectionId);
} }
} }
/// <summary>
/// 入队入站消息
/// </summary>
public async ValueTask QueueIncomingMessage(WebSocketMessage message) public async ValueTask QueueIncomingMessage(WebSocketMessage message)
{ {
_logger.LogDebug("入队入站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
await _incomingMessages.Writer.WriteAsync(message); await _incomingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId); UpdateConnectionActivity(message.ConnectionId);
// 触发消息到达事件
if (OnMessageReceived != null)
{
_logger.LogDebug("触发消息到达事件,连接ID:{ConnectionId}", message.ConnectionId);
await OnMessageReceived(message);
}
} }
/// <summary>
/// 入队出站消息
/// </summary>
public async ValueTask QueueOutgoingMessage(WebSocketMessage message) public async ValueTask QueueOutgoingMessage(WebSocketMessage message)
{ {
_logger.LogDebug("入队出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
await _outgoingMessages.Writer.WriteAsync(message); await _outgoingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId); UpdateConnectionActivity(message.ConnectionId);
} }
/// <summary>
/// 读取入站消息
/// </summary>
public IAsyncEnumerable<WebSocketMessage> ReadIncomingMessagesAsync(CancellationToken cancellationToken) public IAsyncEnumerable<WebSocketMessage> ReadIncomingMessagesAsync(CancellationToken cancellationToken)
{ {
_logger.LogDebug("开始读取入站消息");
return _incomingMessages.Reader.ReadAllAsync(cancellationToken); return _incomingMessages.Reader.ReadAllAsync(cancellationToken);
} }
/// <summary>
/// 读取出站消息
/// </summary>
public IAsyncEnumerable<WebSocketMessage> ReadOutgoingMessagesAsync(CancellationToken cancellationToken) public IAsyncEnumerable<WebSocketMessage> ReadOutgoingMessagesAsync(CancellationToken cancellationToken)
{ {
_logger.LogDebug("开始读取出站消息");
return _outgoingMessages.Reader.ReadAllAsync(cancellationToken); return _outgoingMessages.Reader.ReadAllAsync(cancellationToken);
} }
/// <summary>
/// 获取所有连接
/// </summary>
public IEnumerable<WebSocketConnection> GetAllConnections() public IEnumerable<WebSocketConnection> GetAllConnections()
{ {
_logger.LogDebug("获取所有连接,当前连接数:{ConnectionCount}", _connections.Count);
return _connections.Values; return _connections.Values;
} }
/// <summary>
/// 检查连接状态
/// </summary>
private void CheckConnections(object? state) private void CheckConnections(object? state)
{ {
var now = DateTime.UtcNow; var now = DateTime.UtcNow;
var inactiveThreshold = TimeSpan.FromMinutes(1); var inactiveThreshold = TimeSpan.FromMinutes(1);
int inactiveCount = 0;
foreach (var (connectionId, connection) in _connections) foreach (var (connectionId, connection) in _connections)
{ {
if (connection.Status == ConnectionStatus.Connected && if (connection.Status == ConnectionStatus.Connected &&
now - connection.LastActivityTime > inactiveThreshold) now - connection.LastActivityTime > inactiveThreshold)
{ {
_logger.LogWarning("Connection {ConnectionId} is inactive, closing", connectionId); _logger.LogWarning("检测到不活跃连接,连接ID:{ConnectionId},最后活动时间:{LastActivityTime}",
connectionId, connection.LastActivityTime);
inactiveCount++;
_ = CloseConnectionAsync(connectionId); _ = CloseConnectionAsync(connectionId);
} }
} }
if (inactiveCount > 0)
{
_logger.LogInformation("心跳检测完成,发现 {InactiveCount} 个不活跃连接", inactiveCount);
}
} }
/// <summary>
/// 关闭连接
/// </summary>
private async Task CloseConnectionAsync(string connectionId) private async Task CloseConnectionAsync(string connectionId)
{ {
if (_connections.TryGetValue(connectionId, out var connection)) if (_connections.TryGetValue(connectionId, out var connection))
{ {
try try
{ {
_logger.LogInformation("正在关闭连接,连接ID:{ConnectionId}", connectionId);
await connection.Socket.CloseAsync( await connection.Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure, WebSocketCloseStatus.NormalClosure,
"Connection timeout", "Connection timeout",
CancellationToken.None); CancellationToken.None);
_logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error closing connection {ConnectionId}", connectionId); _logger.LogError(ex, "关闭连接时发生错误,连接ID:{ConnectionId}", connectionId);
} }
finally finally
{ {
@ -140,10 +226,15 @@ public class WebSocketConnectionManager : IDisposable
public void Dispose() public void Dispose()
{ {
_logger.LogInformation("正在释放 WebSocket 连接管理器资源");
_heartbeatTimer.Dispose(); _heartbeatTimer.Dispose();
_logger.LogInformation("WebSocket 连接管理器资源已释放");
} }
} }
/// <summary>
/// WebSocket 连接信息
/// </summary>
public class WebSocketConnection public class WebSocketConnection
{ {
public System.Net.WebSockets.WebSocket Socket { get; set; } = null!; public System.Net.WebSockets.WebSocket Socket { get; set; } = null!;
@ -151,18 +242,24 @@ public class WebSocketConnection
public ConnectionStatus Status { get; set; } public ConnectionStatus Status { get; set; }
} }
/// <summary>
/// 连接状态枚举
/// </summary>
public enum ConnectionStatus public enum ConnectionStatus
{ {
Connected, Connected, // 已连接
Disconnected, Disconnected, // 已断开
Error Error // 错误状态
} }
/// <summary>
/// WebSocket 消息记录
/// </summary>
public record WebSocketMessage public record WebSocketMessage
{ {
public string ConnectionId { get; init; } = string.Empty; public string ConnectionId { get; init; } = string.Empty; // 连接ID
public byte[] Data { get; init; } = Array.Empty<byte>(); public byte[] Data { get; init; } = Array.Empty<byte>(); // 消息数据
public WebSocketMessageType MessageType { get; init; } public WebSocketMessageType MessageType { get; init; } // 消息类型
public DateTime Timestamp { get; init; } = DateTime.UtcNow; public DateTime Timestamp { get; init; } = DateTime.UtcNow; // 时间戳
public int Priority { get; init; } = 0; public int Priority { get; init; } = 0; // 优先级
} }

48
src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs

@ -5,6 +5,10 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Middleware; namespace CellularManagement.WebSocket.Middleware;
/// <summary>
/// WebSocket 中间件
/// 负责处理 WebSocket 连接的建立和消息的接收
/// </summary>
public class WebSocketMiddleware public class WebSocketMiddleware
{ {
private readonly RequestDelegate _next; private readonly RequestDelegate _next;
@ -20,14 +24,21 @@ public class WebSocketMiddleware
_next = next; _next = next;
_connectionManager = connectionManager; _connectionManager = connectionManager;
_logger = logger; _logger = logger;
_logger.LogInformation("初始化 WebSocket 中间件,缓冲区大小:{BufferSize}字节", _bufferSize);
} }
/// <summary>
/// 处理 HTTP 请求
/// </summary>
public async Task InvokeAsync(HttpContext context) public async Task InvokeAsync(HttpContext context)
{ {
if (context.WebSockets.IsWebSocketRequest) if (context.WebSockets.IsWebSocketRequest)
{ {
_logger.LogInformation("收到 WebSocket 连接请求,路径:{Path}", context.Request.Path);
var webSocket = await context.WebSockets.AcceptWebSocketAsync(); var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var connectionId = _connectionManager.AddConnection(webSocket); var connectionId = _connectionManager.AddConnection(webSocket);
_logger.LogInformation("WebSocket 连接已接受,连接ID:{ConnectionId}", connectionId);
try try
{ {
@ -35,30 +46,41 @@ public class WebSocketMiddleware
} }
catch (WebSocketException ex) catch (WebSocketException ex)
{ {
_logger.LogError(ex, "WebSocket error for connection {ConnectionId}", connectionId); _logger.LogError(ex, "WebSocket 连接发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
connectionId, ex.Message);
await HandleWebSocketError(webSocket, ex); await HandleWebSocketError(webSocket, ex);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error handling WebSocket connection {ConnectionId}", connectionId); _logger.LogError(ex, "处理 WebSocket 连接时发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
connectionId, ex.Message);
await HandleWebSocketError(webSocket, ex); await HandleWebSocketError(webSocket, ex);
} }
finally finally
{ {
_connectionManager.RemoveConnection(connectionId); _connectionManager.RemoveConnection(connectionId);
_logger.LogInformation("WebSocket 连接已关闭,连接ID:{ConnectionId}", connectionId);
} }
} }
else else
{ {
_logger.LogDebug("非 WebSocket 请求,继续处理下一个中间件,路径:{Path}", context.Request.Path);
await _next(context); await _next(context);
} }
} }
/// <summary>
/// 处理 WebSocket 连接
/// </summary>
private async Task HandleWebSocketConnection(System.Net.WebSockets.WebSocket webSocket, string connectionId) private async Task HandleWebSocketConnection(System.Net.WebSockets.WebSocket webSocket, string connectionId)
{ {
_logger.LogInformation("开始处理 WebSocket 连接,连接ID:{ConnectionId}", connectionId);
var buffer = new byte[_bufferSize]; var buffer = new byte[_bufferSize];
var receiveResult = await webSocket.ReceiveAsync( var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None); new ArraySegment<byte>(buffer), CancellationToken.None);
_logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
connectionId, receiveResult.MessageType, receiveResult.Count);
while (!receiveResult.CloseStatus.HasValue) while (!receiveResult.CloseStatus.HasValue)
{ {
@ -71,39 +93,57 @@ public class WebSocketMiddleware
MessageType = receiveResult.MessageType MessageType = receiveResult.MessageType
}; };
_logger.LogDebug("准备处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
connectionId, message.MessageType, message.Data.Length);
await _connectionManager.QueueIncomingMessage(message); await _connectionManager.QueueIncomingMessage(message);
_logger.LogDebug("消息已入队,连接ID:{ConnectionId}", connectionId);
receiveResult = await webSocket.ReceiveAsync( receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None); new ArraySegment<byte>(buffer), CancellationToken.None);
_logger.LogDebug("收到新消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
connectionId, receiveResult.MessageType, receiveResult.Count);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error processing message for connection {ConnectionId}", connectionId); _logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
connectionId, ex.Message);
throw; throw;
} }
} }
_logger.LogInformation("收到关闭请求,连接ID:{ConnectionId},关闭状态:{CloseStatus},描述:{CloseDescription}",
connectionId, receiveResult.CloseStatus, receiveResult.CloseStatusDescription);
await webSocket.CloseAsync( await webSocket.CloseAsync(
receiveResult.CloseStatus.Value, receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription, receiveResult.CloseStatusDescription,
CancellationToken.None); CancellationToken.None);
_logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId);
} }
/// <summary>
/// 处理 WebSocket 错误
/// </summary>
private async Task HandleWebSocketError(System.Net.WebSockets.WebSocket webSocket, Exception exception) private async Task HandleWebSocketError(System.Net.WebSockets.WebSocket webSocket, Exception exception)
{ {
_logger.LogError(exception, "处理 WebSocket 错误,错误信息:{ErrorMessage}", exception.Message);
try try
{ {
if (webSocket.State == WebSocketState.Open) if (webSocket.State == WebSocketState.Open)
{ {
_logger.LogInformation("正在关闭出错的 WebSocket 连接");
await webSocket.CloseAsync( await webSocket.CloseAsync(
WebSocketCloseStatus.InternalServerError, WebSocketCloseStatus.InternalServerError,
"Internal server error", "Internal server error",
CancellationToken.None); CancellationToken.None);
_logger.LogInformation("WebSocket 连接已关闭");
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error closing WebSocket after error"); _logger.LogError(ex, "关闭 WebSocket 连接时发生错误,错误信息:{ErrorMessage}", ex.Message);
} }
} }
} }

21
src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs

@ -1,8 +1,29 @@
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline; namespace CellularManagement.WebSocket.Pipeline;
/// <summary>
/// 管道步骤接口
/// 定义消息处理管道中的单个处理步骤
/// </summary>
/// <typeparam name="TInput">输入消息类型</typeparam>
/// <typeparam name="TOutput">输出消息类型</typeparam>
public interface IPipelineStep<TInput, TOutput> public interface IPipelineStep<TInput, TOutput>
{ {
/// <summary>
/// 处理消息
/// </summary>
/// <param name="input">输入消息</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>处理后的消息</returns>
Task<TOutput> ProcessAsync(TInput input, CancellationToken cancellationToken); Task<TOutput> ProcessAsync(TInput input, CancellationToken cancellationToken);
/// <summary>
/// 处理消息(带错误处理)
/// </summary>
/// <param name="input">输入消息</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>处理后的消息</returns>
Task<TOutput> ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken); Task<TOutput> ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken);
} }

121
src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs

@ -3,33 +3,117 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline; namespace CellularManagement.WebSocket.Pipeline;
/// <summary>
/// 管道构建器
/// 用于构建消息处理管道
/// </summary>
/// <typeparam name="TInput">输入消息类型</typeparam>
/// <typeparam name="TOutput">输出消息类型</typeparam>
public class PipelineBuilder<TInput, TOutput> public class PipelineBuilder<TInput, TOutput>
{ {
private readonly List<IPipelineStep<TInput, TOutput>> _steps = new();
private readonly ILoggerFactory _loggerFactory; private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<PipelineBuilder<TInput, TOutput>> _logger;
private readonly List<IPipelineStep<TInput, TOutput>> _steps = new();
public PipelineBuilder(ILoggerFactory loggerFactory) public PipelineBuilder(ILoggerFactory loggerFactory)
{ {
_loggerFactory = loggerFactory; _loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<PipelineBuilder<TInput, TOutput>>();
_logger.LogInformation("初始化管道构建器,输入类型:{InputType},输出类型:{OutputType}",
typeof(TInput).Name, typeof(TOutput).Name);
} }
/// <summary>
/// 添加处理步骤
/// </summary>
/// <param name="step">处理步骤</param>
/// <returns>管道构建器</returns>
public PipelineBuilder<TInput, TOutput> AddStep(IPipelineStep<TInput, TOutput> step) public PipelineBuilder<TInput, TOutput> AddStep(IPipelineStep<TInput, TOutput> step)
{ {
if (step == null) _logger.LogInformation("添加处理步骤,步骤类型:{StepType}", step.GetType().Name);
{
throw new ArgumentNullException(nameof(step));
}
_steps.Add(step); _steps.Add(step);
return this; return this;
} }
/// <summary>
/// 构建处理管道
/// </summary>
/// <returns>处理管道</returns>
public IPipelineStep<TInput, TOutput> Build() public IPipelineStep<TInput, TOutput> Build()
{ {
_logger.LogInformation("开始构建处理管道,步骤数量:{StepCount}", _steps.Count);
if (_steps.Count == 0) if (_steps.Count == 0)
{ {
throw new InvalidOperationException("Pipeline must have at least one step"); _logger.LogWarning("处理管道没有添加任何步骤");
throw new InvalidOperationException("处理管道必须至少包含一个步骤");
}
// 构建处理链
var pipeline = _steps[0];
for (int i = 1; i < _steps.Count; i++)
{
var currentStep = _steps[i];
_logger.LogDebug("连接处理步骤:{CurrentStep} -> {NextStep}",
pipeline.GetType().Name, currentStep.GetType().Name);
pipeline = new ChainedPipelineStep(pipeline, currentStep, _loggerFactory);
}
_logger.LogInformation("处理管道构建完成,总步骤数:{StepCount}", _steps.Count);
return pipeline;
}
/// <summary>
/// 链式处理步骤
/// 用于连接多个处理步骤
/// </summary>
private class ChainedPipelineStep : IPipelineStep<TInput, TOutput>
{
private readonly IPipelineStep<TInput, TOutput> _first;
private readonly IPipelineStep<TInput, TOutput> _second;
private readonly ILogger<ChainedPipelineStep> _logger;
public ChainedPipelineStep(
IPipelineStep<TInput, TOutput> first,
IPipelineStep<TInput, TOutput> second,
ILoggerFactory loggerFactory)
{
_first = first;
_second = second;
_logger = loggerFactory.CreateLogger<ChainedPipelineStep>();
_logger.LogDebug("创建链式处理步骤,第一步:{FirstStep},第二步:{SecondStep}",
first.GetType().Name, second.GetType().Name);
}
public async Task<TOutput> ProcessAsync(TInput input, CancellationToken cancellationToken)
{
_logger.LogDebug("开始链式处理消息");
var intermediate = await _first.ProcessAsync(input, cancellationToken);
if (intermediate is TInput inputValue)
{
return await _second.ProcessAsync(inputValue, cancellationToken);
}
throw new InvalidOperationException($"无法将类型 {intermediate?.GetType().Name ?? "null"} 转换为 {typeof(TInput).Name}");
}
public async Task<TOutput> ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken)
{
_logger.LogDebug("开始链式处理消息(带错误处理)");
try
{
var intermediate = await _first.ProcessWithErrorHandlingAsync(input, cancellationToken);
if (intermediate is TInput inputValue)
{
return await _second.ProcessWithErrorHandlingAsync(inputValue, cancellationToken);
}
throw new InvalidOperationException($"无法将类型 {intermediate?.GetType().Name ?? "null"} 转换为 {typeof(TInput).Name}");
}
catch (Exception ex)
{
_logger.LogError(ex, "链式处理发生错误");
throw;
}
} }
return new Pipeline<TInput, TOutput>(_steps, _loggerFactory.CreateLogger<Pipeline<TInput, TOutput>>());
} }
} }
@ -40,22 +124,29 @@ public class Pipeline<TInput, TOutput> : IPipelineStep<TInput, TOutput>
public Pipeline(IReadOnlyList<IPipelineStep<TInput, TOutput>> steps, ILogger<Pipeline<TInput, TOutput>> logger) public Pipeline(IReadOnlyList<IPipelineStep<TInput, TOutput>> steps, ILogger<Pipeline<TInput, TOutput>> logger)
{ {
_steps = steps; _steps = steps ?? throw new ArgumentNullException(nameof(steps));
_logger = logger; _logger = logger ?? throw new ArgumentNullException(nameof(logger));
} }
public async Task<TOutput> ProcessAsync(TInput input, CancellationToken cancellationToken) public async Task<TOutput> ProcessAsync(TInput input, CancellationToken cancellationToken)
{ {
TOutput? result = default;
var current = input; var current = input;
TOutput output = default!;
foreach (var step in _steps) foreach (var step in _steps)
{ {
output = await step.ProcessAsync(current, cancellationToken); result = await step.ProcessAsync(current, cancellationToken);
current = (TInput)(object)output!; if (result is TInput nextInput)
{
current = nextInput;
}
else
{
break;
}
} }
return output; return result ?? throw new InvalidOperationException("管道处理结果不能为 null");
} }
public async Task<TOutput> ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken) public async Task<TOutput> ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken)
@ -66,8 +157,8 @@ public class Pipeline<TInput, TOutput> : IPipelineStep<TInput, TOutput>
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error in pipeline processing"); _logger.LogError(ex, "管道处理发生错误");
throw new PipelineException("Pipeline processing failed", ex); throw new PipelineException("管道处理失败", ex);
} }
} }
} }

75
src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs

@ -4,6 +4,10 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline.Steps; namespace CellularManagement.WebSocket.Pipeline.Steps;
/// <summary>
/// 消息路由步骤
/// 负责根据消息类型将消息路由到相应的处理器
/// </summary>
public class MessageRoutingStep : IPipelineStep<WebSocketMessage, WebSocketMessage> public class MessageRoutingStep : IPipelineStep<WebSocketMessage, WebSocketMessage>
{ {
private readonly ILogger<MessageRoutingStep> _logger; private readonly ILogger<MessageRoutingStep> _logger;
@ -19,19 +23,35 @@ public class MessageRoutingStep : IPipelineStep<WebSocketMessage, WebSocketMessa
{ {
PropertyNameCaseInsensitive = true PropertyNameCaseInsensitive = true
}; };
_logger.LogInformation("初始化消息路由步骤");
} }
/// <summary>
/// 注册消息处理器
/// </summary>
/// <param name="messageType">消息类型</param>
/// <param name="handler">消息处理器</param>
public void RegisterHandler(string messageType, Func<WebSocketMessage, Task<WebSocketMessage>> handler) public void RegisterHandler(string messageType, Func<WebSocketMessage, Task<WebSocketMessage>> handler)
{ {
_logger.LogInformation("注册消息处理器,消息类型:{MessageType},处理器类型:{HandlerType}",
messageType, handler.Method.DeclaringType?.Name);
if (!_handlers.ContainsKey(messageType)) if (!_handlers.ContainsKey(messageType))
{ {
_handlers[messageType] = new List<Func<WebSocketMessage, Task<WebSocketMessage>>>(); _handlers[messageType] = new List<Func<WebSocketMessage, Task<WebSocketMessage>>>();
} }
_handlers[messageType].Add(handler); _handlers[messageType].Add(handler);
_logger.LogDebug("消息处理器注册完成,当前处理器数量:{HandlerCount}", _handlers[messageType].Count);
} }
/// <summary>
/// 处理消息
/// </summary>
public async Task<WebSocketMessage> ProcessAsync(WebSocketMessage input, CancellationToken cancellationToken) public async Task<WebSocketMessage> ProcessAsync(WebSocketMessage input, CancellationToken cancellationToken)
{ {
_logger.LogDebug("开始路由消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
input.ConnectionId, input.MessageType, input.Data.Length);
try try
{ {
var messageText = System.Text.Encoding.UTF8.GetString(input.Data); var messageText = System.Text.Encoding.UTF8.GetString(input.Data);
@ -39,48 +59,63 @@ public class MessageRoutingStep : IPipelineStep<WebSocketMessage, WebSocketMessa
if (message == null || !message.TryGetValue("type", out var typeObj)) if (message == null || !message.TryGetValue("type", out var typeObj))
{ {
throw new InvalidOperationException("Message type not found"); _logger.LogWarning("消息缺少类型字段,连接ID:{ConnectionId}", input.ConnectionId);
throw new InvalidOperationException("消息类型未找到");
} }
var messageType = typeObj.ToString(); var messageType = typeObj.ToString();
if (string.IsNullOrEmpty(messageType)) if (string.IsNullOrEmpty(messageType))
{ {
throw new InvalidOperationException("Invalid message type"); _logger.LogWarning("消息类型为空,连接ID:{ConnectionId}", input.ConnectionId);
throw new InvalidOperationException("无效的消息类型");
} }
if (!_handlers.TryGetValue(messageType, out var handlers) || handlers.Count == 0) if (!_handlers.ContainsKey(messageType))
{ {
throw new InvalidOperationException($"No handler registered for message type: {messageType}"); _logger.LogWarning("未找到消息类型的处理器,消息类型:{MessageType},连接ID:{ConnectionId}",
messageType, input.ConnectionId);
throw new InvalidOperationException($"未找到消息类型 {messageType} 的处理器");
} }
// 使用轮询策略选择处理器 // 轮询选择处理器
var handler = GetNextHandler(handlers); var handlers = _handlers[messageType];
return await handler(input); var handler = handlers[_currentHandlerIndex % handlers.Count];
_currentHandlerIndex = (_currentHandlerIndex + 1) % handlers.Count;
_logger.LogDebug("选择消息处理器,消息类型:{MessageType},处理器索引:{HandlerIndex},连接ID:{ConnectionId}",
messageType, _currentHandlerIndex, input.ConnectionId);
var result = await handler(input);
_logger.LogDebug("消息处理完成,连接ID:{ConnectionId},消息类型:{MessageType}",
input.ConnectionId, messageType);
return result;
} }
catch (Exception ex) catch (JsonException ex)
{ {
_logger.LogError(ex, "Message routing failed for connection {ConnectionId}", input.ConnectionId); _logger.LogError(ex, "消息JSON解析失败,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
throw; input.ConnectionId, ex.Message);
throw new InvalidOperationException("消息格式无效:JSON解析失败", ex);
} }
} }
/// <summary>
/// 处理消息(带错误处理)
/// </summary>
public async Task<WebSocketMessage> ProcessWithErrorHandlingAsync(WebSocketMessage input, CancellationToken cancellationToken) public async Task<WebSocketMessage> ProcessWithErrorHandlingAsync(WebSocketMessage input, CancellationToken cancellationToken)
{ {
_logger.LogDebug("开始路由消息(带错误处理),连接ID:{ConnectionId}", input.ConnectionId);
try try
{ {
return await ProcessAsync(input, cancellationToken); var result = await ProcessAsync(input, cancellationToken);
_logger.LogDebug("消息路由完成(带错误处理),连接ID:{ConnectionId}", input.ConnectionId);
return result;
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error in message routing for connection {ConnectionId}", input.ConnectionId); _logger.LogError(ex, "消息路由失败,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
throw new PipelineException("Message routing failed", ex); input.ConnectionId, ex.Message);
} throw;
} }
private Func<WebSocketMessage, Task<WebSocketMessage>> GetNextHandler(List<Func<WebSocketMessage, Task<WebSocketMessage>>> handlers)
{
// 使用轮询策略选择处理器
var index = Interlocked.Increment(ref _currentHandlerIndex) % handlers.Count;
return handlers[index];
} }
} }

92
src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

@ -7,6 +7,10 @@ using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services; namespace CellularManagement.WebSocket.Services;
/// <summary>
/// WebSocket 消息服务
/// 负责处理 WebSocket 消息的接收、处理和发送
/// </summary>
public class WebSocketMessageService : BackgroundService public class WebSocketMessageService : BackgroundService
{ {
private readonly WebSocketConnectionManager _connectionManager; private readonly WebSocketConnectionManager _connectionManager;
@ -23,8 +27,11 @@ public class WebSocketMessageService : BackgroundService
{ {
_connectionManager = connectionManager; _connectionManager = connectionManager;
_logger = logger; _logger = logger;
_logger.LogInformation("初始化 WebSocket 消息服务");
_routingStep = new MessageRoutingStep(loggerFactory.CreateLogger<MessageRoutingStep>()); _routingStep = new MessageRoutingStep(loggerFactory.CreateLogger<MessageRoutingStep>());
_processingSemaphore = new SemaphoreSlim(_maxConcurrentProcesses); _processingSemaphore = new SemaphoreSlim(_maxConcurrentProcesses);
_logger.LogInformation("创建消息处理信号量,最大并发数:{MaxConcurrentProcesses}", _maxConcurrentProcesses);
// 构建处理管道 // 构建处理管道
var pipelineBuilder = new PipelineBuilder<WebSocketMessage, WebSocketMessage>(loggerFactory); var pipelineBuilder = new PipelineBuilder<WebSocketMessage, WebSocketMessage>(loggerFactory);
@ -32,53 +39,115 @@ public class WebSocketMessageService : BackgroundService
.AddStep(new MessageValidationStep(loggerFactory.CreateLogger<MessageValidationStep>())) .AddStep(new MessageValidationStep(loggerFactory.CreateLogger<MessageValidationStep>()))
.AddStep(_routingStep) .AddStep(_routingStep)
.Build(); .Build();
_logger.LogInformation("消息处理管道构建完成");
// 订阅消息到达事件
_connectionManager.OnMessageReceived += ProcessMessageAsync;
_logger.LogInformation("已订阅消息到达事件");
} }
/// <summary>
/// 注册消息处理器
/// </summary>
public void RegisterMessageHandler(string messageType, Func<WebSocketMessage, Task<WebSocketMessage>> handler) public void RegisterMessageHandler(string messageType, Func<WebSocketMessage, Task<WebSocketMessage>> handler)
{ {
_logger.LogInformation("注册消息处理器,消息类型:{MessageType}", messageType);
_routingStep.RegisterHandler(messageType, handler); _routingStep.RegisterHandler(messageType, handler);
} }
/// <summary>
/// 执行后台服务
/// 负责处理出站消息的发送
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("WebSocket 消息服务开始运行");
try
{
// 处理出站消息
await foreach (var message in _connectionManager.ReadOutgoingMessagesAsync(stoppingToken))
{ {
try try
{ {
await foreach (var message in _connectionManager.ReadIncomingMessagesAsync(stoppingToken)) _logger.LogDebug("开始处理出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
var connection = _connectionManager.GetConnection(message.ConnectionId);
if (connection?.Socket.State == System.Net.WebSockets.WebSocketState.Open)
{
await connection.Socket.SendAsync(
new ArraySegment<byte>(message.Data),
message.MessageType,
true,
stoppingToken);
_logger.LogDebug("消息发送成功,连接ID:{ConnectionId}", message.ConnectionId);
}
else
{
_logger.LogWarning("消息发送失败,连接不存在或已关闭,连接ID:{ConnectionId}", message.ConnectionId);
}
}
catch (Exception ex)
{ {
await ProcessMessageAsync(message, stoppingToken); _logger.LogError(ex, "发送消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
}
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
_logger.LogInformation("Message processing service is stopping"); _logger.LogInformation("WebSocket 消息服务正在停止");
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error in message processing service"); _logger.LogError(ex, "WebSocket 消息服务发生错误");
}
finally
{
_logger.LogInformation("WebSocket 消息服务已停止");
} }
} }
private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken) /// <summary>
/// 处理消息
/// </summary>
private async Task ProcessMessageAsync(WebSocketMessage message)
{ {
await _processingSemaphore.WaitAsync(cancellationToken); _logger.LogDebug("开始处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
await _processingSemaphore.WaitAsync();
try try
{ {
var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, cancellationToken); var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, CancellationToken.None);
_logger.LogDebug("消息处理完成,连接ID:{ConnectionId},处理结果:{Processed}",
message.ConnectionId, processedMessage != null);
if (processedMessage != null)
{
await _connectionManager.QueueOutgoingMessage(processedMessage); await _connectionManager.QueueOutgoingMessage(processedMessage);
_logger.LogDebug("处理后的消息已入队,连接ID:{ConnectionId}", message.ConnectionId);
}
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error processing message for connection {ConnectionId}", message.ConnectionId); _logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
await HandleProcessingError(message, ex); await HandleProcessingError(message, ex);
} }
finally finally
{ {
_processingSemaphore.Release(); _processingSemaphore.Release();
_logger.LogDebug("消息处理完成,释放信号量,连接ID:{ConnectionId}", message.ConnectionId);
} }
} }
/// <summary>
/// 处理错误
/// </summary>
private async Task HandleProcessingError(WebSocketMessage message, Exception exception) private async Task HandleProcessingError(WebSocketMessage message, Exception exception)
{ {
_logger.LogError(exception, "处理消息错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
message.ConnectionId, exception.Message);
try try
{ {
var errorResponse = new WebSocketMessage var errorResponse = new WebSocketMessage
@ -93,16 +162,21 @@ public class WebSocketMessageService : BackgroundService
}; };
await _connectionManager.QueueOutgoingMessage(errorResponse); await _connectionManager.QueueOutgoingMessage(errorResponse);
_logger.LogDebug("错误响应已发送,连接ID:{ConnectionId}", message.ConnectionId);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "Error sending error response for connection {ConnectionId}", message.ConnectionId); _logger.LogError(ex, "发送错误响应时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
} }
} }
public override void Dispose() public override void Dispose()
{ {
_logger.LogInformation("正在释放 WebSocket 消息服务资源");
// 取消订阅事件
_connectionManager.OnMessageReceived -= ProcessMessageAsync;
_processingSemaphore.Dispose(); _processingSemaphore.Dispose();
base.Dispose(); base.Dispose();
_logger.LogInformation("WebSocket 消息服务资源已释放");
} }
} }
Loading…
Cancel
Save