Browse Source

fix(websocket): decouple buffer size from business limit, add MessageBufferMultiplier for large fragmented messages

feature/x1-web-request
root 1 week ago
parent
commit
6e6757107c
  1. 3
      src/X1.WebAPI/Program.cs
  2. 5
      src/X1.WebSocket/Buffer/WebSocketMessageBuffer.cs
  3. 2
      src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs
  4. 100
      src/X1.WebSocket/Middleware/WebSocketMiddleware.cs
  5. 20
      src/X1.WebSocket/Models/WebSocketOptions.cs
  6. 156
      src/modify.md

3
src/X1.WebAPI/Program.cs

@ -49,7 +49,8 @@ builder.Services.AddWebSocketServices(options =>
{
// 配置 WebSocket 选项
options.MaxConcurrentConnections = 2000; // 最大并发连接数
options.MaxMessageSize = 1024 * 1024; // 最大消息大小(字节)
options.MaxMessageSize = 1024 * 1024; // 最大消息大小(业务限制,1MB)
options.MessageBufferMultiplier = 5; // 缓冲区大小倍数(5MB 缓冲区)
options.ConnectionTimeout = TimeSpan.FromMinutes(2); // 连接超时时间
options.HeartbeatInterval = TimeSpan.FromSeconds(30); // 心跳检测间隔
});

5
src/X1.WebSocket/Buffer/WebSocketMessageBuffer.cs

@ -27,6 +27,11 @@ public sealed class WebSocketMessageBuffer : IDisposable
/// </summary>
public bool IsFull => _position >= _maxSize;
/// <summary>
/// 缓冲区的最大大小
/// </summary>
public int MaxSize => _maxSize;
/// <summary>
/// 构造函数
/// </summary>

2
src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs

@ -41,7 +41,7 @@ namespace CellularManagement.WebSocket.Handlers
MessageType = System.Net.WebSockets.WebSocketMessageType.Text,
NeedQueue = false
};
await Task.CompletedTask.ConfigureAwait(false);
_logger.LogDebug("协议消息处理完成,连接ID:{ConnectionId}", message.ConnectionId);
return response;
}

100
src/X1.WebSocket/Middleware/WebSocketMiddleware.cs

@ -308,11 +308,19 @@ public class WebSocketMiddleware
{
_logger.LogInformation("开始处理 WebSocket 消息,连接ID:{ConnectionId}", connectionId);
// 使用动态缓冲区大小,为客户端数据提供足够空间
// 缓冲区大小 = 业务限制 * 倍数,确保能处理客户端发送的大数据
var bufferSize = _options.GetActualBufferSize();
using var messageBuffer = new WebSocketMessageBuffer(bufferSize);
_logger.LogDebug("创建消息缓冲区,连接ID:{ConnectionId},缓冲区大小:{BufferSize}字节,业务限制:{MaxMessageSize}字节",
connectionId, bufferSize, _options.MaxMessageSize);
// 接收第一条消息
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), cancellationToken);
_logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType}",
connectionId, receiveResult.MessageType);
_logger.LogDebug("收到第一条消息,连接ID:{ConnectionId},消息类型:{MessageType},结束标志:{EndOfMessage}",
connectionId, receiveResult.MessageType, receiveResult.EndOfMessage);
// 循环处理消息,直到收到关闭消息或发生错误
while (!receiveResult.CloseStatus.HasValue)
@ -338,11 +346,20 @@ public class WebSocketMiddleware
// 处理有效消息类型(文本或二进制)
if (IsValidMessageType(receiveResult.MessageType))
{
_logger.LogDebug("处理消息,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节",
connectionId, receiveResult.MessageType, receiveResult.Count);
await ProcessMessage(webSocket, connectionId, buffer, receiveResult,
messageChannel, messageStartTime, cancellationToken);
messageStartTime = DateTime.UtcNow;
_logger.LogDebug("处理消息片段,连接ID:{ConnectionId},消息类型:{MessageType},片段大小:{FragmentSize}字节,结束标志:{EndOfMessage}",
connectionId, receiveResult.MessageType, receiveResult.Count, receiveResult.EndOfMessage);
// 累积消息片段
await AccumulateMessageFragment(webSocket, connectionId, buffer, receiveResult,
messageBuffer, messageChannel, messageStartTime, cancellationToken);
// 如果消息完整,重置缓冲区并更新时间
if (receiveResult.EndOfMessage)
{
messageBuffer.Reset();
messageStartTime = DateTime.UtcNow;
_logger.LogDebug("消息完整接收,连接ID:{ConnectionId},重置缓冲区", connectionId);
}
}
else
{
@ -353,35 +370,22 @@ public class WebSocketMiddleware
// 接收下一条消息
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), cancellationToken);
_logger.LogDebug("接收下一条消息,连接ID:{ConnectionId},消息类型:{MessageType}",
connectionId, receiveResult.MessageType);
_logger.LogDebug("接收下一条消息,连接ID:{ConnectionId},消息类型:{MessageType},结束标志:{EndOfMessage}",
connectionId, receiveResult.MessageType, receiveResult.EndOfMessage);
}
_logger.LogInformation("WebSocket 消息处理完成,连接ID:{ConnectionId}", connectionId);
}
/// <summary>
/// 处理单个消息
/// 累积消息片段
/// </summary>
/// <param name="webSocket">WebSocket 连接实例</param>
/// <param name="connectionId">连接ID</param>
/// <param name="buffer">接收缓冲区</param>
/// <param name="receiveResult">接收结果</param>
/// <param name="messageChannel">消息通道</param>
/// <param name="messageStartTime">消息开始时间</param>
/// <param name="cancellationToken">取消令牌</param>
/// <remarks>
/// 该方法负责:
/// 1. 将接收到的数据写入消息缓冲区
/// 2. 检查消息是否完整
/// 3. 创建消息对象并写入通道
/// 4. 记录性能指标
/// </remarks>
private async Task ProcessMessage(
private async Task AccumulateMessageFragment(
System.Net.WebSockets.WebSocket webSocket,
string connectionId,
byte[] buffer,
WebSocketReceiveResult receiveResult,
WebSocketMessageBuffer messageBuffer,
Channel<WebSocketMessage> messageChannel,
DateTime messageStartTime,
CancellationToken cancellationToken)
@ -392,9 +396,6 @@ public class WebSocketMiddleware
{
var success = await _errorHandler.HandleWithRetryAsync(async () =>
{
// 为每个消息创建独立的消息缓冲区,避免并发问题
using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize);
// 验证输入参数
if (buffer == null || buffer.Length == 0)
{
@ -406,22 +407,47 @@ public class WebSocketMiddleware
throw new WebSocketException($"无效的消息大小:{receiveResult.Count},缓冲区大小:{buffer.Length}");
}
if (receiveResult.Count > _options.MaxMessageSize)
// 检查累积后的大小是否会超过缓冲区容量
// 这是技术层面的检查,防止缓冲区溢出
var totalSize = messageBuffer.Size + receiveResult.Count;
if (totalSize > messageBuffer.MaxSize)
{
throw new WebSocketException($"消息大小超过限制:{receiveResult.Count} > {_options.MaxMessageSize}");
throw new WebSocketException($"消息大小将超过缓冲区容量:{totalSize} > {messageBuffer.MaxSize}字节");
}
// 累积消息片段
if (!messageBuffer.TryWrite(buffer, 0, receiveResult.Count))
{
throw new WebSocketException("消息缓冲区溢出");
throw new WebSocketException("消息缓冲区写入失败");
}
_logger.LogDebug("消息片段已累积,连接ID:{ConnectionId},当前缓冲区大小:{BufferSize}字节,片段大小:{FragmentSize}字节",
connectionId, messageBuffer.Size, receiveResult.Count);
// 如果消息完整,创建完整的消息对象
if (receiveResult.EndOfMessage)
{
var messageData = messageBuffer.GetMessage();
// 最终验证消息大小是否符合业务限制
// 这是业务层面的检查,超过限制会记录警告但继续处理
if (messageData.Length > _options.MaxMessageSize)
{
_logger.LogWarning("消息大小超过业务限制,连接ID:{ConnectionId},大小:{Size}字节,限制:{MaxSize}字节,但已完整接收",
connectionId, messageData.Length, _options.MaxMessageSize);
// 可以选择:
// 1. 拒绝处理,返回错误
// 2. 记录警告但继续处理
// 3. 根据业务需求决定
// 这里选择记录警告但继续处理,确保数据完整性
}
var message = new WebSocketMessage
{
ConnectionId = connectionId,
Data = messageBuffer.GetMessage(),
Data = messageData,
MessageType = receiveResult.MessageType,
IsComplete = true
};
@ -437,7 +463,8 @@ public class WebSocketMiddleware
// 记录 Channel 消息处理统计
_channelManager.RecordMessageProcessed(messageChannel, message.Data.Length);
// messageBuffer 会在 using 语句中自动释放
_logger.LogDebug("完整消息已写入通道,连接ID:{ConnectionId},消息大小:{MessageSize}字节,处理时间:{ProcessingTime}ms",
connectionId, message.Data.Length, processingTime);
}
});
@ -448,20 +475,19 @@ public class WebSocketMiddleware
}
catch (WebSocketException ex)
{
_logger.LogError(ex, "WebSocket 消息处理异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}",
_logger.LogError(ex, "WebSocket 消息片段处理异常,连接ID:{ConnectionId},消息类型:{MessageType},片段大小:{FragmentSize}字节,错误:{ErrorMessage}",
connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message);
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken);
}
catch (OperationCanceledException ex)
{
_logger.LogWarning(ex, "消息处理被取消,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节",
_logger.LogWarning(ex, "消息片段处理被取消,连接ID:{ConnectionId},消息类型:{MessageType},片段大小:{FragmentSize}字节",
connectionId, receiveResult.MessageType, receiveResult.Count);
// 取消操作转换为 WebSocket 异常
throw new WebSocketException("消息处理操作被取消");
}
catch (Exception ex)
{
_logger.LogError(ex, "消息处理发生未知异常,连接ID:{ConnectionId},消息类型:{MessageType},消息大小:{MessageSize}字节,错误:{ErrorMessage}",
_logger.LogError(ex, "消息片段处理发生未知异常,连接ID:{ConnectionId},消息类型:{MessageType},片段大小:{FragmentSize}字节,错误:{ErrorMessage}",
connectionId, receiveResult.MessageType, receiveResult.Count, ex.Message);
await HandleMessageProcessingFailure(webSocket, connectionId, cancellationToken);
}

20
src/X1.WebSocket/Models/WebSocketOptions.cs

@ -22,9 +22,18 @@ namespace CellularManagement.WebSocket.Models
public int MessageQueueSize { get; set; } = 10000;
/// <summary>
/// 消息队列大小
/// 最大消息大小(业务限制)
/// 超过此大小的消息会被记录警告,但不会导致连接关闭
/// 这是业务层面的限制,用于防止过大的消息影响系统性能
/// </summary>
public int MaxMessageSize { get; set; } = 1024*1024; // 1MB
/// <summary>
/// 消息缓冲区大小倍数
/// 用于计算实际缓冲区大小:MaxMessageSize * MessageBufferMultiplier
/// 确保缓冲区有足够空间处理客户端发送的大数据
/// </summary>
public int MaxMessageSize { get; set; } = 1024*1024;
public int MessageBufferMultiplier { get; set; } = 5; // 5倍,即5MB缓冲区
/// <summary>
/// 最大并发处理数
@ -57,5 +66,12 @@ namespace CellularManagement.WebSocket.Models
/// 消息重试间隔
/// </summary>
public TimeSpan MessageRetryInterval { get; set; } = TimeSpan.FromSeconds(1);
/// <summary>
/// 计算实际缓冲区大小
/// 用于累积分片消息的缓冲区大小,应该大于业务限制
/// </summary>
/// <returns>实际缓冲区大小(字节)</returns>
public int GetActualBufferSize() => MaxMessageSize * MessageBufferMultiplier;
}
}

156
src/modify.md

@ -438,4 +438,158 @@ catch (Exception ex)
- 简化了代码逻辑,提高了可读性
- 减少了不必要的重试开销
- 保持了错误处理的完整性
- 提高了代码执行效率
- 提高了代码执行效率
## 2024-12-19 WebSocket 缓冲区大小修复
### 修改文件
- `X1.WebSocket/Models/WebSocketOptions.cs`
- `X1.WebSocket/Buffer/WebSocketMessageBuffer.cs`
- `X1.WebSocket/Middleware/WebSocketMiddleware.cs`
- `X1.WebAPI/Program.cs`
### 问题描述
原始实现中存在严重的设计问题:
```csharp
// 问题代码
using var messageBuffer = new WebSocketMessageBuffer(_options.MaxMessageSize);
```
**问题所在:**
1. **`_options.MaxMessageSize` 是服务器端的业务限制**(默认 1MB)
2. **客户端发送的数据大小无法预知**,可能超过服务器限制
3. **缓冲区大小被硬编码为业务限制**,无法处理更大的消息
4. **混淆了"缓冲区容量"和"业务限制"的概念**
### 根本原因分析
#### 1. 概念混淆
- **业务限制**:`MaxMessageSize` 是业务层面的限制,用于防止过大的消息影响系统性能
- **缓冲区容量**:应该为客户端数据提供足够的空间,通常大于业务限制
#### 2. 实际场景问题
```csharp
// 配置中的限制
options.MaxMessageSize = 1024 * 1024; // 1MB
// 但客户端可能发送:
// - 2MB 的文件
// - 5MB 的图片
// - 10MB 的视频片段
// - 等等...
```
### 修复方案
#### 1. 改进 WebSocketOptions 配置
```csharp
public class WebSocketOptions
{
/// <summary>
/// 最大消息大小(业务限制)
/// 超过此大小的消息会被记录警告,但不会导致连接关闭
/// </summary>
public int MaxMessageSize { get; set; } = 1024 * 1024; // 1MB
/// <summary>
/// 消息缓冲区大小倍数
/// 用于计算实际缓冲区大小:MaxMessageSize * MessageBufferMultiplier
/// </summary>
public int MessageBufferMultiplier { get; set; } = 5; // 5倍
/// <summary>
/// 计算实际缓冲区大小
/// </summary>
public int GetActualBufferSize() => MaxMessageSize * MessageBufferMultiplier;
}
```
#### 2. 增强 WebSocketMessageBuffer
```csharp
public sealed class WebSocketMessageBuffer : IDisposable
{
// 添加公共属性访问最大缓冲区大小
public int MaxSize => _maxSize;
}
```
#### 3. 修改 ProcessWebSocketMessages 方法
```csharp
private async Task ProcessWebSocketMessages(...)
{
// 使用动态缓冲区大小,为客户端数据提供足够空间
var bufferSize = _options.GetActualBufferSize(); // 5MB
using var messageBuffer = new WebSocketMessageBuffer(bufferSize);
_logger.LogDebug("创建消息缓冲区,缓冲区大小:{BufferSize}字节,业务限制:{MaxMessageSize}字节",
bufferSize, _options.MaxMessageSize);
}
```
#### 4. 改进 AccumulateMessageFragment 验证逻辑
```csharp
// 检查累积后的大小是否会超过缓冲区容量
// 这是技术层面的检查,防止缓冲区溢出
var totalSize = messageBuffer.Size + receiveResult.Count;
if (totalSize > messageBuffer.MaxSize)
{
throw new WebSocketException($"消息大小将超过缓冲区容量:{totalSize} > {messageBuffer.MaxSize}字节");
}
// 最终验证消息大小是否符合业务限制
// 这是业务层面的检查,超过限制会记录警告但继续处理
if (messageData.Length > _options.MaxMessageSize)
{
_logger.LogWarning("消息大小超过业务限制,但已完整接收");
// 记录警告但继续处理,确保数据完整性
}
```
#### 5. 更新配置示例
```csharp
builder.Services.AddWebSocketServices(options =>
{
options.MaxConcurrentConnections = 2000;
options.MaxMessageSize = 1024 * 1024; // 业务限制:1MB
options.MessageBufferMultiplier = 5; // 缓冲区:5MB
options.ConnectionTimeout = TimeSpan.FromMinutes(2);
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
});
```
### 技术细节
#### 1. 分层验证策略
- **缓冲区容量检查**:防止技术层面的溢出
- **业务限制检查**:防止业务层面的过大消息
- **分离关注点**:技术问题 vs 业务问题
#### 2. 动态缓冲区大小
- **计算公式**:`MaxMessageSize * MessageBufferMultiplier`
- **默认配置**:1MB * 5 = 5MB 缓冲区
- **可配置性**:根据实际需求调整倍数
#### 3. 错误处理策略
- **缓冲区溢出**:抛出异常,关闭连接
- **业务限制超限**:记录警告,继续处理
- **数据完整性**:优先保证数据完整性
### 性能影响
- **内存使用**:缓冲区大小增加,但避免了数据丢失
- **处理能力**:能够处理更大的客户端数据
- **稳定性**:提高了系统的健壮性
- **可维护性**:清晰的概念分离,便于理解和维护
### 测试建议
1. 测试不同大小的消息传输
2. 验证缓冲区容量检查的正确性
3. 验证业务限制检查的正确性
4. 测试分片消息的完整重组
5. 监控内存使用情况
### 总结
这次修复解决了 WebSocket 中间件中缓冲区大小设计的根本问题:
- **分离了技术限制和业务限制**
- **提供了足够的缓冲区空间处理客户端数据**
- **保持了数据完整性**
- **提高了系统的健壮性和可维护性**
Loading…
Cancel
Save