Browse Source

feat: 优化 WebSocket 中间件,添加消息缓冲、错误处理和性能监控功能

norm
hyh 2 months ago
parent
commit
9f50481ebd
  1. 164
      src/CellularManagement.WebSocket/Buffer/WebSocketMessageBuffer.cs
  2. 1
      src/CellularManagement.WebSocket/Connection/ConnectionManagerCoordinator.cs
  3. 49
      src/CellularManagement.WebSocket/Handler/WebSocketErrorHandler.cs
  4. 533
      src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
  5. 104
      src/CellularManagement.WebSocket/Monitor/WebSocketPerformanceMonitor.cs

164
src/CellularManagement.WebSocket/Buffer/WebSocketMessageBuffer.cs

@ -0,0 +1,164 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
namespace CellularManagement.WebSocket.Buffer;
/// <summary>
/// WebSocket 消息缓冲区
/// 用于高效处理 WebSocket 消息的缓冲区实现
/// </summary>
public sealed class WebSocketMessageBuffer : IDisposable
{
private readonly byte[] _buffer;
private int _position;
private readonly int _maxSize;
private bool _isDisposed;
private const int MinBufferSize = 1024; // 最小缓冲区大小 1KB
private const int MaxBufferSize = 1024 * 1024 * 10; // 最大缓冲区大小 10MB
/// <summary>
/// 当前缓冲区大小
/// </summary>
public int Size => _position;
/// <summary>
/// 缓冲区是否已满
/// </summary>
public bool IsFull => _position >= _maxSize;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="maxSize">最大缓冲区大小</param>
/// <exception cref="ArgumentOutOfRangeException">当 maxSize 超出允许范围时抛出</exception>
public WebSocketMessageBuffer(int maxSize)
{
if (maxSize < MinBufferSize || maxSize > MaxBufferSize)
{
throw new ArgumentOutOfRangeException(nameof(maxSize),
$"缓冲区大小必须在 {MinBufferSize} 到 {MaxBufferSize} 字节之间");
}
_maxSize = maxSize;
_buffer = ArrayPool<byte>.Shared.Rent(maxSize);
_position = 0;
}
/// <summary>
/// 尝试将数据写入缓冲区
/// </summary>
/// <param name="data">要写入的数据</param>
/// <param name="offset">数据偏移量</param>
/// <param name="count">要写入的字节数</param>
/// <returns>是否写入成功</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryWrite(byte[] data, int offset, int count)
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(WebSocketMessageBuffer));
}
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
if (offset < 0 || count < 0 || offset + count > data.Length)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
if (_position + count > _maxSize)
{
return false;
}
System.Buffer.BlockCopy(data, offset, _buffer, _position, count);
_position += count;
return true;
}
/// <summary>
/// 获取当前缓冲区中的消息
/// </summary>
/// <returns>消息字节数组</returns>
public byte[] GetMessage()
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(WebSocketMessageBuffer));
}
if (_position == 0)
{
return Array.Empty<byte>();
}
var result = new byte[_position];
System.Buffer.BlockCopy(_buffer, 0, result, 0, _position);
return result;
}
/// <summary>
/// 获取当前缓冲区中的消息,使用内存池
/// </summary>
/// <returns>消息字节数组的租借对象</returns>
public IMemoryOwner<byte> GetMessageWithMemoryPool()
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(WebSocketMessageBuffer));
}
if (_position == 0)
{
return new EmptyMemoryOwner();
}
var memoryOwner = MemoryPool<byte>.Shared.Rent(_position);
var span = memoryOwner.Memory.Span;
for (int i = 0; i < _position; i++)
{
span[i] = _buffer[i];
}
return memoryOwner;
}
/// <summary>
/// 重置缓冲区
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Reset()
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(WebSocketMessageBuffer));
}
_position = 0;
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
if (!_isDisposed)
{
ArrayPool<byte>.Shared.Return(_buffer);
_isDisposed = true;
}
}
private sealed class EmptyMemoryOwner : IMemoryOwner<byte>
{
public Memory<byte> Memory => Memory<byte>.Empty;
public void Dispose()
{
// 空实现,因为 Empty 不需要释放
}
}
}

1
src/CellularManagement.WebSocket/Connection/ConnectionManagerCoordinator.cs

@ -79,6 +79,7 @@ public class ConnectionManagerCoordinator
_processingSemaphore.Release();
_logger.LogDebug("结束处理连接,连接ID:{ConnectionId}", connectionId);
}
await Task.CompletedTask.ConfigureAwait(false);
}
/// <summary>

49
src/CellularManagement.WebSocket/Handler/WebSocketErrorHandler.cs

@ -0,0 +1,49 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Handler;
/// <summary>
/// WebSocket 错误处理器
/// 提供错误处理和重试机制
/// </summary>
public class WebSocketErrorHandler
{
private readonly ILogger _logger;
private readonly int _maxRetries;
private readonly TimeSpan _retryDelay;
public WebSocketErrorHandler(ILogger logger, int maxRetries = 3, TimeSpan? retryDelay = null)
{
_logger = logger;
_maxRetries = maxRetries;
_retryDelay = retryDelay ?? TimeSpan.FromSeconds(1);
}
/// <summary>
/// 使用重试机制执行操作
/// </summary>
/// <param name="action">要执行的操作</param>
/// <returns>操作是否成功</returns>
public async Task<bool> HandleWithRetryAsync(Func<Task> action)
{
for (int i = 0; i < _maxRetries; i++)
{
try
{
await action();
return true;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "操作失败,重试 {RetryCount}/{MaxRetries}", i + 1, _maxRetries);
if (i < _maxRetries - 1)
{
await Task.Delay(_retryDelay);
}
}
}
return false;
}
}

533
src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs

@ -5,22 +5,83 @@ using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.Diagnostics;
using Microsoft.Extensions.Options;
using System.Buffers;
using System.Collections.Concurrent;
using System.Threading;
using CellularManagement.WebSocket.Buffer;
using CellularManagement.WebSocket.Handler;
using CellularManagement.WebSocket.Monitor;
using System.Threading.Channels;
using Microsoft.Extensions.ObjectPool;
namespace CellularManagement.WebSocket.Middleware;
/// <summary>
/// WebSocket 中间件
/// 负责处理 WebSocket 连接的建立和消息的接收
/// 负责处理 WebSocket 连接的建立、消息的接收和处理
/// </summary>
/// <remarks>
/// 主要功能:
/// 1. 管理 WebSocket 连接的生命周期
/// 2. 处理消息的接收和发送
/// 3. 实现消息队列和背压机制
/// 4. 提供性能监控和错误处理
/// 5. 使用对象池优化资源使用
/// </remarks>
/// WebSocket 关闭状态码说明:
/// 1000 (NormalClosure): 正常关闭,表示连接在请求完成后正常关闭
/// 1001 (EndpointUnavailable): 终端不可用,表示服务器或客户端将被移除
/// 1002 (ProtocolError): 协议错误,表示因为协议错误而终止连接
/// 1003 (InvalidMessageType): 消息类型错误,表示无法接受接收到的数据类型
/// 1007 (InvalidPayloadData): 数据格式错误,表示收到了与消息类型不一致的数据
/// 1008 (PolicyViolation): 策略违规,表示收到了违反策略的消息
/// 1009 (MessageTooBig): 消息过大,表示消息超出处理能力
/// 1011 (InternalServerError): 服务器内部错误,表示服务器发生内部错误
/// </remarks>
/// <example>
/// 使用示例:
/// <code>
/// // 在 Startup.cs 中注册中间件
/// app.UseWebSocketServer();
///
/// // 配置 WebSocket 选项
/// services.Configure<WebSocketOptions>(options =>
/// {
/// options.MaxMessageSize = 1024 * 1024; // 1MB
/// options.MaxQueueSize = 1000;
/// });
///
/// // 关闭连接示例
/// // 正常关闭连接
/// await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "正常关闭", CancellationToken.None);
///
/// // 消息过大时关闭
/// await webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, "消息过大", CancellationToken.None);
///
/// // 服务器错误时关闭
/// await webSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, "服务器错误", CancellationToken.None);
/// </code>
/// </example>
public class WebSocketMiddleware
{
private readonly RequestDelegate _next;
private readonly IWebSocketConnectionManager _connectionManager;
private readonly IWebSocketMessageQueueManager _messageQueueManager;
private readonly ILogger<WebSocketMiddleware> _logger;
private readonly int _bufferSize = 1024 * 4;
private readonly WebSocketOptions _options;
private readonly WebSocketMessageBuffer _messageBuffer;
private readonly WebSocketErrorHandler _errorHandler;
private readonly WebSocketPerformanceMonitor _performanceMonitor;
private readonly ObjectPool<Channel<WebSocketMessage>> _channelPool;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="next">请求委托</param>
/// <param name="connectionManager">连接管理器</param>
/// <param name="messageQueueManager">消息队列管理器</param>
/// <param name="logger">日志记录器</param>
/// <param name="options">WebSocket 配置选项</param>
public WebSocketMiddleware(
RequestDelegate next,
IWebSocketConnectionManager connectionManager,
@ -33,7 +94,13 @@ public class WebSocketMiddleware
_messageQueueManager = messageQueueManager;
_logger = logger;
_options = options.Value;
_logger.LogInformation("初始化 WebSocket 中间件,缓冲区大小:{BufferSize}字节", _bufferSize);
_messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize);
_errorHandler = new WebSocketErrorHandler(logger, _options.MessageRetryCount, _options.MessageRetryInterval);
_performanceMonitor = new WebSocketPerformanceMonitor(logger);
// 创建 Channel 对象池,用于复用消息通道
_channelPool = new DefaultObjectPool<Channel<WebSocketMessage>>(
new ChannelPooledObjectPolicy());
}
/// <summary>
@ -81,109 +148,387 @@ public class WebSocketMiddleware
/// <summary>
/// 处理 WebSocket 连接
/// </summary>
/// <param name="webSocket">WebSocket 连接实例</param>
/// <param name="connectionId">连接的唯一标识符</param>
/// <remarks>
/// 该方法负责处理单个 WebSocket 连接的完整生命周期:
/// 1. 初始化连接资源(缓冲区、消息通道等)
/// 2. 启动消息处理循环
/// 3. 处理消息接收
/// 4. 完成消息处理
/// 5. 清理资源
/// </remarks>
private async Task HandleWebSocketConnection(System.Net.WebSockets.WebSocket webSocket, string connectionId)
{
_logger.LogInformation("开始处理 WebSocket 连接,连接ID:{ConnectionId}", connectionId);
var stopwatch = new Stopwatch();
var messageCount = 0;
var totalBytes = 0L;
// 使用原子操作的状态标志,确保线程安全
var processingState = new AtomicBoolean(false);
// 从内存池租借缓冲区,用于接收消息
var buffer = ArrayPool<byte>.Shared.Rent(1024 * 4);
// 从对象池获取消息通道
var messageChannel = _channelPool.Get();
using var messageStream = new MemoryStream();
var buffer = new byte[_bufferSize];
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
try
{
// 创建取消令牌源,设置连接超时
using var cts = new CancellationTokenSource(_options.ConnectionTimeout);
var messageStartTime = DateTime.UtcNow;
while (!receiveResult.CloseStatus.HasValue)
// 启动消息处理循环,异步处理接收到的消息
var processTask = StartMessageProcessingLoop(messageChannel, processingState, cts.Token);
// 处理消息接收,包括消息的接收、解析和处理
await ProcessWebSocketMessages(webSocket, connectionId, buffer, messageChannel,
messageStartTime, cts.Token);
// 完成消息处理,等待所有消息处理完成
await CompleteMessageProcessing(messageChannel, processingState, cts.Token);
}
finally
{
// 释放资源
ArrayPool<byte>.Shared.Return(buffer);
_channelPool.Return(messageChannel);
await CloseWebSocketIfOpen(webSocket);
}
}
/// <summary>
/// 启动消息处理循环
/// </summary>
/// <param name="messageChannel">消息通道</param>
/// <param name="processingState">处理状态标志</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>处理任务</returns>
/// <remarks>
/// 该方法负责:
/// 1. 从消息通道读取消息
/// 2. 使用原子操作确保消息处理的线程安全
/// 3. 将消息加入处理队列
/// 4. 处理队列满时的重试逻辑
/// </remarks>
private Task StartMessageProcessingLoop(
Channel<WebSocketMessage> messageChannel,
AtomicBoolean processingState,
CancellationToken cancellationToken)
{
return Task.Run(async () =>
{
_logger.LogInformation("开始消息处理循环");
try
{
stopwatch.Start();
// 使用异步流读取所有消息,直到通道关闭或取消
await foreach (var message in messageChannel.Reader.ReadAllAsync(cancellationToken))
{
_logger.LogDebug("收到新消息,连接ID:{ConnectionId},消息类型:{MessageType}",
message.ConnectionId, message.MessageType);
// 检查消息大小
if (messageStream.Length + receiveResult.Count > _options.MaxMessageSize)
// 使用原子操作设置处理状态,确保同一时间只有一个消息在处理
if (!processingState.Set(true))
{
_logger.LogWarning("消息大小超过限制,连接ID:{ConnectionId},当前大小:{CurrentSize},最大限制:{MaxSize}",
connectionId, messageStream.Length + receiveResult.Count, _options.MaxMessageSize);
await webSocket.CloseAsync(
WebSocketCloseStatus.MessageTooBig,
"Message too big",
CancellationToken.None);
try
{
// 尝试将消息加入处理队列
if (!await _messageQueueManager.TryQueueIncomingMessage(message))
{
_logger.LogWarning("消息队列已满,消息将重新入队,连接ID:{ConnectionId}",
message.ConnectionId);
// 如果队列已满,将消息重新写入通道并等待一段时间后重试
await messageChannel.Writer.WriteAsync(message, cancellationToken);
await Task.Delay(100, cancellationToken);
}
else
{
_logger.LogDebug("消息已成功加入处理队列,连接ID:{ConnectionId}",
message.ConnectionId);
}
}
finally
{
// 无论处理成功与否,都重置处理状态
processingState.Set(false);
}
}
else
{
_logger.LogDebug("消息正在处理中,跳过当前消息,连接ID:{ConnectionId}",
message.ConnectionId);
}
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("消息处理循环已取消");
}
catch (Exception ex)
{
// 记录处理循环中的异常
_logger.LogError(ex, "消息处理循环发生错误,连接ID:{ConnectionId}",
messageChannel.Reader.Completion.IsCompleted);
}
finally
{
_logger.LogInformation("消息处理循环已结束");
}
}, cancellationToken);
}
/// <summary>
/// 处理 WebSocket 消息
/// </summary>
/// <param name="webSocket">WebSocket 连接实例</param>
/// <param name="connectionId">连接ID</param>
/// <param name="buffer">接收缓冲区</param>
/// <param name="messageChannel">消息通道</param>
/// <param name="messageStartTime">消息开始时间</param>
/// <param name="cancellationToken">取消令牌</param>
/// <remarks>
/// 该方法负责:
/// 1. 接收 WebSocket 消息
/// 2. 检查消息超时
/// 3. 处理不同类型的消息(文本、二进制、关闭等)
/// 4. 将消息写入处理通道
/// </remarks>
private async Task ProcessWebSocketMessages(
System.Net.WebSockets.WebSocket webSocket,
string connectionId,
byte[] buffer,
Channel<WebSocketMessage> messageChannel,
DateTime messageStartTime,
CancellationToken cancellationToken)
{
_logger.LogInformation("开始处理 WebSocket 消息,连接ID:{ConnectionId}", connectionId);
// 接收第一条消息
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), cancellationToken);
_logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType}",
connectionId, receiveResult.MessageType);
// 循环处理消息,直到收到关闭消息或发生错误
while (!receiveResult.CloseStatus.HasValue)
{
// 检查消息是否超时
if (IsMessageTimeout(messageStartTime))
{
_logger.LogWarning("消息处理超时,连接ID:{ConnectionId},已处理时间:{ElapsedTime}ms",
connectionId, (DateTime.UtcNow - messageStartTime).TotalMilliseconds);
await HandleMessageTimeout(webSocket, connectionId, cancellationToken);
return;
}
// 处理消息分片
if (receiveResult.MessageType == WebSocketMessageType.Text ||
receiveResult.MessageType == WebSocketMessageType.Binary)
// 处理关闭消息
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
_logger.LogInformation("收到关闭消息,连接ID:{ConnectionId},关闭状态:{CloseStatus}",
connectionId, receiveResult.CloseStatus);
await HandleCloseMessage(webSocket, receiveResult, cancellationToken);
break;
}
// 处理有效消息类型(文本或二进制)
if (IsValidMessageType(receiveResult.MessageType))
{
_logger.LogDebug("处理消息,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节",
connectionId, receiveResult.MessageType, receiveResult.Count);
await ProcessMessage(webSocket, connectionId, buffer, receiveResult,
messageChannel, messageStartTime, cancellationToken);
}
else
{
_logger.LogWarning("收到无效消息类型,连接ID:{ConnectionId},消息类型:{MessageType}",
connectionId, receiveResult.MessageType);
}
// 接收下一条消息
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), cancellationToken);
_logger.LogDebug("接收下一条消息,连接ID:{ConnectionId},消息类型:{MessageType}",
connectionId, receiveResult.MessageType);
}
_logger.LogInformation("WebSocket 消息处理完成,连接ID:{ConnectionId}", connectionId);
}
/// <summary>
/// 处理单个消息
/// </summary>
/// <param name="webSocket">WebSocket 连接实例</param>
/// <param name="connectionId">连接ID</param>
/// <param name="buffer">接收缓冲区</param>
/// <param name="receiveResult">接收结果</param>
/// <param name="messageChannel">消息通道</param>
/// <param name="messageStartTime">消息开始时间</param>
/// <param name="cancellationToken">取消令牌</param>
/// <remarks>
/// 该方法负责:
/// 1. 将接收到的数据写入消息缓冲区
/// 2. 检查消息是否完整
/// 3. 创建消息对象并写入通道
/// 4. 记录性能指标
/// </remarks>
private async Task ProcessMessage(
System.Net.WebSockets.WebSocket webSocket,
string connectionId,
byte[] buffer,
WebSocketReceiveResult receiveResult,
Channel<WebSocketMessage> messageChannel,
DateTime messageStartTime,
CancellationToken cancellationToken)
{
var processingStartTime = DateTime.UtcNow;
var success = await _errorHandler.HandleWithRetryAsync(async () =>
{
if (!_messageBuffer.TryWrite(buffer, 0, receiveResult.Count))
{
await messageStream.WriteAsync(buffer, 0, receiveResult.Count);
throw new WebSocketException("消息缓冲区溢出");
}
if (receiveResult.EndOfMessage)
{
var message = new WebSocketMessage
{
ConnectionId = connectionId,
Data = messageStream.ToArray(),
Data = _messageBuffer.GetMessage(),
MessageType = receiveResult.MessageType,
IsComplete = true
};
// 使用背压机制
if (!await _messageQueueManager.TryQueueIncomingMessage(message))
{
_logger.LogWarning("消息队列已满,等待处理,连接ID:{ConnectionId}", connectionId);
await Task.Delay(100); // 等待队列处理
continue;
await messageChannel.Writer.WriteAsync(message, cancellationToken);
var processingTime = (DateTime.UtcNow - processingStartTime).TotalMilliseconds;
_performanceMonitor.RecordMessage(
connectionId,
message.Data.Length,
(long)processingTime);
_messageBuffer.Reset();
messageStartTime = DateTime.UtcNow;
}
});
messageCount++;
totalBytes += messageStream.Length;
messageStream.SetLength(0);
if (!success)
{
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken);
}
}
else if (receiveResult.MessageType == WebSocketMessageType.Close)
/// <summary>
/// 完成消息处理
/// </summary>
/// <param name="messageChannel">消息通道</param>
/// <param name="processingState">处理状态标志</param>
/// <param name="cancellationToken">取消令牌</param>
/// <remarks>
/// 该方法负责:
/// 1. 完成消息写入
/// 2. 等待所有消息处理完成
/// 3. 确保资源正确释放
/// </remarks>
private async Task CompleteMessageProcessing(
Channel<WebSocketMessage> messageChannel,
AtomicBoolean processingState,
CancellationToken cancellationToken)
{
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
break;
}
_logger.LogInformation("开始完成消息处理");
stopwatch.Stop();
// 标记消息通道写入完成,不再接受新消息
messageChannel.Writer.Complete();
_logger.LogDebug("消息通道写入已完成");
// 记录性能指标
if (messageCount % 100 == 0)
// 等待所有消息处理完成
// 1. 检查消息通道是否完全关闭
// 2. 检查是否还有消息正在处理
while (!messageChannel.Reader.Completion.IsCompleted || processingState.Value)
{
_logger.LogInformation(
"连接性能统计,连接ID:{ConnectionId},消息数:{MessageCount},总字节数:{TotalBytes}," +
"平均处理时间:{AverageTime}ms,吞吐量:{Throughput}MB/s",
connectionId,
messageCount,
totalBytes,
stopwatch.ElapsedMilliseconds / messageCount,
(totalBytes / 1024.0 / 1024.0) / (stopwatch.ElapsedMilliseconds / 1000.0));
_logger.LogDebug("等待消息处理完成,通道状态:{ChannelStatus},处理状态:{ProcessingStatus}",
messageChannel.Reader.Completion.IsCompleted ? "已完成" : "处理中",
processingState.Value ? "处理中" : "空闲");
// 每100毫秒检查一次状态
await Task.Delay(100, cancellationToken);
}
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
_logger.LogInformation("消息处理已完成");
}
catch (Exception ex)
/// <summary>
/// 关闭 WebSocket 连接(如果处于打开状态)
/// </summary>
/// <param name="webSocket">WebSocket 连接实例</param>
/// <remarks>
/// 该方法负责:
/// 1. 检查连接状态
/// 2. 如果连接处于打开状态,则正常关闭
/// </remarks>
private async Task CloseWebSocketIfOpen(System.Net.WebSockets.WebSocket webSocket)
{
_logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
connectionId, ex.Message);
throw;
if (webSocket.State == WebSocketState.Open)
{
await webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Connection closed",
CancellationToken.None);
}
}
_logger.LogInformation("收到关闭请求,连接ID:{ConnectionId},关闭状态:{CloseStatus},描述:{CloseDescription}",
connectionId, receiveResult.CloseStatus, receiveResult.CloseStatusDescription);
/// <summary>
/// 检查消息是否超时
/// </summary>
private bool IsMessageTimeout(DateTime messageStartTime) =>
DateTime.UtcNow - messageStartTime > _options.MessageSendTimeout;
/// <summary>
/// 检查消息类型是否有效
/// </summary>
private bool IsValidMessageType(WebSocketMessageType messageType) =>
messageType == WebSocketMessageType.Text || messageType == WebSocketMessageType.Binary;
/// <summary>
/// 处理消息超时
/// </summary>
private async Task HandleMessageTimeout(
System.Net.WebSockets.WebSocket webSocket,
string connectionId,
CancellationToken cancellationToken)
{
_logger.LogWarning("消息处理超时,连接ID:{ConnectionId},关闭连接", connectionId);
await webSocket.CloseAsync(
WebSocketCloseStatus.EndpointUnavailable,
"Message timeout",
cancellationToken);
}
/// <summary>
/// 处理关闭消息
/// </summary>
private async Task HandleCloseMessage(
System.Net.WebSockets.WebSocket webSocket,
WebSocketReceiveResult receiveResult,
CancellationToken cancellationToken)
{
_logger.LogInformation("处理关闭消息,关闭状态:{CloseStatus},描述:{Description}",
receiveResult.CloseStatus, receiveResult.CloseStatusDescription);
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
_logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId);
cancellationToken);
}
/// <summary>
/// 处理消息处理失败
/// </summary>
private async Task HandleMessageProcessingFailure(
System.Net.WebSockets.WebSocket webSocket,
string connectionId,
CancellationToken cancellationToken)
{
_logger.LogError("消息处理失败,连接ID:{ConnectionId},关闭连接", connectionId);
await webSocket.CloseAsync(
WebSocketCloseStatus.InternalServerError,
"Message processing failed",
cancellationToken);
}
/// <summary>
@ -210,4 +555,68 @@ public class WebSocketMiddleware
_logger.LogError(ex, "关闭 WebSocket 连接时发生错误,错误信息:{ErrorMessage}", ex.Message);
}
}
/// <summary>
/// 原子布尔值
/// </summary>
private sealed class AtomicBoolean
{
private int _value;
public AtomicBoolean(bool initialValue)
{
_value = initialValue ? 1 : 0;
}
public bool Value => Interlocked.CompareExchange(ref _value, 1, 1) == 1;
public bool Set(bool value)
{
return Interlocked.Exchange(ref _value, value ? 1 : 0) == 1;
}
}
}
/// <summary>
/// Channel 对象池策略
/// </summary>
/// <remarks>
/// 该策略类负责:
/// 1. 创建新的 Channel 实例
/// 2. 检查 Channel 是否可以重用
/// 3. 重置 Channel 状态
/// </remarks>
public class ChannelPooledObjectPolicy : IPooledObjectPolicy<Channel<WebSocketMessage>>
{
/// <summary>
/// 创建新的 Channel 实例
/// </summary>
public Channel<WebSocketMessage> Create()
{
return Channel.CreateUnbounded<WebSocketMessage>(new UnboundedChannelOptions
{
SingleReader = true, // 单一读取器,确保消息顺序处理
SingleWriter = false // 允许多个写入器,支持并发写入
});
}
/// <summary>
/// 检查 Channel 是否可以重用
/// </summary>
public bool Return(Channel<WebSocketMessage> obj)
{
// 检查 Channel 是否已关闭
if (obj.Reader.Completion.IsCompleted)
{
return false;
}
// 重置 Channel 状态
if (obj.Writer.TryComplete())
{
return true;
}
return false;
}
}

104
src/CellularManagement.WebSocket/Monitor/WebSocketPerformanceMonitor.cs

@ -0,0 +1,104 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Monitor;
/// <summary>
/// WebSocket 性能监控器
/// 用于收集和记录 WebSocket 连接的性能指标
/// </summary>
public class WebSocketPerformanceMonitor
{
private readonly ILogger _logger;
private readonly ConcurrentDictionary<string, ConnectionMetrics> _metrics = new();
public class ConnectionMetrics
{
private long _totalMessages;
private long _totalBytes;
private long _processingTime;
private int _errorCount;
public long TotalMessages => _totalMessages;
public long TotalBytes => _totalBytes;
public long ProcessingTime => _processingTime;
public long LastMessageTime { get; set; }
public int ErrorCount => _errorCount;
public void IncrementMessages() => Interlocked.Increment(ref _totalMessages);
public void AddBytes(long bytes) => Interlocked.Add(ref _totalBytes, bytes);
public void AddProcessingTime(long time) => Interlocked.Add(ref _processingTime, time);
public void IncrementErrors() => Interlocked.Increment(ref _errorCount);
}
public WebSocketPerformanceMonitor(ILogger logger)
{
_logger = logger;
}
/// <summary>
/// 记录消息处理性能指标
/// </summary>
/// <param name="connectionId">连接ID</param>
/// <param name="messageSize">消息大小</param>
/// <param name="processingTime">处理时间(毫秒)</param>
public void RecordMessage(string connectionId, int messageSize, long processingTime)
{
var metrics = _metrics.GetOrAdd(connectionId, _ => new ConnectionMetrics());
metrics.IncrementMessages();
metrics.AddBytes(messageSize);
metrics.AddProcessingTime(processingTime);
metrics.LastMessageTime = DateTime.UtcNow.Ticks;
if (metrics.TotalMessages % 100 == 0)
{
LogMetrics(connectionId, metrics);
}
}
/// <summary>
/// 记录错误
/// </summary>
/// <param name="connectionId">连接ID</param>
public void RecordError(string connectionId)
{
if (_metrics.TryGetValue(connectionId, out var metrics))
{
metrics.IncrementErrors();
}
}
/// <summary>
/// 获取连接的性能指标
/// </summary>
/// <param name="connectionId">连接ID</param>
/// <returns>性能指标</returns>
public ConnectionMetrics GetMetrics(string connectionId)
{
return _metrics.TryGetValue(connectionId, out var metrics) ? metrics : null;
}
/// <summary>
/// 清理连接的性能指标
/// </summary>
/// <param name="connectionId">连接ID</param>
public void ClearMetrics(string connectionId)
{
_metrics.TryRemove(connectionId, out _);
}
private void LogMetrics(string connectionId, ConnectionMetrics metrics)
{
_logger.LogInformation(
"连接性能指标 - ID: {ConnectionId}, 消息数: {MessageCount}, " +
"总字节数: {TotalBytes}, 平均处理时间: {AvgProcessingTime}ms, " +
"错误数: {ErrorCount}",
connectionId,
metrics.TotalMessages,
metrics.TotalBytes,
metrics.ProcessingTime / metrics.TotalMessages,
metrics.ErrorCount);
}
}
Loading…
Cancel
Save