From d678bf19e56dbc8dfc5addc4aa0b635baee190b7 Mon Sep 17 00:00:00 2001 From: root <295172551@qq.com> Date: Sat, 26 Jul 2025 20:35:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=BC=BA=20ProcessMessage=20=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E5=A4=84=E7=90=86=EF=BC=9A1)=20=E6=B7=BB=E5=8A=A0=20t?= =?UTF-8?q?ry-catch=20=E5=BC=82=E5=B8=B8=E8=AE=B0=E5=BD=95=202)=20?= =?UTF-8?q?=E5=B0=86=20OperationCanceledException=20=E8=BD=AC=E6=8D=A2?= =?UTF-8?q?=E4=B8=BA=20WebSocketException=203)=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E8=BE=93=E5=85=A5=E5=8F=82=E6=95=B0=E9=AA=8C=E8=AF=81=E6=A3=80?= =?UTF-8?q?=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Middleware/WebSocketMiddleware.cs | 92 +++++++++++++------ 1 file changed, 65 insertions(+), 27 deletions(-) diff --git a/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs b/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs index b9d40c8..1fd8d1c 100644 --- a/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs +++ b/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs @@ -388,43 +388,81 @@ public class WebSocketMiddleware { var processingStartTime = DateTime.UtcNow; - var success = await _errorHandler.HandleWithRetryAsync(async () => + try { - // 为每个消息创建独立的消息缓冲区,避免并发问题 - using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); - - if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) + var success = await _errorHandler.HandleWithRetryAsync(async () => { - throw new WebSocketException("消息缓冲区溢出"); - } + // 为每个消息创建独立的消息缓冲区,避免并发问题 + using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); + + // 验证输入参数 + if (buffer == null || buffer.Length == 0) + { + throw new WebSocketException("接收缓冲区为空或无效"); + } + + if (receiveResult.Count <= 0 || receiveResult.Count > buffer.Length) + { + throw new WebSocketException($"无效的消息大小:{receiveResult.Count},缓冲区大小:{buffer.Length}"); + } + + if (receiveResult.Count > _options.MaxMessageSize) + { + throw new WebSocketException($"消息大小超过限制:{receiveResult.Count} > {_options.MaxMessageSize}"); + } + + if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) + { + throw new WebSocketException("消息缓冲区溢出"); + } - if (receiveResult.EndOfMessage) - { - var message = new WebSocketMessage + if (receiveResult.EndOfMessage) { - ConnectionId = connectionId, - Data = messageBuffer.GetMessage(), - MessageType = receiveResult.MessageType, - IsComplete = true - }; + var message = new WebSocketMessage + { + ConnectionId = connectionId, + Data = messageBuffer.GetMessage(), + MessageType = receiveResult.MessageType, + IsComplete = true + }; - await messageChannel.Writer.WriteAsync(message, cancellationToken); + await messageChannel.Writer.WriteAsync(message, cancellationToken); - var processingTime = (DateTime.UtcNow - processingStartTime).TotalMilliseconds; - _performanceMonitor.RecordMessage( - connectionId, - message.Data.Length, - (long)processingTime); + var processingTime = (DateTime.UtcNow - processingStartTime).TotalMilliseconds; + _performanceMonitor.RecordMessage( + connectionId, + message.Data.Length, + (long)processingTime); - // 记录 Channel 消息处理统计 - _channelManager.RecordMessageProcessed(messageChannel, message.Data.Length); + // 记录 Channel 消息处理统计 + _channelManager.RecordMessageProcessed(messageChannel, message.Data.Length); - // messageBuffer 会在 using 语句中自动释放 - } - }); + // messageBuffer 会在 using 语句中自动释放 + } + }); - if (!success) + if (!success) + { + await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); + } + } + catch (WebSocketException ex) + { + _logger.LogError(ex, "WebSocket 消息处理异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}", + connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message); + await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); + } + catch (OperationCanceledException ex) + { + _logger.LogWarning(ex, "消息处理被取消,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节", + connectionId, receiveResult.MessageType, receiveResult.Count); + // 取消操作转换为 WebSocket 异常 + throw new WebSocketException("消息处理操作被取消"); + } + catch (Exception ex) { + _logger.LogError(ex, "消息处理发生未知异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}", + connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message); await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); } }