Browse Source

修复 WebSocketMiddleware 缓冲区问题:1) 统一缓冲区大小为 MaxMessageSize 2) 移除共享消息缓冲区,改为每个消息创建独立缓冲区 3) 使用 using 语句确保资源正确释放

feature/x1-web-request
root 1 week ago
parent
commit
d03cdc9006
  1. 16
      src/X1.WebSocket/Middleware/WebSocketMiddleware.cs

16
src/X1.WebSocket/Middleware/WebSocketMiddleware.cs

@ -73,7 +73,8 @@ public class WebSocketMiddleware
private readonly IWebSocketMessageQueueManager _messageQueueManager; private readonly IWebSocketMessageQueueManager _messageQueueManager;
private readonly ILogger<WebSocketMiddleware> _logger; private readonly ILogger<WebSocketMiddleware> _logger;
private readonly WebSocketOptions _options; private readonly WebSocketOptions _options;
private readonly WebSocketMessageBuffer _messageBuffer; // 移除共享的消息缓冲区,改为每个连接创建独立的缓冲区
// private readonly WebSocketMessageBuffer _messageBuffer;
private readonly WebSocketErrorHandler _errorHandler; private readonly WebSocketErrorHandler _errorHandler;
private readonly WebSocketPerformanceMonitor _performanceMonitor; private readonly WebSocketPerformanceMonitor _performanceMonitor;
// 使用 WebSocket Channel 管理器来管理 Channel 的生命周期 // 使用 WebSocket Channel 管理器来管理 Channel 的生命周期
@ -100,7 +101,6 @@ public class WebSocketMiddleware
_messageQueueManager = messageQueueManager; _messageQueueManager = messageQueueManager;
_logger = logger; _logger = logger;
_options = options.Value; _options = options.Value;
_messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize);
_errorHandler = new WebSocketErrorHandler(logger, _options.MessageRetryCount, _options.MessageRetryInterval); _errorHandler = new WebSocketErrorHandler(logger, _options.MessageRetryCount, _options.MessageRetryInterval);
_performanceMonitor = new WebSocketPerformanceMonitor(logger); _performanceMonitor = new WebSocketPerformanceMonitor(logger);
@ -170,7 +170,7 @@ public class WebSocketMiddleware
// 使用原子操作的状态标志,确保线程安全 // 使用原子操作的状态标志,确保线程安全
var processingState = new AtomicBoolean(false); var processingState = new AtomicBoolean(false);
// 从内存池租借缓冲区,用于接收消息 // 从内存池租借缓冲区,用于接收消息
var buffer = ArrayPool<byte>.Shared.Rent(1024 * 4); var buffer = ArrayPool<byte>.Shared.Rent(_options.MaxMessageSize);
// 使用 Channel 管理器创建新的消息通道 // 使用 Channel 管理器创建新的消息通道
var messageChannel = await _channelManager.CreateNewMessageChannel(connectionId); var messageChannel = await _channelManager.CreateNewMessageChannel(connectionId);
@ -390,7 +390,10 @@ public class WebSocketMiddleware
var success = await _errorHandler.HandleWithRetryAsync(async () => var success = await _errorHandler.HandleWithRetryAsync(async () =>
{ {
if (!_messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) // 为每个消息创建独立的消息缓冲区,避免并发问题
using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize);
if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count))
{ {
throw new WebSocketException("消息缓冲区溢出"); throw new WebSocketException("消息缓冲区溢出");
} }
@ -400,7 +403,7 @@ public class WebSocketMiddleware
var message = new WebSocketMessage var message = new WebSocketMessage
{ {
ConnectionId = connectionId, ConnectionId = connectionId,
Data = _messageBuffer.GetMessage(), Data = messageBuffer.GetMessage(),
MessageType = receiveResult.MessageType, MessageType = receiveResult.MessageType,
IsComplete = true IsComplete = true
}; };
@ -416,8 +419,7 @@ public class WebSocketMiddleware
// 记录 Channel 消息处理统计 // 记录 Channel 消息处理统计
_channelManager.RecordMessageProcessed(messageChannel, message.Data.Length); _channelManager.RecordMessageProcessed(messageChannel, message.Data.Length);
_messageBuffer.Reset(); // messageBuffer 会在 using 语句中自动释放
//messageStartTime = DateTime.UtcNow;
} }
}); });

Loading…
Cancel
Save