diff --git a/src/X1.WebAPI/Program.cs b/src/X1.WebAPI/Program.cs index 220b950..a6dc1b1 100644 --- a/src/X1.WebAPI/Program.cs +++ b/src/X1.WebAPI/Program.cs @@ -49,7 +49,8 @@ builder.Services.AddWebSocketServices(options => { // 配置 WebSocket 选项 options.MaxConcurrentConnections = 2000; // 最大并发连接数 - options.MaxMessageSize = 1024 * 1024; // 最大消息大小(字节) + options.MaxMessageSize = 1024 * 1024; // 最大消息大小(业务限制,1MB) + options.MessageBufferMultiplier = 5; // 缓冲区大小倍数(5MB 缓冲区) options.ConnectionTimeout = TimeSpan.FromMinutes(2); // 连接超时时间 options.HeartbeatInterval = TimeSpan.FromSeconds(30); // 心跳检测间隔 }); diff --git a/src/X1.WebSocket/Buffer/WebSocketMessageBuffer.cs b/src/X1.WebSocket/Buffer/WebSocketMessageBuffer.cs index 256dd52..e95dab1 100644 --- a/src/X1.WebSocket/Buffer/WebSocketMessageBuffer.cs +++ b/src/X1.WebSocket/Buffer/WebSocketMessageBuffer.cs @@ -27,6 +27,11 @@ public sealed class WebSocketMessageBuffer : IDisposable /// public bool IsFull => _position >= _maxSize; + /// + /// 缓冲区的最大大小 + /// + public int MaxSize => _maxSize; + /// /// 构造函数 /// diff --git a/src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs b/src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs index be320b4..5b53c0d 100644 --- a/src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs +++ b/src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs @@ -41,7 +41,7 @@ namespace CellularManagement.WebSocket.Handlers MessageType = System.Net.WebSockets.WebSocketMessageType.Text, NeedQueue = false }; - + await Task.CompletedTask.ConfigureAwait(false); _logger.LogDebug("协议消息处理完成,连接ID:{ConnectionId}", message.ConnectionId); return response; } diff --git a/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs b/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs index 1fd8d1c..f180d91 100644 --- a/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs +++ b/src/X1.WebSocket/Middleware/WebSocketMiddleware.cs @@ -308,11 +308,19 @@ public class WebSocketMiddleware { _logger.LogInformation("开始处理 WebSocket 消息,连接ID:{ConnectionId}", connectionId); + // 使用动态缓冲区大小,为客户端数据提供足够空间 + // 缓冲区大小 = 业务限制 * 倍数,确保能处理客户端发送的大数据 + var bufferSize = _options.GetActualBufferSize(); + using var messageBuffer = new WebSocketMessageBuffer(bufferSize); + + _logger.LogDebug("创建消息缓冲区,连接ID:{ConnectionId},缓冲区大小:{BufferSize}字节,业务限制:{MaxMessageSize}字节", + connectionId, bufferSize, _options.MaxMessageSize); + // 接收第一条消息 var receiveResult = await webSocket.ReceiveAsync( new ArraySegment(buffer), cancellationToken); - _logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType}", - connectionId, receiveResult.MessageType); + _logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType},结束标志:{EndOfMessage}", + connectionId, receiveResult.MessageType, receiveResult.EndOfMessage); // 循环处理消息,直到收到关闭消息或发生错误 while (!receiveResult.CloseStatus.HasValue) @@ -338,11 +346,20 @@ public class WebSocketMiddleware // 处理有效消息类型(文本或二进制) if (IsValidMessageType(receiveResult.MessageType)) { - _logger.LogDebug("处理消息,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节", - connectionId, receiveResult.MessageType, receiveResult.Count); - await ProcessMessage(webSocket, connectionId, buffer, receiveResult, - messageChannel, messageStartTime, cancellationToken); - messageStartTime = DateTime.UtcNow; + _logger.LogDebug("处理消息片段,连接ID:{ConnectionId},消息类型:{MessageType},片段大小:{FragmentSize}字节,结束标志:{EndOfMessage}", + connectionId, receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage); + + // 累积消息片段 + await AccumulateMessageFragment(webSocket, connectionId, buffer, receiveResult, + messageBuffer, messageChannel, messageStartTime, cancellationToken); + + // 如果消息完整,重置缓冲区并更新时间 + if (receiveResult.EndOfMessage) + { + messageBuffer.Reset(); + messageStartTime = DateTime.UtcNow; + _logger.LogDebug("消息完整接收,连接ID:{ConnectionId},重置缓冲区", connectionId); + } } else { @@ -353,35 +370,22 @@ public class WebSocketMiddleware // 接收下一条消息 receiveResult = await webSocket.ReceiveAsync( new ArraySegment(buffer), cancellationToken); - _logger.LogDebug("接收下一条消息,连接ID:{ConnectionId},消息类型:{MessageType}", - connectionId, receiveResult.MessageType); + _logger.LogDebug("接收下一条消息,连接ID:{ConnectionId},消息类型:{MessageType},结束标志:{EndOfMessage}", + connectionId, receiveResult.MessageType, receiveResult.EndOfMessage); } _logger.LogInformation("WebSocket 消息处理完成,连接ID:{ConnectionId}", connectionId); } /// - /// 处理单个消息 + /// 累积消息片段 /// - /// WebSocket 连接实例 - /// 连接ID - /// 接收缓冲区 - /// 接收结果 - /// 消息通道 - /// 消息开始时间 - /// 取消令牌 - /// - /// 该方法负责: - /// 1. 将接收到的数据写入消息缓冲区 - /// 2. 检查消息是否完整 - /// 3. 创建消息对象并写入通道 - /// 4. 记录性能指标 - /// - private async Task ProcessMessage( + private async Task AccumulateMessageFragment( System.Net.WebSockets.WebSocket webSocket, string connectionId, byte[] buffer, WebSocketReceiveResult receiveResult, + WebSocketMessageBuffer messageBuffer, Channel messageChannel, DateTime messageStartTime, CancellationToken cancellationToken) @@ -392,9 +396,6 @@ public class WebSocketMiddleware { var success = await _errorHandler.HandleWithRetryAsync(async () => { - // 为每个消息创建独立的消息缓冲区,避免并发问题 - using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); - // 验证输入参数 if (buffer == null || buffer.Length == 0) { @@ -406,22 +407,47 @@ public class WebSocketMiddleware throw new WebSocketException($"无效的消息大小:{receiveResult.Count},缓冲区大小:{buffer.Length}"); } - if (receiveResult.Count > _options.MaxMessageSize) + // 检查累积后的大小是否会超过缓冲区容量 + // 这是技术层面的检查,防止缓冲区溢出 + var totalSize = messageBuffer.Size + receiveResult.Count; + if (totalSize > messageBuffer.MaxSize) { - throw new WebSocketException($"消息大小超过限制:{receiveResult.Count} > {_options.MaxMessageSize}"); + throw new WebSocketException($"消息大小将超过缓冲区容量:{totalSize} > {messageBuffer.MaxSize}字节"); } + // 累积消息片段 if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count)) { - throw new WebSocketException("消息缓冲区溢出"); + throw new WebSocketException("消息缓冲区写入失败"); } + _logger.LogDebug("消息片段已累积,连接ID:{ConnectionId},当前缓冲区大小:{BufferSize}字节,片段大小:{FragmentSize}字节", + connectionId, messageBuffer.Size, receiveResult.Count); + + // 如果消息完整,创建完整的消息对象 if (receiveResult.EndOfMessage) { + var messageData = messageBuffer.GetMessage(); + + // 最终验证消息大小是否符合业务限制 + // 这是业务层面的检查,超过限制会记录警告但继续处理 + if (messageData.Length > _options.MaxMessageSize) + { + _logger.LogWarning("消息大小超过业务限制,连接ID:{ConnectionId},大小:{Size}字节,限制:{MaxSize}字节,但已完整接收", + connectionId, messageData.Length, _options.MaxMessageSize); + + // 可以选择: + // 1. 拒绝处理,返回错误 + // 2. 记录警告但继续处理 + // 3. 根据业务需求决定 + + // 这里选择记录警告但继续处理,确保数据完整性 + } + var message = new WebSocketMessage { ConnectionId = connectionId, - Data = messageBuffer.GetMessage(), + Data = messageData, MessageType = receiveResult.MessageType, IsComplete = true }; @@ -437,7 +463,8 @@ public class WebSocketMiddleware // 记录 Channel 消息处理统计 _channelManager.RecordMessageProcessed(messageChannel, message.Data.Length); - // messageBuffer 会在 using 语句中自动释放 + _logger.LogDebug("完整消息已写入通道,连接ID:{ConnectionId},消息大小:{MessageSize}字节,处理时间:{ProcessingTime}ms", + connectionId, message.Data.Length, processingTime); } }); @@ -448,20 +475,19 @@ public class WebSocketMiddleware } catch (WebSocketException ex) { - _logger.LogError(ex, "WebSocket 消息处理异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}", + _logger.LogError(ex, "WebSocket 消息片段处理异常,连接ID:{ConnectionId},消息类型:{MessageType},片段大小:{FragmentSize}字节,错误:{ErrorMessage}", connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message); await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); } catch (OperationCanceledException ex) { - _logger.LogWarning(ex, "消息处理被取消,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节", + _logger.LogWarning(ex, "消息片段处理被取消,连接ID:{ConnectionId},消息类型:{MessageType},片段大小:{FragmentSize}字节", connectionId, receiveResult.MessageType, receiveResult.Count); - // 取消操作转换为 WebSocket 异常 throw new WebSocketException("消息处理操作被取消"); } catch (Exception ex) { - _logger.LogError(ex, "消息处理发生未知异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}", + _logger.LogError(ex, "消息片段处理发生未知异常,连接ID:{ConnectionId},消息类型:{MessageType},片段大小:{FragmentSize}字节,错误:{ErrorMessage}", connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message); await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken); } diff --git a/src/X1.WebSocket/Models/WebSocketOptions.cs b/src/X1.WebSocket/Models/WebSocketOptions.cs index 0c955e5..02fe755 100644 --- a/src/X1.WebSocket/Models/WebSocketOptions.cs +++ b/src/X1.WebSocket/Models/WebSocketOptions.cs @@ -22,9 +22,18 @@ namespace CellularManagement.WebSocket.Models public int MessageQueueSize { get; set; } = 10000; /// - /// 消息队列大小 + /// 最大消息大小(业务限制) + /// 超过此大小的消息会被记录警告,但不会导致连接关闭 + /// 这是业务层面的限制,用于防止过大的消息影响系统性能 + /// + public int MaxMessageSize { get; set; } = 1024*1024; // 1MB + + /// + /// 消息缓冲区大小倍数 + /// 用于计算实际缓冲区大小:MaxMessageSize * MessageBufferMultiplier + /// 确保缓冲区有足够空间处理客户端发送的大数据 /// - public int MaxMessageSize { get; set; } = 1024*1024; + public int MessageBufferMultiplier { get; set; } = 5; // 5倍,即5MB缓冲区 /// /// 最大并发处理数 @@ -57,5 +66,12 @@ namespace CellularManagement.WebSocket.Models /// 消息重试间隔 /// public TimeSpan MessageRetryInterval { get; set; } = TimeSpan.FromSeconds(1); + + /// + /// 计算实际缓冲区大小 + /// 用于累积分片消息的缓冲区大小,应该大于业务限制 + /// + /// 实际缓冲区大小(字节) + public int GetActualBufferSize() => MaxMessageSize * MessageBufferMultiplier; } } diff --git a/src/modify.md b/src/modify.md index 13b8c64..9252247 100644 --- a/src/modify.md +++ b/src/modify.md @@ -438,4 +438,158 @@ catch (Exception ex) - 简化了代码逻辑,提高了可读性 - 减少了不必要的重试开销 - 保持了错误处理的完整性 -- 提高了代码执行效率 \ No newline at end of file +- 提高了代码执行效率 + +## 2024-12-19 WebSocket 缓冲区大小修复 + +### 修改文件 +- `X1.WebSocket/Models/WebSocketOptions.cs` +- `X1.WebSocket/Buffer/WebSocketMessageBuffer.cs` +- `X1.WebSocket/Middleware/WebSocketMiddleware.cs` +- `X1.WebAPI/Program.cs` + +### 问题描述 +原始实现中存在严重的设计问题: +```csharp +// 问题代码 +using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize); +``` + +**问题所在:** +1. **`_options.MaxMessageSize` 是服务器端的业务限制**(默认 1MB) +2. **客户端发送的数据大小无法预知**,可能超过服务器限制 +3. **缓冲区大小被硬编码为业务限制**,无法处理更大的消息 +4. **混淆了"缓冲区容量"和"业务限制"的概念** + +### 根本原因分析 + +#### 1. 概念混淆 +- **业务限制**:`MaxMessageSize` 是业务层面的限制,用于防止过大的消息影响系统性能 +- **缓冲区容量**:应该为客户端数据提供足够的空间,通常大于业务限制 + +#### 2. 实际场景问题 +```csharp +// 配置中的限制 +options.MaxMessageSize = 1024 * 1024; // 1MB + +// 但客户端可能发送: +// - 2MB 的文件 +// - 5MB 的图片 +// - 10MB 的视频片段 +// - 等等... +``` + +### 修复方案 + +#### 1. 改进 WebSocketOptions 配置 +```csharp +public class WebSocketOptions +{ + /// + /// 最大消息大小(业务限制) + /// 超过此大小的消息会被记录警告,但不会导致连接关闭 + /// + public int MaxMessageSize { get; set; } = 1024 * 1024; // 1MB + + /// + /// 消息缓冲区大小倍数 + /// 用于计算实际缓冲区大小:MaxMessageSize * MessageBufferMultiplier + /// + public int MessageBufferMultiplier { get; set; } = 5; // 5倍 + + /// + /// 计算实际缓冲区大小 + /// + public int GetActualBufferSize() => MaxMessageSize * MessageBufferMultiplier; +} +``` + +#### 2. 增强 WebSocketMessageBuffer +```csharp +public sealed class WebSocketMessageBuffer : IDisposable +{ + // 添加公共属性访问最大缓冲区大小 + public int MaxSize => _maxSize; +} +``` + +#### 3. 修改 ProcessWebSocketMessages 方法 +```csharp +private async Task ProcessWebSocketMessages(...) +{ + // 使用动态缓冲区大小,为客户端数据提供足够空间 + var bufferSize = _options.GetActualBufferSize(); // 5MB + using var messageBuffer = new WebSocketMessageBuffer(bufferSize); + + _logger.LogDebug("创建消息缓冲区,缓冲区大小:{BufferSize}字节,业务限制:{MaxMessageSize}字节", + bufferSize, _options.MaxMessageSize); +} +``` + +#### 4. 改进 AccumulateMessageFragment 验证逻辑 +```csharp +// 检查累积后的大小是否会超过缓冲区容量 +// 这是技术层面的检查,防止缓冲区溢出 +var totalSize = messageBuffer.Size + receiveResult.Count; +if (totalSize > messageBuffer.MaxSize) +{ + throw new WebSocketException($"消息大小将超过缓冲区容量:{totalSize} > {messageBuffer.MaxSize}字节"); +} + +// 最终验证消息大小是否符合业务限制 +// 这是业务层面的检查,超过限制会记录警告但继续处理 +if (messageData.Length > _options.MaxMessageSize) +{ + _logger.LogWarning("消息大小超过业务限制,但已完整接收"); + // 记录警告但继续处理,确保数据完整性 +} +``` + +#### 5. 更新配置示例 +```csharp +builder.Services.AddWebSocketServices(options => +{ + options.MaxConcurrentConnections = 2000; + options.MaxMessageSize = 1024 * 1024; // 业务限制:1MB + options.MessageBufferMultiplier = 5; // 缓冲区:5MB + options.ConnectionTimeout = TimeSpan.FromMinutes(2); + options.HeartbeatInterval = TimeSpan.FromSeconds(30); +}); +``` + +### 技术细节 + +#### 1. 分层验证策略 +- **缓冲区容量检查**:防止技术层面的溢出 +- **业务限制检查**:防止业务层面的过大消息 +- **分离关注点**:技术问题 vs 业务问题 + +#### 2. 动态缓冲区大小 +- **计算公式**:`MaxMessageSize * MessageBufferMultiplier` +- **默认配置**:1MB * 5 = 5MB 缓冲区 +- **可配置性**:根据实际需求调整倍数 + +#### 3. 错误处理策略 +- **缓冲区溢出**:抛出异常,关闭连接 +- **业务限制超限**:记录警告,继续处理 +- **数据完整性**:优先保证数据完整性 + +### 性能影响 +- **内存使用**:缓冲区大小增加,但避免了数据丢失 +- **处理能力**:能够处理更大的客户端数据 +- **稳定性**:提高了系统的健壮性 +- **可维护性**:清晰的概念分离,便于理解和维护 + +### 测试建议 +1. 测试不同大小的消息传输 +2. 验证缓冲区容量检查的正确性 +3. 验证业务限制检查的正确性 +4. 测试分片消息的完整重组 +5. 监控内存使用情况 + +### 总结 +这次修复解决了 WebSocket 中间件中缓冲区大小设计的根本问题: +- **分离了技术限制和业务限制** +- **提供了足够的缓冲区空间处理客户端数据** +- **保持了数据完整性** +- **提高了系统的健壮性和可维护性** \ No newline at end of file