Browse Source

CellularManagement.WebSocket 类库代码优化

norm
root 2 months ago
parent
commit
47b4f6e0e8
  1. 2
      src/CellularManagement.WebAPI/Program.cs
  2. 17
      src/CellularManagement.WebAPI/Properties/launchSettings.json
  3. 14
      src/CellularManagement.WebSocket/Connection/IWebSocketMessageQueueManager.cs
  4. 71
      src/CellularManagement.WebSocket/Connection/WebSocketMessageQueueManager.cs
  5. 7
      src/CellularManagement.WebSocket/Handlers/HandlerRegistrar.cs
  6. 132
      src/CellularManagement.WebSocket/Handlers/WebSocketMessageHandlerAdapter.cs
  7. 88
      src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
  8. 4
      src/CellularManagement.WebSocket/Models/WebSocketMessage.cs
  9. 2
      src/CellularManagement.WebSocket/Models/WebSocketOptions.cs
  10. 4
      src/CellularManagement.WebSocket/Services/IncomingMessageProcessor.cs
  11. 8
      src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

2
src/CellularManagement.WebAPI/Program.cs

@ -53,7 +53,7 @@ builder.Services.AddWebSocketServices(options =>
{
// 配置 WebSocket 选项
options.MaxConcurrentConnections = 2000; // 最大并发连接数
options.MaxMessageSize = 8192; // 最大消息大小(字节)
options.MaxMessageSize = 1024 * 1024; // 最大消息大小(字节)
options.ConnectionTimeout = TimeSpan.FromMinutes(2); // 连接超时时间
options.HeartbeatInterval = TimeSpan.FromSeconds(15); // 心跳检测间隔
});

17
src/CellularManagement.WebAPI/Properties/launchSettings.json

@ -20,14 +20,15 @@
}
},
"https": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"launchUrl": "swagger",
"applicationUrl": "https://localhost:7268;http://localhost:5000;https://192.168.3.147:7268;http://192.168.3.147:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"launchUrl": "swagger",
//"applicationUrl": "https://localhost:7268;http://localhost:5000;https://192.168.3.147:7268;http://192.168.3.147:5000",
"applicationUrl": "https://localhost:7268;http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"IIS Express": {
"commandName": "IISExpress",

14
src/CellularManagement.WebSocket/Connection/IWebSocketMessageQueueManager.cs

@ -14,14 +14,20 @@ public interface IWebSocketMessageQueueManager
event Func<WebSocketMessage, Task>? OnMessageReceived;
/// <summary>
/// 入队入站消息
/// 尝试入队入站消息
/// 如果队列已满,返回 false
/// </summary>
ValueTask QueueIncomingMessage(WebSocketMessage message);
/// <param name="message">WebSocket 消息</param>
/// <returns>是否成功入队</returns>
ValueTask<bool> TryQueueIncomingMessage(WebSocketMessage message);
/// <summary>
/// 入队出站消息
/// 尝试入队出站消息
/// 如果队列已满,返回 false
/// </summary>
ValueTask QueueOutgoingMessage(WebSocketMessage message);
/// <param name="message">WebSocket 消息</param>
/// <returns>是否成功入队</returns>
ValueTask<bool> TryQueueOutgoingMessage(WebSocketMessage message);
/// <summary>
/// 读取入站消息

71
src/CellularManagement.WebSocket/Connection/WebSocketMessageQueueManager.cs

@ -77,38 +77,67 @@ public class WebSocketMessageQueueManager : IWebSocketMessageQueueManager
}
/// <summary>
/// 入队入站消息
/// 将接收到的客户端消息放入入站消息队列,并触发消息到达事件
/// 尝试入队入站消息
/// 如果队列已满,返回 false
/// </summary>
/// <param name="message">WebSocket 消息</param>
public async ValueTask QueueIncomingMessage(WebSocketMessage message)
/// <returns>是否成功入队</returns>
public async ValueTask<bool> TryQueueIncomingMessage(WebSocketMessage message)
{
_logger.LogDebug("入队入站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
// 将消息写入入站消息通道
await _incomingMessages.Writer.WriteAsync(message);
// 触发消息到达事件
if (OnMessageReceived != null)
try
{
_logger.LogDebug("触发消息到达事件,连接ID:{ConnectionId}", message.ConnectionId);
await OnMessageReceived(message);
// 尝试写入消息,如果队列已满则返回 false
if (_incomingMessages.Writer.TryWrite(message))
{
_logger.LogDebug("成功入队入站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
// 触发消息到达事件
if (OnMessageReceived != null)
{
_logger.LogDebug("触发消息到达事件,连接ID:{ConnectionId}", message.ConnectionId);
await OnMessageReceived(message);
}
return true;
}
_logger.LogWarning("消息队列已满,无法入队消息,连接ID:{ConnectionId}", message.ConnectionId);
return false;
}
catch (Exception ex)
{
_logger.LogError(ex, "尝试入队消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
return false;
}
}
/// <summary>
/// 入队出站消息
/// 将待发送的消息放入出站消息队列
/// 尝试入队出站消息
/// 如果队列已满,返回 false
/// </summary>
/// <param name="message">WebSocket 消息</param>
public async ValueTask QueueOutgoingMessage(WebSocketMessage message)
/// <returns>是否成功入队</returns>
public ValueTask<bool> TryQueueOutgoingMessage(WebSocketMessage message)
{
_logger.LogDebug("入队出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
// 将消息写入出站消息通道
await _outgoingMessages.Writer.WriteAsync(message);
try
{
// 尝试写入消息,如果队列已满则返回 false
if (_outgoingMessages.Writer.TryWrite(message))
{
_logger.LogDebug("成功入队出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
return ValueTask.FromResult(true);
}
_logger.LogWarning("出站消息队列已满,无法入队消息,连接ID:{ConnectionId}", message.ConnectionId);
return ValueTask.FromResult(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "尝试入队出站消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
return ValueTask.FromResult(false);
}
}
/// <summary>

7
src/CellularManagement.WebSocket/Handlers/HandlerRegistrar.cs

@ -1,6 +1,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using CellularManagement.WebSocket.Handlers;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Handlers;
@ -13,12 +14,14 @@ public class HandlerRegistrar
private readonly HandlerManager _handlerManager;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<HandlerRegistrar> _logger;
private readonly WebSocketOptions _options;
public HandlerRegistrar(HandlerManager handlerManager, ILoggerFactory loggerFactory)
public HandlerRegistrar(HandlerManager handlerManager, ILoggerFactory loggerFactory, WebSocketOptions options)
{
_handlerManager = handlerManager;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<HandlerRegistrar>();
_options = options;
}
public async Task RegisterHandlersAsync(IEnumerable<IWebSocketMessageHandler> messageHandlers)
@ -26,7 +29,7 @@ public class HandlerRegistrar
foreach (var handler in messageHandlers)
{
_logger.LogInformation("注册消息处理器,消息类型:{MessageType}", handler.MessageType);
var adapter = new WebSocketMessageHandlerAdapter(handler, _loggerFactory.CreateLogger<WebSocketMessageHandlerAdapter>());
var adapter = new WebSocketMessageHandlerAdapter(handler, _loggerFactory.CreateLogger<WebSocketMessageHandlerAdapter>(), _options);
await _handlerManager.RegisterHandlerAsync(handler.MessageType, adapter);
}
}

132
src/CellularManagement.WebSocket/Handlers/WebSocketMessageHandlerAdapter.cs

@ -2,38 +2,156 @@ using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using CellularManagement.WebSocket.Services;
using Microsoft.Extensions.Logging;
using System.Diagnostics;
using System.Net.WebSockets;
namespace CellularManagement.WebSocket.Handlers
{
/// <summary>
/// WebSocket 消息处理器适配器
/// 提供以下功能:
/// 1. 消息处理适配
/// 2. 性能监控
/// 3. 错误处理和重试
/// 4. 消息验证
/// 5. 处理超时控制
/// </summary>
public class WebSocketMessageHandlerAdapter : BaseMessageHandler
{
private readonly IWebSocketMessageHandler _handler;
private readonly WebSocketOptions _options;
private readonly Stopwatch _stopwatch = new();
public WebSocketMessageHandlerAdapter(
IWebSocketMessageHandler handler,
ILogger<WebSocketMessageHandlerAdapter> logger)
ILogger<WebSocketMessageHandlerAdapter> logger,
WebSocketOptions options)
: base(logger)
{
_handler = handler;
_options = options;
}
public override string Name => _handler.GetType().Name;
public override int Priority => 0;
/// <summary>
/// 处理消息
/// </summary>
protected override async Task<WebSocketMessage> ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken)
{
try
_stopwatch.Restart();
var retryCount = 0;
while (true)
{
try
{
// 验证消息
if (!ValidateMessage(message))
{
_logger.LogWarning("消息验证失败,处理器:{HandlerName},连接ID:{ConnectionId}",
Name, message.ConnectionId);
return CreateErrorResponse(message, "Invalid message format");
}
// 使用超时控制处理消息
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(_options.MessageSendTimeout);
var result = await _handler.HandleAsync(message);
_stopwatch.Stop();
// 记录性能指标
LogPerformanceMetrics(message, _stopwatch.ElapsedMilliseconds);
return result;
}
catch (OperationCanceledException)
{
_logger.LogWarning("消息处理超时,处理器:{HandlerName},连接ID:{ConnectionId}",
Name, message.ConnectionId);
return CreateErrorResponse(message, "Message processing timeout");
}
catch (Exception ex)
{
retryCount++;
_logger.LogError(ex, "处理消息时发生错误,处理器:{HandlerName},重试次数:{RetryCount},连接ID:{ConnectionId}",
Name, retryCount, message.ConnectionId);
if (retryCount >= _options.MessageRetryCount)
{
return CreateErrorResponse(message, "Message processing failed after retries");
}
// 等待重试间隔
await Task.Delay(_options.MessageRetryInterval, cancellationToken);
}
}
}
/// <summary>
/// 验证消息
/// </summary>
private bool ValidateMessage(WebSocketMessage message)
{
if (message == null)
{
_logger.LogWarning("消息为空,处理器:{HandlerName}", Name);
return false;
}
if (string.IsNullOrEmpty(message.ConnectionId))
{
_logger.LogWarning("连接ID为空,处理器:{HandlerName}", Name);
return false;
}
if (message.Data == null || message.Data.Length == 0)
{
_logger.LogWarning("消息数据为空,处理器:{HandlerName},连接ID:{ConnectionId}",
Name, message.ConnectionId);
return false;
}
if (message.Data.Length > _options.MaxMessageSize)
{
_logger.LogWarning("消息大小超过限制,处理器:{HandlerName},连接ID:{ConnectionId},大小:{Size},限制:{MaxSize}",
Name, message.ConnectionId, message.Data.Length, _options.MaxMessageSize);
return false;
}
return true;
}
/// <summary>
/// 创建错误响应
/// </summary>
private WebSocketMessage CreateErrorResponse(WebSocketMessage originalMessage, string errorMessage)
{
return new WebSocketMessage
{
ConnectionId = originalMessage.ConnectionId,
MessageType = WebSocketMessageType.Text,
Data = System.Text.Encoding.UTF8.GetBytes(errorMessage),
IsError = true
};
}
/// <summary>
/// 记录性能指标
/// </summary>
private void LogPerformanceMetrics(WebSocketMessage message, long elapsedMilliseconds)
{
if (elapsedMilliseconds > 1000) // 超过1秒的处理时间
{
var result = await _handler.HandleAsync(message);
return result;
_logger.LogWarning("消息处理时间过长,处理器:{HandlerName},连接ID:{ConnectionId},耗时:{ElapsedMs}ms",
Name, message.ConnectionId, elapsedMilliseconds);
}
catch (Exception ex)
else
{
_logger.LogError(ex, "处理消息时发生错误,处理器:{HandlerName}", Name);
throw;
_logger.LogDebug("消息处理完成,处理器:{HandlerName},连接ID:{ConnectionId},耗时:{ElapsedMs}ms",
Name, message.ConnectionId, elapsedMilliseconds);
}
}
}

88
src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs

@ -3,6 +3,8 @@ using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.Diagnostics;
using Microsoft.Extensions.Options;
namespace CellularManagement.WebSocket.Middleware;
@ -17,17 +19,20 @@ public class WebSocketMiddleware
private readonly IWebSocketMessageQueueManager _messageQueueManager;
private readonly ILogger<WebSocketMiddleware> _logger;
private readonly int _bufferSize = 1024 * 4;
private readonly WebSocketOptions _options;
public WebSocketMiddleware(
RequestDelegate next,
IWebSocketConnectionManager connectionManager,
IWebSocketMessageQueueManager messageQueueManager,
ILogger<WebSocketMiddleware> logger)
ILogger<WebSocketMiddleware> logger,
IOptions<WebSocketOptions> options)
{
_next = next;
_connectionManager = connectionManager;
_messageQueueManager = messageQueueManager;
_logger = logger;
_options = options.Value;
_logger.LogInformation("初始化 WebSocket 中间件,缓冲区大小:{BufferSize}字节", _bufferSize);
}
@ -80,33 +85,88 @@ public class WebSocketMiddleware
{
_logger.LogInformation("开始处理 WebSocket 连接,连接ID:{ConnectionId}", connectionId);
var stopwatch = new Stopwatch();
var messageCount = 0;
var totalBytes = 0L;
using var messageStream = new MemoryStream();
var buffer = new byte[_bufferSize];
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
_logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
connectionId, receiveResult.MessageType, receiveResult.Count);
while (!receiveResult.CloseStatus.HasValue)
{
try
{
var message = new WebSocketMessage
stopwatch.Start();
// 检查消息大小
if (messageStream.Length + receiveResult.Count > _options.MaxMessageSize)
{
ConnectionId = connectionId,
Data = buffer.Take(receiveResult.Count).ToArray(),
MessageType = receiveResult.MessageType
};
_logger.LogWarning("消息大小超过限制,连接ID:{ConnectionId},当前大小:{CurrentSize},最大限制:{MaxSize}",
connectionId, messageStream.Length + receiveResult.Count, _options.MaxMessageSize);
await webSocket.CloseAsync(
WebSocketCloseStatus.MessageTooBig,
"Message too big",
CancellationToken.None);
return;
}
// 处理消息分片
if (receiveResult.MessageType == WebSocketMessageType.Text ||
receiveResult.MessageType == WebSocketMessageType.Binary)
{
await messageStream.WriteAsync(buffer, 0, receiveResult.Count);
if (receiveResult.EndOfMessage)
{
var message = new WebSocketMessage
{
ConnectionId = connectionId,
Data = messageStream.ToArray(),
MessageType = receiveResult.MessageType,
IsComplete = true
};
_logger.LogDebug("准备处理消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
connectionId, message.MessageType, message.Data.Length);
// 使用背压机制
if (!await _messageQueueManager.TryQueueIncomingMessage(message))
{
_logger.LogWarning("消息队列已满,等待处理,连接ID:{ConnectionId}", connectionId);
await Task.Delay(100); // 等待队列处理
continue;
}
await _messageQueueManager.QueueIncomingMessage(message);
_logger.LogDebug("消息已入队,连接ID:{ConnectionId}", connectionId);
messageCount++;
totalBytes += messageStream.Length;
messageStream.SetLength(0);
}
}
else if (receiveResult.MessageType == WebSocketMessageType.Close)
{
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
break;
}
stopwatch.Stop();
// 记录性能指标
if (messageCount % 100 == 0)
{
_logger.LogInformation(
"连接性能统计,连接ID:{ConnectionId},消息数:{MessageCount},总字节数:{TotalBytes}," +
"平均处理时间:{AverageTime}ms,吞吐量:{Throughput}MB/s",
connectionId,
messageCount,
totalBytes,
stopwatch.ElapsedMilliseconds / messageCount,
(totalBytes / 1024.0 / 1024.0) / (stopwatch.ElapsedMilliseconds / 1000.0));
}
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
_logger.LogDebug("收到新消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
connectionId, receiveResult.MessageType, receiveResult.Count);
}
catch (Exception ex)
{

4
src/CellularManagement.WebSocket/Models/WebSocketMessage.cs

@ -12,4 +12,8 @@ public record WebSocketMessage
public WebSocketMessageType MessageType { get; init; } // 消息类型
public DateTime Timestamp { get; init; } = DateTime.UtcNow; // 时间戳
public int Priority { get; init; } = 0; // 优先级
public bool IsComplete { get; set; }
public bool IsError { get; set; }
}

2
src/CellularManagement.WebSocket/Models/WebSocketOptions.cs

@ -24,7 +24,7 @@ namespace CellularManagement.WebSocket.Models
/// <summary>
/// 消息队列大小
/// </summary>
public int MaxMessageSize { get; set; } = 1024*8;
public int MaxMessageSize { get; set; } = 1024*1024;
/// <summary>
/// 心跳检测间隔

4
src/CellularManagement.WebSocket/Services/IncomingMessageProcessor.cs

@ -221,7 +221,7 @@ public class IncomingMessageProcessor : IDisposable
if (processedMessage != null)
{
// 将处理后的消息入队
await _messageQueueManager.QueueOutgoingMessage(processedMessage);
await _messageQueueManager.TryQueueOutgoingMessage(processedMessage);
_logger.LogDebug("处理后的消息已入队,连接ID:{ConnectionId}", message.ConnectionId);
}
}
@ -265,7 +265,7 @@ public class IncomingMessageProcessor : IDisposable
};
// 将错误响应入队
await _messageQueueManager.QueueOutgoingMessage(errorResponse);
await _messageQueueManager.TryQueueOutgoingMessage(errorResponse);
_logger.LogDebug("错误响应已发送,连接ID:{ConnectionId}", message.ConnectionId);
}
catch (Exception ex)

8
src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

@ -7,6 +7,7 @@ using CellularManagement.WebSocket.Handlers;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
namespace CellularManagement.WebSocket.Services;
@ -61,6 +62,8 @@ public class WebSocketMessageService : BackgroundService
/// </summary>
private bool _disposed;
private IOptions<WebSocketOptions> _options;
/// <summary>
/// 构造函数
/// </summary>
@ -74,11 +77,12 @@ public class WebSocketMessageService : BackgroundService
IWebSocketMessageQueueManager messageQueueManager,
ILogger<WebSocketMessageService> logger,
ILoggerFactory loggerFactory,
IEnumerable<IWebSocketMessageHandler> messageHandlers)
IEnumerable<IWebSocketMessageHandler> messageHandlers, IOptions<WebSocketOptions> options)
{
_coordinator = coordinator;
_messageQueueManager = messageQueueManager;
_logger = logger;
_options=options;
_stopTcs = new TaskCompletionSource();
_logger.LogInformation("初始化 WebSocket 消息服务");
@ -86,7 +90,7 @@ public class WebSocketMessageService : BackgroundService
var handlerManager = new HandlerManager(loggerFactory.CreateLogger<HandlerManager>());
// 注册消息处理器
var handlerRegistrar = new HandlerRegistrar(handlerManager, loggerFactory);
var handlerRegistrar = new HandlerRegistrar(handlerManager, loggerFactory, _options.Value);
_ = handlerRegistrar.RegisterHandlersAsync(messageHandlers);
// 创建入站消息处理器

Loading…
Cancel
Save