Browse Source

WebSocketMiddleware cts 取消问题 修复

feature/x1-web-request
root 1 week ago
parent
commit
7b01be01d9
  1. 53
      src/X1.WebSocket/Middleware/WebSocketMiddleware.cs

53
src/X1.WebSocket/Middleware/WebSocketMiddleware.cs

@ -173,15 +173,18 @@ public class WebSocketMiddleware
var buffer = ArrayPool<byte>.Shared.Rent(_options.MaxMessageSize); var buffer = ArrayPool<byte>.Shared.Rent(_options.MaxMessageSize);
// 使用 Channel 管理器创建新的消息通道 // 使用 Channel 管理器创建新的消息通道
var messageChannel = await _channelManager.CreateNewMessageChannel(connectionId); var messageChannel = await _channelManager.CreateNewMessageChannel(connectionId);
// 创建取消令牌源,不设置超时,依赖手动超时检查
using var cts = new CancellationTokenSource();
// ✅ 修复:使用Task.CompletedTask作为默认值,确保总是有任务可以等待
var processTask = Task.CompletedTask;
try try
{ {
// 创建取消令牌源,设置连接超时
using var cts = new CancellationTokenSource(_options.ConnectionTimeout);
var messageStartTime = DateTime.UtcNow; var messageStartTime = DateTime.UtcNow;
// 启动消息处理循环,异步处理接收到的消息 // 启动消息处理循环,异步处理接收到的消息
var processTask = StartMessageProcessingLoop(messageChannel, processingState, cts.Token); processTask = StartMessageProcessingLoop(messageChannel, processingState, cts.Token);
// 处理消息接收,包括消息的接收、解析和处理 // 处理消息接收,包括消息的接收、解析和处理
await ProcessWebSocketMessages(webSocket, connectionId, buffer, messageChannel, await ProcessWebSocketMessages(webSocket, connectionId, buffer, messageChannel,
@ -190,8 +193,30 @@ public class WebSocketMiddleware
// 完成消息处理,等待所有消息处理完成 // 完成消息处理,等待所有消息处理完成
await CompleteMessageProcessing(messageChannel, processingState, cts.Token); await CompleteMessageProcessing(messageChannel, processingState, cts.Token);
} }
catch (OperationCanceledException ex)
{
_logger.LogInformation("WebSocket连接处理已取消,连接ID:{ConnectionId},原因:{Reason}",
connectionId,ex.ToString());
}
catch (Exception ex)
{
_logger.LogError(ex, "处理WebSocket连接时发生异常,连接ID:{ConnectionId}", connectionId);
}
finally finally
{ {
// ✅ 取消所有相关操作,确保资源正确释放
cts.Cancel();
// ✅ 修复:总是等待任务完成,不需要检查null
try
{
await processTask;
}
catch (Exception ex)
{
_logger.LogError(ex, "等待消息处理循环完成时发生异常,连接ID:{ConnectionId}", connectionId);
}
// 释放资源 // 释放资源
ArrayPool<byte>.Shared.Return(buffer); ArrayPool<byte>.Shared.Return(buffer);
// 使用 Channel 管理器释放 Channel // 使用 Channel 管理器释放 Channel
@ -233,7 +258,8 @@ public class WebSocketMiddleware
message.ConnectionId, message.MessageType); message.ConnectionId, message.MessageType);
// 使用原子操作设置处理状态,确保同一时间只有一个消息在处理 // 使用原子操作设置处理状态,确保同一时间只有一个消息在处理
if (!processingState.Set(true)) // processingState.Set(true) 返回之前的值,如果返回false表示之前是false,现在设置为true
if (processingState.Set(true) == false)
{ {
try try
{ {
@ -243,7 +269,13 @@ public class WebSocketMiddleware
_logger.LogWarning("消息队列已满,消息将重新入队,连接ID:{ConnectionId}", _logger.LogWarning("消息队列已满,消息将重新入队,连接ID:{ConnectionId}",
message.ConnectionId); message.ConnectionId);
// 如果队列已满,将消息重新写入通道并等待一段时间后重试 // 如果队列已满,将消息重新写入通道并等待一段时间后重试
await messageChannel.Writer.WriteAsync(message, cancellationToken); // 检查通道是否已关闭,避免向已关闭的通道写入
if (!messageChannel.Writer.TryWrite(message))
{
_logger.LogError("无法重新写入消息到通道,通道可能已关闭,连接ID:{ConnectionId}",
message.ConnectionId);
break;
}
await Task.Delay(100, cancellationToken); await Task.Delay(100, cancellationToken);
} }
else else
@ -272,8 +304,7 @@ public class WebSocketMiddleware
catch (Exception ex) catch (Exception ex)
{ {
// 记录处理循环中的异常 // 记录处理循环中的异常
_logger.LogError(ex, "消息处理循环发生错误,连接ID:{ConnectionId}", _logger.LogError(ex, "消息处理循环发生错误");
messageChannel.Reader.Completion.IsCompleted);
} }
finally finally
{ {
@ -646,8 +677,16 @@ public class WebSocketMiddleware
_value = initialValue ? 1 : 0; _value = initialValue ? 1 : 0;
} }
/// <summary>
/// 获取当前值
/// </summary>
public bool Value => Interlocked.CompareExchange(ref _value, 1, 1) == 1; public bool Value => Interlocked.CompareExchange(ref _value, 1, 1) == 1;
/// <summary>
/// 设置新值并返回之前的值
/// </summary>
/// <param name="value">要设置的新值</param>
/// <returns>设置前的值</returns>
public bool Set(bool value) public bool Set(bool value)
{ {
return Interlocked.Exchange(ref _value, value ? 1 : 0) == 1; return Interlocked.Exchange(ref _value, value ? 1 : 0) == 1;

Loading…
Cancel
Save