Browse Source

feat: 添加WebSocket消息分包发送功能

- 在SendSingleMessageAsync中添加分包发送逻辑
- 支持大于64KB的消息自动分包发送
- 添加MaxChunkSize和ChunkDelayMs配置选项
- 优化分包发送的错误处理和日志记录
- 默认分包大小为64KB,可配置调整
feature/protocol-log-Perfect
root 1 week ago
parent
commit
31ec7a91bc
  1. 14
      CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs
  2. 60
      CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs

14
CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs

@ -50,4 +50,18 @@ public class WebSocketConfig
/// </summary>
[Range(1, 1440, ErrorMessage = "缓存 TTL 必须在 1-1440 分钟之间")]
public int CacheTtlMinutes { get; set; } = 5;
/// <summary>
/// 最大分包大小(字节),超过此大小的消息将被分包发送
/// 建议值:32KB-128KB,默认64KB
/// </summary>
[Range(1024, 1024 * 1024, ErrorMessage = "分包大小必须在 1KB-1MB 之间")]
public int? MaxChunkSize { get; set; } = 64 * 1024; // 64KB
/// <summary>
/// 分包发送延迟(毫秒),用于控制分包发送的速率
/// 建议值:1-10ms,默认1ms
/// </summary>
[Range(0, 100, ErrorMessage = "分包延迟必须在 0-100 毫秒之间")]
public int? ChunkDelayMs { get; set; } = 1;
}

60
CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs

@ -398,14 +398,14 @@ public class WebSocketTransport : IWebSocketTransport
{
// 处理二进制消息
_logger.LogTrace("发送二进制消息,大小: {Size} bytes", binaryData.Length);
await _connection.SendAsync(new ArraySegment<byte>(binaryData), WebSocketMessageType.Binary, true, cancellationToken);
await SendDataWithChunkingAsync(binaryData, WebSocketMessageType.Binary, cancellationToken);
}
else
{
// 处理文本消息
var data = _serializer.Serialize(processedMessage);
_logger.LogTrace("发送文本消息,大小: {Size} bytes", data.Length);
await _connection.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, cancellationToken);
await SendDataWithChunkingAsync(data, WebSocketMessageType.Text, cancellationToken);
}
stopwatch.Stop();
@ -419,6 +419,62 @@ public class WebSocketTransport : IWebSocketTransport
}
}
/// <summary>
/// 分包发送数据
/// </summary>
private async Task SendDataWithChunkingAsync(byte[] data, WebSocketMessageType messageType, CancellationToken cancellationToken)
{
// 从配置中获取分包大小,如果没有配置则使用默认值
var maxChunkSize = _config.MaxChunkSize ?? 64 * 1024; // 默认64KB
if (data.Length <= maxChunkSize)
{
// 数据小于等于分包大小,直接发送
await _connection.SendAsync(new ArraySegment<byte>(data), messageType, true, cancellationToken);
_logger.LogTrace("数据大小 {Size} bytes 小于等于 {MaxChunkSize} bytes,直接发送", data.Length, maxChunkSize);
}
else
{
// 数据大于分包大小,需要分包发送
var totalChunks = (int)Math.Ceiling((double)data.Length / maxChunkSize);
_logger.LogInformation("数据大小 {Size} bytes 大于 {MaxChunkSize} bytes,将分 {TotalChunks} 个包发送",
data.Length, maxChunkSize, totalChunks);
var sentChunks = 0;
try
{
for (int i = 0; i < totalChunks; i++)
{
var offset = i * maxChunkSize;
var chunkSize = Math.Min(maxChunkSize, data.Length - offset);
var chunk = new ArraySegment<byte>(data, offset, chunkSize);
// 最后一个包设置 endOfMessage 为 true
var isLastChunk = i == totalChunks - 1;
_logger.LogTrace("发送第 {ChunkIndex}/{TotalChunks} 个包,大小: {ChunkSize} bytes,是否最后一个包: {IsLastChunk}",
i + 1, totalChunks, chunkSize, isLastChunk);
await _connection.SendAsync(chunk, messageType, isLastChunk, cancellationToken);
sentChunks++;
// 如果不是最后一个包,添加小延迟避免发送过快
if (!isLastChunk)
{
await Task.Delay(_config.ChunkDelayMs ?? 1, cancellationToken);
}
}
_logger.LogInformation("分包发送完成,共发送 {TotalChunks} 个包", totalChunks);
}
catch (Exception ex)
{
_logger.LogError(ex, "分包发送失败,已发送 {SentChunks}/{TotalChunks} 个包", sentChunks, totalChunks);
throw;
}
}
}
/// <summary>
/// 接收循环 - 自动接收数据并推送到接收通道
/// </summary>

Loading…
Cancel
Save