diff --git a/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs b/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs index f180d91..3fcdf74 100644 --- a/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs +++ b/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs @@ -173,15 +173,18 @@ public class WebSocketMiddleware var buffer = ArrayPool.Shared.Rent(_options.MaxMessageSize); // 使用 Channel 管理器创建新的消息通道 var messageChannel = await _channelManager.CreateNewMessageChannel(connectionId); + // 创建取消令牌源,不设置超时,依赖手动超时检查 + using var cts = new CancellationTokenSource(); + + // ✅ 修复:使用Task.CompletedTask作为默认值,确保总是有任务可以等待 + var processTask = Task.CompletedTask; try { - // 创建取消令牌源,设置连接超时 - using var cts = new CancellationTokenSource(_options.ConnectionTimeout); var messageStartTime = DateTime.UtcNow; // 启动消息处理循环,异步处理接收到的消息 - var processTask = StartMessageProcessingLoop(messageChannel, processingState, cts.Token); + processTask = StartMessageProcessingLoop(messageChannel, processingState, cts.Token); // 处理消息接收,包括消息的接收、解析和处理 await ProcessWebSocketMessages(webSocket, connectionId, buffer, messageChannel, @@ -190,8 +193,30 @@ public class WebSocketMiddleware // 完成消息处理,等待所有消息处理完成 await CompleteMessageProcessing(messageChannel, processingState, cts.Token); } + catch (OperationCanceledException ex) + { + _logger.LogInformation("WebSocket连接处理已取消,连接ID:{ConnectionId},原因:{Reason}", + connectionId,ex.ToString()); + } + catch (Exception ex) + { + _logger.LogError(ex, "处理WebSocket连接时发生异常,连接ID:{ConnectionId}", connectionId); + } finally { + // ✅ 取消所有相关操作,确保资源正确释放 + cts.Cancel(); + + // ✅ 修复:总是等待任务完成,不需要检查null + try + { + await processTask; + } + catch (Exception ex) + { + _logger.LogError(ex, "等待消息处理循环完成时发生异常,连接ID:{ConnectionId}", connectionId); + } + // 释放资源 ArrayPool.Shared.Return(buffer); // 使用 Channel 管理器释放 Channel @@ -233,7 +258,8 @@ public class WebSocketMiddleware message.ConnectionId, message.MessageType); // 使用原子操作设置处理状态,确保同一时间只有一个消息在处理 - if (!processingState.Set(true)) + // processingState.Set(true) 返回之前的值,如果返回false表示之前是false,现在设置为true + if (processingState.Set(true) == false) { try { @@ -243,7 +269,13 @@ public class WebSocketMiddleware _logger.LogWarning("消息队列已满,消息将重新入队,连接ID:{ConnectionId}", message.ConnectionId); // 如果队列已满,将消息重新写入通道并等待一段时间后重试 - await messageChannel.Writer.WriteAsync(message, cancellationToken); + // 检查通道是否已关闭,避免向已关闭的通道写入 + if (!messageChannel.Writer.TryWrite(message)) + { + _logger.LogError("无法重新写入消息到通道,通道可能已关闭,连接ID:{ConnectionId}", + message.ConnectionId); + break; + } await Task.Delay(100, cancellationToken); } else @@ -272,8 +304,7 @@ public class WebSocketMiddleware catch (Exception ex) { // 记录处理循环中的异常 - _logger.LogError(ex, "消息处理循环发生错误,连接ID:{ConnectionId}", - messageChannel.Reader.Completion.IsCompleted); + _logger.LogError(ex, "消息处理循环发生错误"); } finally { @@ -646,8 +677,16 @@ public class WebSocketMiddleware _value = initialValue ? 1 : 0; } + /// + /// 获取当前值 + /// public bool Value => Interlocked.CompareExchange(ref _value, 1, 1) == 1; + /// + /// 设置新值并返回之前的值 + /// + /// 要设置的新值 + /// 设置前的值 public bool Set(bool value) { return Interlocked.Exchange(ref _value, value ? 1 : 0) == 1;