diff --git a/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs b/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs index b9d40c8..1fd8d1c 100644 --- a/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs +++ b/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs @@ -388,43 +388,81 @@ public class WebSocketMiddleware { var processingStartTime = DateTime.UtcNow; - var success = await _errorHandler.HandleWithRetryAsync(async () => + try { - // 为每个消息创建独立的消息缓冲区,避免并发问题 - using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); - - if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) + var success = await _errorHandler.HandleWithRetryAsync(async () => { - throw new WebSocketException("消息缓冲区溢出"); - } + // 为每个消息创建独立的消息缓冲区,避免并发问题 + using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); + + // 验证输入参数 + if (buffer == null || buffer.Length == 0) + { + throw new WebSocketException("接收缓冲区为空或无效"); + } + + if (receiveResult.Count <= 0 || receiveResult.Count > buffer.Length) + { + throw new WebSocketException($"无效的消息大小:{receiveResult.Count},缓冲区大小:{buffer.Length}"); + } + + if (receiveResult.Count > _options.MaxMessageSize) + { + throw new WebSocketException($"消息大小超过限制:{receiveResult.Count} > {_options.MaxMessageSize}"); + } + + if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) + { + throw new WebSocketException("消息缓冲区溢出"); + } - if (receiveResult.EndOfMessage) - { - var message = new WebSocketMessage + if (receiveResult.EndOfMessage) { - ConnectionId = connectionId, - Data = messageBuffer.GetMessage(), - MessageType = receiveResult.MessageType, - IsComplete = true - }; + var message = new WebSocketMessage + { + ConnectionId = connectionId, + Data = messageBuffer.GetMessage(), + MessageType = receiveResult.MessageType, + IsComplete = true + }; - await messageChannel.Writer.WriteAsync(message, cancellationToken); + await messageChannel.Writer.WriteAsync(message, cancellationToken); - var processingTime = (DateTime.UtcNow - processingStartTime).TotalMilliseconds; - _performanceMonitor.RecordMessage( - connectionId, - message.Data.Length, - (long)processingTime); + var processingTime = (DateTime.UtcNow - processingStartTime).TotalMilliseconds; + _performanceMonitor.RecordMessage( + connectionId, + message.Data.Length, + (long)processingTime); - // 记录 Channel 消息处理统计 - _channelManager.RecordMessageProcessed(messageChannel, message.Data.Length); + // 记录 Channel 消息处理统计 + _channelManager.RecordMessageProcessed(messageChannel, message.Data.Length); - // messageBuffer 会在 using 语句中自动释放 - } - }); + // messageBuffer 会在 using 语句中自动释放 + } + }); - if (!success) + if (!success) + { + await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); + } + } + catch (WebSocketException ex) + { + _logger.LogError(ex, "WebSocket 消息处理异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}", + connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message); + await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); + } + catch (OperationCanceledException ex) + { + _logger.LogWarning(ex, "消息处理被取消,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节", + connectionId, receiveResult.MessageType, receiveResult.Count); + // 取消操作转换为 WebSocket 异常 + throw new WebSocketException("消息处理操作被取消"); + } + catch (Exception ex) { + _logger.LogError(ex, "消息处理发生未知异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}", + connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message); await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); } }