diff --git a/src/CellularManagement.WebSocket/Buffer/WebSocketMessageBuffer.cs b/src/CellularManagement.WebSocket/Buffer/WebSocketMessageBuffer.cs
new file mode 100644
index 0000000..256dd52
--- /dev/null
+++ b/src/CellularManagement.WebSocket/Buffer/WebSocketMessageBuffer.cs
@@ -0,0 +1,164 @@
+using System;
+using System.Buffers;
+using System.Runtime.CompilerServices;
+
+namespace CellularManagement.WebSocket.Buffer;
+
+///
+/// WebSocket 消息缓冲区
+/// 用于高效处理 WebSocket 消息的缓冲区实现
+///
+public sealed class WebSocketMessageBuffer : IDisposable
+{
+ private readonly byte[] _buffer;
+ private int _position;
+ private readonly int _maxSize;
+ private bool _isDisposed;
+ private const int MinBufferSize = 1024; // 最小缓冲区大小 1KB
+ private const int MaxBufferSize = 1024 * 1024 * 10; // 最大缓冲区大小 10MB
+
+ ///
+ /// 当前缓冲区大小
+ ///
+ public int Size => _position;
+
+ ///
+ /// 缓冲区是否已满
+ ///
+ public bool IsFull => _position >= _maxSize;
+
+ ///
+ /// 构造函数
+ ///
+ /// 最大缓冲区大小
+ /// 当 maxSize 超出允许范围时抛出
+ public WebSocketMessageBuffer(int maxSize)
+ {
+ if (maxSize < MinBufferSize || maxSize > MaxBufferSize)
+ {
+ throw new ArgumentOutOfRangeException(nameof(maxSize),
+ $"缓冲区大小必须在 {MinBufferSize} 到 {MaxBufferSize} 字节之间");
+ }
+
+ _maxSize = maxSize;
+ _buffer = ArrayPool.Shared.Rent(maxSize);
+ _position = 0;
+ }
+
+ ///
+ /// 尝试将数据写入缓冲区
+ ///
+ /// 要写入的数据
+ /// 数据偏移量
+ /// 要写入的字节数
+ /// 是否写入成功
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public bool TryWrite(byte[] data, int offset, int count)
+ {
+ if (_isDisposed)
+ {
+ throw new ObjectDisposedException(nameof(WebSocketMessageBuffer));
+ }
+
+ if (data == null)
+ {
+ throw new ArgumentNullException(nameof(data));
+ }
+
+ if (offset < 0 || count < 0 || offset + count > data.Length)
+ {
+ throw new ArgumentOutOfRangeException(nameof(offset));
+ }
+
+ if (_position + count > _maxSize)
+ {
+ return false;
+ }
+
+ System.Buffer.BlockCopy(data, offset, _buffer, _position, count);
+ _position += count;
+ return true;
+ }
+
+ ///
+ /// 获取当前缓冲区中的消息
+ ///
+ /// 消息字节数组
+ public byte[] GetMessage()
+ {
+ if (_isDisposed)
+ {
+ throw new ObjectDisposedException(nameof(WebSocketMessageBuffer));
+ }
+
+ if (_position == 0)
+ {
+ return Array.Empty();
+ }
+
+ var result = new byte[_position];
+ System.Buffer.BlockCopy(_buffer, 0, result, 0, _position);
+ return result;
+ }
+
+ ///
+ /// 获取当前缓冲区中的消息,使用内存池
+ ///
+ /// 消息字节数组的租借对象
+ public IMemoryOwner GetMessageWithMemoryPool()
+ {
+ if (_isDisposed)
+ {
+ throw new ObjectDisposedException(nameof(WebSocketMessageBuffer));
+ }
+
+ if (_position == 0)
+ {
+ return new EmptyMemoryOwner();
+ }
+
+ var memoryOwner = MemoryPool.Shared.Rent(_position);
+ var span = memoryOwner.Memory.Span;
+ for (int i = 0; i < _position; i++)
+ {
+ span[i] = _buffer[i];
+ }
+ return memoryOwner;
+ }
+
+ ///
+ /// 重置缓冲区
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void Reset()
+ {
+ if (_isDisposed)
+ {
+ throw new ObjectDisposedException(nameof(WebSocketMessageBuffer));
+ }
+
+ _position = 0;
+ }
+
+ ///
+ /// 释放资源
+ ///
+ public void Dispose()
+ {
+ if (!_isDisposed)
+ {
+ ArrayPool.Shared.Return(_buffer);
+ _isDisposed = true;
+ }
+ }
+
+ private sealed class EmptyMemoryOwner : IMemoryOwner
+ {
+ public Memory Memory => Memory.Empty;
+
+ public void Dispose()
+ {
+ // 空实现,因为 Empty 不需要释放
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/CellularManagement.WebSocket/Connection/ConnectionManagerCoordinator.cs b/src/CellularManagement.WebSocket/Connection/ConnectionManagerCoordinator.cs
index b634e29..ef602b2 100644
--- a/src/CellularManagement.WebSocket/Connection/ConnectionManagerCoordinator.cs
+++ b/src/CellularManagement.WebSocket/Connection/ConnectionManagerCoordinator.cs
@@ -79,6 +79,7 @@ public class ConnectionManagerCoordinator
_processingSemaphore.Release();
_logger.LogDebug("结束处理连接,连接ID:{ConnectionId}", connectionId);
}
+ await Task.CompletedTask.ConfigureAwait(false);
}
///
diff --git a/src/CellularManagement.WebSocket/Handler/WebSocketErrorHandler.cs b/src/CellularManagement.WebSocket/Handler/WebSocketErrorHandler.cs
new file mode 100644
index 0000000..d37e421
--- /dev/null
+++ b/src/CellularManagement.WebSocket/Handler/WebSocketErrorHandler.cs
@@ -0,0 +1,49 @@
+using System;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace CellularManagement.WebSocket.Handler;
+
+///
+/// WebSocket 错误处理器
+/// 提供错误处理和重试机制
+///
+public class WebSocketErrorHandler
+{
+ private readonly ILogger _logger;
+ private readonly int _maxRetries;
+ private readonly TimeSpan _retryDelay;
+
+ public WebSocketErrorHandler(ILogger logger, int maxRetries = 3, TimeSpan? retryDelay = null)
+ {
+ _logger = logger;
+ _maxRetries = maxRetries;
+ _retryDelay = retryDelay ?? TimeSpan.FromSeconds(1);
+ }
+
+ ///
+ /// 使用重试机制执行操作
+ ///
+ /// 要执行的操作
+ /// 操作是否成功
+ public async Task HandleWithRetryAsync(Func action)
+ {
+ for (int i = 0; i < _maxRetries; i++)
+ {
+ try
+ {
+ await action();
+ return true;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "操作失败,重试 {RetryCount}/{MaxRetries}", i + 1, _maxRetries);
+ if (i < _maxRetries - 1)
+ {
+ await Task.Delay(_retryDelay);
+ }
+ }
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs b/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
index 63d05d5..4190368 100644
--- a/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
+++ b/src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
@@ -5,22 +5,83 @@ using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.Diagnostics;
using Microsoft.Extensions.Options;
+using System.Buffers;
+using System.Collections.Concurrent;
+using System.Threading;
+using CellularManagement.WebSocket.Buffer;
+using CellularManagement.WebSocket.Handler;
+using CellularManagement.WebSocket.Monitor;
+using System.Threading.Channels;
+using Microsoft.Extensions.ObjectPool;
namespace CellularManagement.WebSocket.Middleware;
///
/// WebSocket 中间件
-/// 负责处理 WebSocket 连接的建立和消息的接收
+/// 负责处理 WebSocket 连接的建立、消息的接收和处理
///
+///
+/// 主要功能:
+/// 1. 管理 WebSocket 连接的生命周期
+/// 2. 处理消息的接收和发送
+/// 3. 实现消息队列和背压机制
+/// 4. 提供性能监控和错误处理
+/// 5. 使用对象池优化资源使用
+///
+/// WebSocket 关闭状态码说明:
+/// 1000 (NormalClosure): 正常关闭,表示连接在请求完成后正常关闭
+/// 1001 (EndpointUnavailable): 终端不可用,表示服务器或客户端将被移除
+/// 1002 (ProtocolError): 协议错误,表示因为协议错误而终止连接
+/// 1003 (InvalidMessageType): 消息类型错误,表示无法接受接收到的数据类型
+/// 1007 (InvalidPayloadData): 数据格式错误,表示收到了与消息类型不一致的数据
+/// 1008 (PolicyViolation): 策略违规,表示收到了违反策略的消息
+/// 1009 (MessageTooBig): 消息过大,表示消息超出处理能力
+/// 1011 (InternalServerError): 服务器内部错误,表示服务器发生内部错误
+///
+///
+/// 使用示例:
+///
+/// // 在 Startup.cs 中注册中间件
+/// app.UseWebSocketServer();
+///
+/// // 配置 WebSocket 选项
+/// services.Configure(options =>
+/// {
+/// options.MaxMessageSize = 1024 * 1024; // 1MB
+/// options.MaxQueueSize = 1000;
+/// });
+///
+/// // 关闭连接示例
+/// // 正常关闭连接
+/// await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "正常关闭", CancellationToken.None);
+///
+/// // 消息过大时关闭
+/// await webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, "消息过大", CancellationToken.None);
+///
+/// // 服务器错误时关闭
+/// await webSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, "服务器错误", CancellationToken.None);
+///
+///
public class WebSocketMiddleware
{
private readonly RequestDelegate _next;
private readonly IWebSocketConnectionManager _connectionManager;
private readonly IWebSocketMessageQueueManager _messageQueueManager;
private readonly ILogger _logger;
- private readonly int _bufferSize = 1024 * 4;
private readonly WebSocketOptions _options;
+ private readonly WebSocketMessageBuffer _messageBuffer;
+ private readonly WebSocketErrorHandler _errorHandler;
+ private readonly WebSocketPerformanceMonitor _performanceMonitor;
+ private readonly ObjectPool> _channelPool;
+ ///
+ /// 构造函数
+ ///
+ /// 请求委托
+ /// 连接管理器
+ /// 消息队列管理器
+ /// 日志记录器
+ /// WebSocket 配置选项
public WebSocketMiddleware(
RequestDelegate next,
IWebSocketConnectionManager connectionManager,
@@ -33,7 +94,13 @@ public class WebSocketMiddleware
_messageQueueManager = messageQueueManager;
_logger = logger;
_options = options.Value;
- _logger.LogInformation("初始化 WebSocket 中间件,缓冲区大小:{BufferSize}字节", _bufferSize);
+ _messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize);
+ _errorHandler = new WebSocketErrorHandler(logger, _options.MessageRetryCount, _options.MessageRetryInterval);
+ _performanceMonitor = new WebSocketPerformanceMonitor(logger);
+
+ // 创建 Channel 对象池,用于复用消息通道
+ _channelPool = new DefaultObjectPool>(
+ new ChannelPooledObjectPolicy());
}
///
@@ -81,109 +148,387 @@ public class WebSocketMiddleware
///
/// 处理 WebSocket 连接
///
+ /// WebSocket 连接实例
+ /// 连接的唯一标识符
+ ///
+ /// 该方法负责处理单个 WebSocket 连接的完整生命周期:
+ /// 1. 初始化连接资源(缓冲区、消息通道等)
+ /// 2. 启动消息处理循环
+ /// 3. 处理消息接收
+ /// 4. 完成消息处理
+ /// 5. 清理资源
+ ///
private async Task HandleWebSocketConnection(System.Net.WebSockets.WebSocket webSocket, string connectionId)
{
_logger.LogInformation("开始处理 WebSocket 连接,连接ID:{ConnectionId}", connectionId);
- var stopwatch = new Stopwatch();
- var messageCount = 0;
- var totalBytes = 0L;
-
- using var messageStream = new MemoryStream();
- var buffer = new byte[_bufferSize];
- var receiveResult = await webSocket.ReceiveAsync(
- new ArraySegment(buffer), CancellationToken.None);
+ // 使用原子操作的状态标志,确保线程安全
+ var processingState = new AtomicBoolean(false);
+ // 从内存池租借缓冲区,用于接收消息
+ var buffer = ArrayPool.Shared.Rent(1024 * 4);
+ // 从对象池获取消息通道
+ var messageChannel = _channelPool.Get();
- while (!receiveResult.CloseStatus.HasValue)
+ try
{
- try
- {
- stopwatch.Start();
+ // 创建取消令牌源,设置连接超时
+ using var cts = new CancellationTokenSource(_options.ConnectionTimeout);
+ var messageStartTime = DateTime.UtcNow;
- // 检查消息大小
- if (messageStream.Length + receiveResult.Count > _options.MaxMessageSize)
- {
- _logger.LogWarning("消息大小超过限制,连接ID:{ConnectionId},当前大小:{CurrentSize},最大限制:{MaxSize}",
- connectionId, messageStream.Length + receiveResult.Count, _options.MaxMessageSize);
- await webSocket.CloseAsync(
- WebSocketCloseStatus.MessageTooBig,
- "Message too big",
- CancellationToken.None);
- return;
- }
+ // 启动消息处理循环,异步处理接收到的消息
+ var processTask = StartMessageProcessingLoop(messageChannel, processingState, cts.Token);
+
+ // 处理消息接收,包括消息的接收、解析和处理
+ await ProcessWebSocketMessages(webSocket, connectionId, buffer, messageChannel,
+ messageStartTime, cts.Token);
- // 处理消息分片
- if (receiveResult.MessageType == WebSocketMessageType.Text ||
- receiveResult.MessageType == WebSocketMessageType.Binary)
+ // 完成消息处理,等待所有消息处理完成
+ await CompleteMessageProcessing(messageChannel, processingState, cts.Token);
+ }
+ finally
+ {
+ // 释放资源
+ ArrayPool.Shared.Return(buffer);
+ _channelPool.Return(messageChannel);
+ await CloseWebSocketIfOpen(webSocket);
+ }
+ }
+
+ ///
+ /// 启动消息处理循环
+ ///
+ /// 消息通道
+ /// 处理状态标志
+ /// 取消令牌
+ /// 处理任务
+ ///
+ /// 该方法负责:
+ /// 1. 从消息通道读取消息
+ /// 2. 使用原子操作确保消息处理的线程安全
+ /// 3. 将消息加入处理队列
+ /// 4. 处理队列满时的重试逻辑
+ ///
+ private Task StartMessageProcessingLoop(
+ Channel messageChannel,
+ AtomicBoolean processingState,
+ CancellationToken cancellationToken)
+ {
+ return Task.Run(async () =>
+ {
+ _logger.LogInformation("开始消息处理循环");
+ try
+ {
+ // 使用异步流读取所有消息,直到通道关闭或取消
+ await foreach (var message in messageChannel.Reader.ReadAllAsync(cancellationToken))
{
- await messageStream.WriteAsync(buffer, 0, receiveResult.Count);
+ _logger.LogDebug("收到新消息,连接ID:{ConnectionId},消息类型:{MessageType}",
+ message.ConnectionId, message.MessageType);
- if (receiveResult.EndOfMessage)
+ // 使用原子操作设置处理状态,确保同一时间只有一个消息在处理
+ if (!processingState.Set(true))
{
- var message = new WebSocketMessage
+ try
{
- ConnectionId = connectionId,
- Data = messageStream.ToArray(),
- MessageType = receiveResult.MessageType,
- IsComplete = true
- };
-
- // 使用背压机制
- if (!await _messageQueueManager.TryQueueIncomingMessage(message))
+ // 尝试将消息加入处理队列
+ if (!await _messageQueueManager.TryQueueIncomingMessage(message))
+ {
+ _logger.LogWarning("消息队列已满,消息将重新入队,连接ID:{ConnectionId}",
+ message.ConnectionId);
+ // 如果队列已满,将消息重新写入通道并等待一段时间后重试
+ await messageChannel.Writer.WriteAsync(message, cancellationToken);
+ await Task.Delay(100, cancellationToken);
+ }
+ else
+ {
+ _logger.LogDebug("消息已成功加入处理队列,连接ID:{ConnectionId}",
+ message.ConnectionId);
+ }
+ }
+ finally
{
- _logger.LogWarning("消息队列已满,等待处理,连接ID:{ConnectionId}", connectionId);
- await Task.Delay(100); // 等待队列处理
- continue;
+ // 无论处理成功与否,都重置处理状态
+ processingState.Set(false);
}
-
- messageCount++;
- totalBytes += messageStream.Length;
- messageStream.SetLength(0);
+ }
+ else
+ {
+ _logger.LogDebug("消息正在处理中,跳过当前消息,连接ID:{ConnectionId}",
+ message.ConnectionId);
}
}
- else if (receiveResult.MessageType == WebSocketMessageType.Close)
- {
- await webSocket.CloseAsync(
- receiveResult.CloseStatus.Value,
- receiveResult.CloseStatusDescription,
- CancellationToken.None);
- break;
- }
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.LogInformation("消息处理循环已取消");
+ }
+ catch (Exception ex)
+ {
+ // 记录处理循环中的异常
+ _logger.LogError(ex, "消息处理循环发生错误,连接ID:{ConnectionId}",
+ messageChannel.Reader.Completion.IsCompleted);
+ }
+ finally
+ {
+ _logger.LogInformation("消息处理循环已结束");
+ }
+ }, cancellationToken);
+ }
- stopwatch.Stop();
+ ///
+ /// 处理 WebSocket 消息
+ ///
+ /// WebSocket 连接实例
+ /// 连接ID
+ /// 接收缓冲区
+ /// 消息通道
+ /// 消息开始时间
+ /// 取消令牌
+ ///
+ /// 该方法负责:
+ /// 1. 接收 WebSocket 消息
+ /// 2. 检查消息超时
+ /// 3. 处理不同类型的消息(文本、二进制、关闭等)
+ /// 4. 将消息写入处理通道
+ ///
+ private async Task ProcessWebSocketMessages(
+ System.Net.WebSockets.WebSocket webSocket,
+ string connectionId,
+ byte[] buffer,
+ Channel messageChannel,
+ DateTime messageStartTime,
+ CancellationToken cancellationToken)
+ {
+ _logger.LogInformation("开始处理 WebSocket 消息,连接ID:{ConnectionId}", connectionId);
+
+ // 接收第一条消息
+ var receiveResult = await webSocket.ReceiveAsync(
+ new ArraySegment(buffer), cancellationToken);
+ _logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType}",
+ connectionId, receiveResult.MessageType);
- // 记录性能指标
- if (messageCount % 100 == 0)
- {
- _logger.LogInformation(
- "连接性能统计,连接ID:{ConnectionId},消息数:{MessageCount},总字节数:{TotalBytes}," +
- "平均处理时间:{AverageTime}ms,吞吐量:{Throughput}MB/s",
- connectionId,
- messageCount,
- totalBytes,
- stopwatch.ElapsedMilliseconds / messageCount,
- (totalBytes / 1024.0 / 1024.0) / (stopwatch.ElapsedMilliseconds / 1000.0));
- }
+ // 循环处理消息,直到收到关闭消息或发生错误
+ while (!receiveResult.CloseStatus.HasValue)
+ {
+ // 检查消息是否超时
+ if (IsMessageTimeout(messageStartTime))
+ {
+ _logger.LogWarning("消息处理超时,连接ID:{ConnectionId},已处理时间:{ElapsedTime}ms",
+ connectionId, (DateTime.UtcNow - messageStartTime).TotalMilliseconds);
+ await HandleMessageTimeout(webSocket, connectionId, cancellationToken);
+ return;
+ }
- receiveResult = await webSocket.ReceiveAsync(
- new ArraySegment(buffer), CancellationToken.None);
+ // 处理关闭消息
+ if (receiveResult.MessageType == WebSocketMessageType.Close)
+ {
+ _logger.LogInformation("收到关闭消息,连接ID:{ConnectionId},关闭状态:{CloseStatus}",
+ connectionId, receiveResult.CloseStatus);
+ await HandleCloseMessage(webSocket, receiveResult, cancellationToken);
+ break;
}
- catch (Exception ex)
+
+ // 处理有效消息类型(文本或二进制)
+ if (IsValidMessageType(receiveResult.MessageType))
{
- _logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
- connectionId, ex.Message);
- throw;
+ _logger.LogDebug("处理消息,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节",
+ connectionId, receiveResult.MessageType, receiveResult.Count);
+ await ProcessMessage(webSocket, connectionId, buffer, receiveResult,
+ messageChannel, messageStartTime, cancellationToken);
+ }
+ else
+ {
+ _logger.LogWarning("收到无效消息类型,连接ID:{ConnectionId},消息类型:{MessageType}",
+ connectionId, receiveResult.MessageType);
+ }
+
+ // 接收下一条消息
+ receiveResult = await webSocket.ReceiveAsync(
+ new ArraySegment(buffer), cancellationToken);
+ _logger.LogDebug("接收下一条消息,连接ID:{ConnectionId},消息类型:{MessageType}",
+ connectionId, receiveResult.MessageType);
+ }
+
+ _logger.LogInformation("WebSocket 消息处理完成,连接ID:{ConnectionId}", connectionId);
+ }
+
+ ///
+ /// 处理单个消息
+ ///
+ /// WebSocket 连接实例
+ /// 连接ID
+ /// 接收缓冲区
+ /// 接收结果
+ /// 消息通道
+ /// 消息开始时间
+ /// 取消令牌
+ ///
+ /// 该方法负责:
+ /// 1. 将接收到的数据写入消息缓冲区
+ /// 2. 检查消息是否完整
+ /// 3. 创建消息对象并写入通道
+ /// 4. 记录性能指标
+ ///
+ private async Task ProcessMessage(
+ System.Net.WebSockets.WebSocket webSocket,
+ string connectionId,
+ byte[] buffer,
+ WebSocketReceiveResult receiveResult,
+ Channel messageChannel,
+ DateTime messageStartTime,
+ CancellationToken cancellationToken)
+ {
+ var processingStartTime = DateTime.UtcNow;
+
+ var success = await _errorHandler.HandleWithRetryAsync(async () =>
+ {
+ if (!_messageBuffer.TryWrite(buffer, 0, receiveResult.Count))
+ {
+ throw new WebSocketException("消息缓冲区溢出");
+ }
+
+ if (receiveResult.EndOfMessage)
+ {
+ var message = new WebSocketMessage
+ {
+ ConnectionId = connectionId,
+ Data = _messageBuffer.GetMessage(),
+ MessageType = receiveResult.MessageType,
+ IsComplete = true
+ };
+
+ await messageChannel.Writer.WriteAsync(message, cancellationToken);
+
+ var processingTime = (DateTime.UtcNow - processingStartTime).TotalMilliseconds;
+ _performanceMonitor.RecordMessage(
+ connectionId,
+ message.Data.Length,
+ (long)processingTime);
+
+ _messageBuffer.Reset();
+ messageStartTime = DateTime.UtcNow;
}
+ });
+
+ if (!success)
+ {
+ await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken);
}
+ }
- _logger.LogInformation("收到关闭请求,连接ID:{ConnectionId},关闭状态:{CloseStatus},描述:{CloseDescription}",
- connectionId, receiveResult.CloseStatus, receiveResult.CloseStatusDescription);
+ ///
+ /// 完成消息处理
+ ///
+ /// 消息通道
+ /// 处理状态标志
+ /// 取消令牌
+ ///
+ /// 该方法负责:
+ /// 1. 完成消息写入
+ /// 2. 等待所有消息处理完成
+ /// 3. 确保资源正确释放
+ ///
+ private async Task CompleteMessageProcessing(
+ Channel messageChannel,
+ AtomicBoolean processingState,
+ CancellationToken cancellationToken)
+ {
+ _logger.LogInformation("开始完成消息处理");
+
+ // 标记消息通道写入完成,不再接受新消息
+ messageChannel.Writer.Complete();
+ _logger.LogDebug("消息通道写入已完成");
+ // 等待所有消息处理完成
+ // 1. 检查消息通道是否完全关闭
+ // 2. 检查是否还有消息正在处理
+ while (!messageChannel.Reader.Completion.IsCompleted || processingState.Value)
+ {
+ _logger.LogDebug("等待消息处理完成,通道状态:{ChannelStatus},处理状态:{ProcessingStatus}",
+ messageChannel.Reader.Completion.IsCompleted ? "已完成" : "处理中",
+ processingState.Value ? "处理中" : "空闲");
+ // 每100毫秒检查一次状态
+ await Task.Delay(100, cancellationToken);
+ }
+
+ _logger.LogInformation("消息处理已完成");
+ }
+
+ ///
+ /// 关闭 WebSocket 连接(如果处于打开状态)
+ ///
+ /// WebSocket 连接实例
+ ///
+ /// 该方法负责:
+ /// 1. 检查连接状态
+ /// 2. 如果连接处于打开状态,则正常关闭
+ ///
+ private async Task CloseWebSocketIfOpen(System.Net.WebSockets.WebSocket webSocket)
+ {
+ if (webSocket.State == WebSocketState.Open)
+ {
+ await webSocket.CloseAsync(
+ WebSocketCloseStatus.NormalClosure,
+ "Connection closed",
+ CancellationToken.None);
+ }
+ }
+
+ ///
+ /// 检查消息是否超时
+ ///
+ private bool IsMessageTimeout(DateTime messageStartTime) =>
+ DateTime.UtcNow - messageStartTime > _options.MessageSendTimeout;
+
+ ///
+ /// 检查消息类型是否有效
+ ///
+ private bool IsValidMessageType(WebSocketMessageType messageType) =>
+ messageType == WebSocketMessageType.Text || messageType == WebSocketMessageType.Binary;
+
+ ///
+ /// 处理消息超时
+ ///
+ private async Task HandleMessageTimeout(
+ System.Net.WebSockets.WebSocket webSocket,
+ string connectionId,
+ CancellationToken cancellationToken)
+ {
+ _logger.LogWarning("消息处理超时,连接ID:{ConnectionId},关闭连接", connectionId);
+ await webSocket.CloseAsync(
+ WebSocketCloseStatus.EndpointUnavailable,
+ "Message timeout",
+ cancellationToken);
+ }
+
+ ///
+ /// 处理关闭消息
+ ///
+ private async Task HandleCloseMessage(
+ System.Net.WebSockets.WebSocket webSocket,
+ WebSocketReceiveResult receiveResult,
+ CancellationToken cancellationToken)
+ {
+ _logger.LogInformation("处理关闭消息,关闭状态:{CloseStatus},描述:{Description}",
+ receiveResult.CloseStatus, receiveResult.CloseStatusDescription);
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
- CancellationToken.None);
- _logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId);
+ cancellationToken);
+ }
+
+ ///
+ /// 处理消息处理失败
+ ///
+ private async Task HandleMessageProcessingFailure(
+ System.Net.WebSockets.WebSocket webSocket,
+ string connectionId,
+ CancellationToken cancellationToken)
+ {
+ _logger.LogError("消息处理失败,连接ID:{ConnectionId},关闭连接", connectionId);
+ await webSocket.CloseAsync(
+ WebSocketCloseStatus.InternalServerError,
+ "Message processing failed",
+ cancellationToken);
}
///
@@ -210,4 +555,68 @@ public class WebSocketMiddleware
_logger.LogError(ex, "关闭 WebSocket 连接时发生错误,错误信息:{ErrorMessage}", ex.Message);
}
}
+
+ ///
+ /// 原子布尔值
+ ///
+ private sealed class AtomicBoolean
+ {
+ private int _value;
+
+ public AtomicBoolean(bool initialValue)
+ {
+ _value = initialValue ? 1 : 0;
+ }
+
+ public bool Value => Interlocked.CompareExchange(ref _value, 1, 1) == 1;
+
+ public bool Set(bool value)
+ {
+ return Interlocked.Exchange(ref _value, value ? 1 : 0) == 1;
+ }
+ }
+}
+
+///
+/// Channel 对象池策略
+///
+///
+/// 该策略类负责:
+/// 1. 创建新的 Channel 实例
+/// 2. 检查 Channel 是否可以重用
+/// 3. 重置 Channel 状态
+///
+public class ChannelPooledObjectPolicy : IPooledObjectPolicy>
+{
+ ///
+ /// 创建新的 Channel 实例
+ ///
+ public Channel Create()
+ {
+ return Channel.CreateUnbounded(new UnboundedChannelOptions
+ {
+ SingleReader = true, // 单一读取器,确保消息顺序处理
+ SingleWriter = false // 允许多个写入器,支持并发写入
+ });
+ }
+
+ ///
+ /// 检查 Channel 是否可以重用
+ ///
+ public bool Return(Channel obj)
+ {
+ // 检查 Channel 是否已关闭
+ if (obj.Reader.Completion.IsCompleted)
+ {
+ return false;
+ }
+
+ // 重置 Channel 状态
+ if (obj.Writer.TryComplete())
+ {
+ return true;
+ }
+
+ return false;
+ }
}
\ No newline at end of file
diff --git a/src/CellularManagement.WebSocket/Monitor/WebSocketPerformanceMonitor.cs b/src/CellularManagement.WebSocket/Monitor/WebSocketPerformanceMonitor.cs
new file mode 100644
index 0000000..681869c
--- /dev/null
+++ b/src/CellularManagement.WebSocket/Monitor/WebSocketPerformanceMonitor.cs
@@ -0,0 +1,104 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using Microsoft.Extensions.Logging;
+
+namespace CellularManagement.WebSocket.Monitor;
+
+///
+/// WebSocket 性能监控器
+/// 用于收集和记录 WebSocket 连接的性能指标
+///
+public class WebSocketPerformanceMonitor
+{
+ private readonly ILogger _logger;
+ private readonly ConcurrentDictionary _metrics = new();
+
+ public class ConnectionMetrics
+ {
+ private long _totalMessages;
+ private long _totalBytes;
+ private long _processingTime;
+ private int _errorCount;
+
+ public long TotalMessages => _totalMessages;
+ public long TotalBytes => _totalBytes;
+ public long ProcessingTime => _processingTime;
+ public long LastMessageTime { get; set; }
+ public int ErrorCount => _errorCount;
+
+ public void IncrementMessages() => Interlocked.Increment(ref _totalMessages);
+ public void AddBytes(long bytes) => Interlocked.Add(ref _totalBytes, bytes);
+ public void AddProcessingTime(long time) => Interlocked.Add(ref _processingTime, time);
+ public void IncrementErrors() => Interlocked.Increment(ref _errorCount);
+ }
+
+ public WebSocketPerformanceMonitor(ILogger logger)
+ {
+ _logger = logger;
+ }
+
+ ///
+ /// 记录消息处理性能指标
+ ///
+ /// 连接ID
+ /// 消息大小
+ /// 处理时间(毫秒)
+ public void RecordMessage(string connectionId, int messageSize, long processingTime)
+ {
+ var metrics = _metrics.GetOrAdd(connectionId, _ => new ConnectionMetrics());
+ metrics.IncrementMessages();
+ metrics.AddBytes(messageSize);
+ metrics.AddProcessingTime(processingTime);
+ metrics.LastMessageTime = DateTime.UtcNow.Ticks;
+
+ if (metrics.TotalMessages % 100 == 0)
+ {
+ LogMetrics(connectionId, metrics);
+ }
+ }
+
+ ///
+ /// 记录错误
+ ///
+ /// 连接ID
+ public void RecordError(string connectionId)
+ {
+ if (_metrics.TryGetValue(connectionId, out var metrics))
+ {
+ metrics.IncrementErrors();
+ }
+ }
+
+ ///
+ /// 获取连接的性能指标
+ ///
+ /// 连接ID
+ /// 性能指标
+ public ConnectionMetrics GetMetrics(string connectionId)
+ {
+ return _metrics.TryGetValue(connectionId, out var metrics) ? metrics : null;
+ }
+
+ ///
+ /// 清理连接的性能指标
+ ///
+ /// 连接ID
+ public void ClearMetrics(string connectionId)
+ {
+ _metrics.TryRemove(connectionId, out _);
+ }
+
+ private void LogMetrics(string connectionId, ConnectionMetrics metrics)
+ {
+ _logger.LogInformation(
+ "连接性能指标 - ID: {ConnectionId}, 消息数: {MessageCount}, " +
+ "总字节数: {TotalBytes}, 平均处理时间: {AvgProcessingTime}ms, " +
+ "错误数: {ErrorCount}",
+ connectionId,
+ metrics.TotalMessages,
+ metrics.TotalBytes,
+ metrics.ProcessingTime / metrics.TotalMessages,
+ metrics.ErrorCount);
+ }
+}
\ No newline at end of file