|
@ -388,43 +388,81 @@ public class WebSocketMiddleware |
|
|
{ |
|
|
{ |
|
|
var processingStartTime = DateTime.UtcNow; |
|
|
var processingStartTime = DateTime.UtcNow; |
|
|
|
|
|
|
|
|
var success = await _errorHandler.HandleWithRetryAsync(async () => |
|
|
try |
|
|
{ |
|
|
{ |
|
|
// 为每个消息创建独立的消息缓冲区,避免并发问题
|
|
|
var success = await _errorHandler.HandleWithRetryAsync(async () => |
|
|
using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); |
|
|
|
|
|
|
|
|
|
|
|
if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) |
|
|
|
|
|
{ |
|
|
{ |
|
|
throw new WebSocketException("消息缓冲区溢出"); |
|
|
// 为每个消息创建独立的消息缓冲区,避免并发问题
|
|
|
} |
|
|
using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); |
|
|
|
|
|
|
|
|
|
|
|
// 验证输入参数
|
|
|
|
|
|
if (buffer == null || buffer.Length == 0) |
|
|
|
|
|
{ |
|
|
|
|
|
throw new WebSocketException("接收缓冲区为空或无效"); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (receiveResult.Count <= 0 || receiveResult.Count > buffer.Length) |
|
|
|
|
|
{ |
|
|
|
|
|
throw new WebSocketException($"无效的消息大小:{receiveResult.Count},缓冲区大小:{buffer.Length}"); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (receiveResult.Count > _options.MaxMessageSize) |
|
|
|
|
|
{ |
|
|
|
|
|
throw new WebSocketException($"消息大小超过限制:{receiveResult.Count} > {_options.MaxMessageSize}"); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) |
|
|
|
|
|
{ |
|
|
|
|
|
throw new WebSocketException("消息缓冲区溢出"); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if (receiveResult.EndOfMessage) |
|
|
if (receiveResult.EndOfMessage) |
|
|
{ |
|
|
|
|
|
var message = new WebSocketMessage |
|
|
|
|
|
{ |
|
|
{ |
|
|
ConnectionId = connectionId, |
|
|
var message = new WebSocketMessage |
|
|
Data = messageBuffer.GetMessage(), |
|
|
{ |
|
|
MessageType = receiveResult.MessageType, |
|
|
ConnectionId = connectionId, |
|
|
IsComplete = true |
|
|
Data = messageBuffer.GetMessage(), |
|
|
}; |
|
|
MessageType = receiveResult.MessageType, |
|
|
|
|
|
IsComplete = true |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
await messageChannel.Writer.WriteAsync(message, cancellationToken); |
|
|
await messageChannel.Writer.WriteAsync(message, cancellationToken); |
|
|
|
|
|
|
|
|
var processingTime = (DateTime.UtcNow - processingStartTime).TotalMilliseconds; |
|
|
var processingTime = (DateTime.UtcNow - processingStartTime).TotalMilliseconds; |
|
|
_performanceMonitor.RecordMessage( |
|
|
_performanceMonitor.RecordMessage( |
|
|
connectionId, |
|
|
connectionId, |
|
|
message.Data.Length, |
|
|
message.Data.Length, |
|
|
(long)processingTime); |
|
|
(long)processingTime); |
|
|
|
|
|
|
|
|
// 记录 Channel 消息处理统计
|
|
|
// 记录 Channel 消息处理统计
|
|
|
_channelManager.RecordMessageProcessed(messageChannel, message.Data.Length); |
|
|
_channelManager.RecordMessageProcessed(messageChannel, message.Data.Length); |
|
|
|
|
|
|
|
|
// messageBuffer 会在 using 语句中自动释放
|
|
|
// messageBuffer 会在 using 语句中自动释放
|
|
|
} |
|
|
} |
|
|
}); |
|
|
}); |
|
|
|
|
|
|
|
|
if (!success) |
|
|
if (!success) |
|
|
|
|
|
{ |
|
|
|
|
|
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
catch (WebSocketException ex) |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogError(ex, "WebSocket 消息处理异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}", |
|
|
|
|
|
connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message); |
|
|
|
|
|
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); |
|
|
|
|
|
} |
|
|
|
|
|
catch (OperationCanceledException ex) |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogWarning(ex, "消息处理被取消,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节", |
|
|
|
|
|
connectionId, receiveResult.MessageType, receiveResult.Count); |
|
|
|
|
|
// 取消操作转换为 WebSocket 异常
|
|
|
|
|
|
throw new WebSocketException("消息处理操作被取消"); |
|
|
|
|
|
} |
|
|
|
|
|
catch (Exception ex) |
|
|
{ |
|
|
{ |
|
|
|
|
|
_logger.LogError(ex, "消息处理发生未知异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}", |
|
|
|
|
|
connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message); |
|
|
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); |
|
|
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|