|
@ -73,7 +73,6 @@ public class WebSocketMiddleware |
|
|
private readonly IWebSocketMessageQueueManager _messageQueueManager; |
|
|
private readonly IWebSocketMessageQueueManager _messageQueueManager; |
|
|
private readonly ILogger<WebSocketMiddleware> _logger; |
|
|
private readonly ILogger<WebSocketMiddleware> _logger; |
|
|
private readonly WebSocketOptions _options; |
|
|
private readonly WebSocketOptions _options; |
|
|
private readonly WebSocketMessageBuffer _messageBuffer; |
|
|
|
|
|
private readonly WebSocketErrorHandler _errorHandler; |
|
|
private readonly WebSocketErrorHandler _errorHandler; |
|
|
private readonly WebSocketPerformanceMonitor _performanceMonitor; |
|
|
private readonly WebSocketPerformanceMonitor _performanceMonitor; |
|
|
// 使用 WebSocket Channel 管理器来管理 Channel 的生命周期
|
|
|
// 使用 WebSocket Channel 管理器来管理 Channel 的生命周期
|
|
@ -100,7 +99,6 @@ public class WebSocketMiddleware |
|
|
_messageQueueManager = messageQueueManager; |
|
|
_messageQueueManager = messageQueueManager; |
|
|
_logger = logger; |
|
|
_logger = logger; |
|
|
_options = options.Value; |
|
|
_options = options.Value; |
|
|
_messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); |
|
|
|
|
|
_errorHandler = new WebSocketErrorHandler(logger, _options.MessageRetryCount, _options.MessageRetryInterval); |
|
|
_errorHandler = new WebSocketErrorHandler(logger, _options.MessageRetryCount, _options.MessageRetryInterval); |
|
|
_performanceMonitor = new WebSocketPerformanceMonitor(logger); |
|
|
_performanceMonitor = new WebSocketPerformanceMonitor(logger); |
|
|
|
|
|
|
|
@ -169,8 +167,8 @@ public class WebSocketMiddleware |
|
|
|
|
|
|
|
|
// 使用原子操作的状态标志,确保线程安全
|
|
|
// 使用原子操作的状态标志,确保线程安全
|
|
|
var processingState = new AtomicBoolean(false); |
|
|
var processingState = new AtomicBoolean(false); |
|
|
// 从内存池租借缓冲区,用于接收消息
|
|
|
// 从内存池租借缓冲区,用于接收消息,大小与最大消息大小一致
|
|
|
var buffer = ArrayPool<byte>.Shared.Rent(1024 * 4); |
|
|
var buffer = ArrayPool<byte>.Shared.Rent(_options.MaxMessageSize); |
|
|
// 使用 Channel 管理器创建新的消息通道
|
|
|
// 使用 Channel 管理器创建新的消息通道
|
|
|
var messageChannel = await _channelManager.CreateNewMessageChannel(connectionId); |
|
|
var messageChannel = await _channelManager.CreateNewMessageChannel(connectionId); |
|
|
|
|
|
|
|
@ -308,8 +306,10 @@ public class WebSocketMiddleware |
|
|
{ |
|
|
{ |
|
|
_logger.LogInformation("开始处理 WebSocket 消息,连接ID:{ConnectionId}", connectionId); |
|
|
_logger.LogInformation("开始处理 WebSocket 消息,连接ID:{ConnectionId}", connectionId); |
|
|
|
|
|
|
|
|
// 为每个连接创建独立的消息缓冲区
|
|
|
// 为每个连接创建独立的消息缓冲区,使用更大的缓冲区大小来容纳分片消息
|
|
|
using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); |
|
|
// 缓冲区大小设置为 MaxMessageSize 的 1.5 倍,确保有足够空间处理分片
|
|
|
|
|
|
var bufferSize = (int)(_options.MaxMessageSize * 1.5); |
|
|
|
|
|
using var messageBuffer = new WebSocketMessageBuffer(bufferSize); |
|
|
var lastMessageTime = DateTime.UtcNow; |
|
|
var lastMessageTime = DateTime.UtcNow; |
|
|
|
|
|
|
|
|
try |
|
|
try |
|
@ -387,7 +387,7 @@ public class WebSocketMiddleware |
|
|
{ |
|
|
{ |
|
|
_logger.LogError(ex, "WebSocket 消息处理发生未知异常,连接ID:{ConnectionId},错误:{Error}", |
|
|
_logger.LogError(ex, "WebSocket 消息处理发生未知异常,连接ID:{ConnectionId},错误:{Error}", |
|
|
connectionId, ex.Message); |
|
|
connectionId, ex.Message); |
|
|
throw; |
|
|
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -419,25 +419,38 @@ public class WebSocketMiddleware |
|
|
{ |
|
|
{ |
|
|
var processingStartTime = DateTime.UtcNow; |
|
|
var processingStartTime = DateTime.UtcNow; |
|
|
|
|
|
|
|
|
var success = await _errorHandler.HandleWithRetryAsync(async () => |
|
|
try |
|
|
{ |
|
|
{ |
|
|
// 检查缓冲区是否已满
|
|
|
// 检查当前缓冲区大小加上新数据是否会超过限制
|
|
|
|
|
|
var totalSize = messageBuffer.Size + receiveResult.Count; |
|
|
|
|
|
if (totalSize > _options.MaxMessageSize) |
|
|
|
|
|
{ |
|
|
|
|
|
_logger.LogWarning("消息大小将超过限制,连接ID:{ConnectionId},当前缓冲区大小:{CurrentSize},新数据大小:{NewDataSize},总大小:{TotalSize},限制:{MaxSize}", |
|
|
|
|
|
connectionId, messageBuffer.Size, receiveResult.Count, totalSize, _options.MaxMessageSize); |
|
|
|
|
|
throw new WebSocketException("消息大小超过限制"); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 写入数据到缓冲区(由于已经检查了大小,这里应该总是成功)
|
|
|
if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) |
|
|
if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) |
|
|
{ |
|
|
{ |
|
|
_logger.LogWarning("消息缓冲区已满,连接ID:{ConnectionId},缓冲区大小:{BufferSize},消息大小:{MessageSize}", |
|
|
_logger.LogError("消息缓冲区写入失败,连接ID:{ConnectionId},缓冲区大小:{BufferSize},消息大小:{MessageSize},这不应该发生", |
|
|
connectionId, messageBuffer.Size, receiveResult.Count); |
|
|
connectionId, messageBuffer.Size, receiveResult.Count); |
|
|
throw new WebSocketException("消息缓冲区溢出"); |
|
|
throw new WebSocketException("消息缓冲区写入失败"); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
_logger.LogDebug("数据已写入缓冲区,连接ID:{ConnectionId},当前缓冲区大小:{BufferSize},消息是否完整:{IsComplete}", |
|
|
|
|
|
connectionId, messageBuffer.Size, receiveResult.EndOfMessage); |
|
|
|
|
|
|
|
|
// 检查消息是否完整
|
|
|
// 检查消息是否完整
|
|
|
if (receiveResult.EndOfMessage) |
|
|
if (receiveResult.EndOfMessage) |
|
|
{ |
|
|
{ |
|
|
var messageData = messageBuffer.GetMessage(); |
|
|
var messageData = messageBuffer.GetMessage(); |
|
|
|
|
|
|
|
|
// 检查消息大小是否超过限制
|
|
|
// 由于已经在上面的 totalSize 检查中验证了大小,这里不需要再次检查
|
|
|
|
|
|
// 但为了安全起见,保留这个检查作为最后的验证
|
|
|
if (messageData.Length > _options.MaxMessageSize) |
|
|
if (messageData.Length > _options.MaxMessageSize) |
|
|
{ |
|
|
{ |
|
|
_logger.LogWarning("消息大小超过限制,连接ID:{ConnectionId},消息大小:{MessageSize},限制:{MaxSize}", |
|
|
_logger.LogError("最终消息大小超过限制,连接ID:{ConnectionId},消息大小:{MessageSize},限制:{MaxSize},这不应该发生", |
|
|
connectionId, messageData.Length, _options.MaxMessageSize); |
|
|
connectionId, messageData.Length, _options.MaxMessageSize); |
|
|
throw new WebSocketException("消息大小超过限制"); |
|
|
throw new WebSocketException("消息大小超过限制"); |
|
|
} |
|
|
} |
|
@ -472,14 +485,39 @@ public class WebSocketMiddleware |
|
|
throw new InvalidOperationException("消息通道已关闭"); |
|
|
throw new InvalidOperationException("消息通道已关闭"); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 重置缓冲区
|
|
|
// 重置缓冲区,准备接收下一条消息
|
|
|
messageBuffer.Reset(); |
|
|
messageBuffer.Reset(); |
|
|
|
|
|
_logger.LogDebug("缓冲区已重置,连接ID:{ConnectionId}", connectionId); |
|
|
} |
|
|
} |
|
|
}); |
|
|
else |
|
|
|
|
|
{ |
|
|
if (!success) |
|
|
_logger.LogDebug("消息分片已累积,连接ID:{ConnectionId},当前缓冲区大小:{BufferSize}字节", |
|
|
|
|
|
connectionId, messageBuffer.Size); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
catch (WebSocketException ex) |
|
|
{ |
|
|
{ |
|
|
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); |
|
|
// 对于 WebSocket 异常,直接抛出,不进行重试
|
|
|
|
|
|
_logger.LogError(ex, "WebSocket 消息处理异常,连接ID:{ConnectionId},错误:{Error}", |
|
|
|
|
|
connectionId, ex.Message); |
|
|
|
|
|
throw; |
|
|
|
|
|
} |
|
|
|
|
|
catch (Exception ex) |
|
|
|
|
|
{ |
|
|
|
|
|
// 对于其他异常,使用重试机制
|
|
|
|
|
|
_logger.LogError(ex, "消息处理发生未知异常,连接ID:{ConnectionId},错误:{Error}", |
|
|
|
|
|
connectionId, ex.Message); |
|
|
|
|
|
|
|
|
|
|
|
var success = await _errorHandler.HandleWithRetryAsync(async () => |
|
|
|
|
|
{ |
|
|
|
|
|
// 这里可以添加重试逻辑,但对于缓冲区问题,重试通常无效
|
|
|
|
|
|
throw ex; // 重新抛出异常
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
if (!success) |
|
|
|
|
|
{ |
|
|
|
|
|
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|