diff --git a/CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs b/CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs index 02a110c..78baf87 100644 --- a/CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs +++ b/CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs @@ -50,4 +50,18 @@ public class WebSocketConfig /// [Range(1, 1440, ErrorMessage = "缓存 TTL 必须在 1-1440 分钟之间")] public int CacheTtlMinutes { get; set; } = 5; + + /// + /// 最大分包大小(字节),超过此大小的消息将被分包发送 + /// 建议值:32KB-128KB,默认64KB + /// + [Range(1024, 1024 * 1024, ErrorMessage = "分包大小必须在 1KB-1MB 之间")] + public int? MaxChunkSize { get; set; } = 64 * 1024; // 64KB + + /// + /// 分包发送延迟(毫秒),用于控制分包发送的速率 + /// 建议值:1-10ms,默认1ms + /// + [Range(0, 100, ErrorMessage = "分包延迟必须在 0-100 毫秒之间")] + public int? ChunkDelayMs { get; set; } = 1; } \ No newline at end of file diff --git a/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs b/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs index adbbd4b..eb498c2 100644 --- a/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs +++ b/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs @@ -398,14 +398,14 @@ public class WebSocketTransport : IWebSocketTransport { // 处理二进制消息 _logger.LogTrace("发送二进制消息,大小: {Size} bytes", binaryData.Length); - await _connection.SendAsync(new ArraySegment(binaryData), WebSocketMessageType.Binary, true, cancellationToken); + await SendDataWithChunkingAsync(binaryData, WebSocketMessageType.Binary, cancellationToken); } else { // 处理文本消息 var data = _serializer.Serialize(processedMessage); _logger.LogTrace("发送文本消息,大小: {Size} bytes", data.Length); - await _connection.SendAsync(new ArraySegment(data), WebSocketMessageType.Text, true, cancellationToken); + await SendDataWithChunkingAsync(data, WebSocketMessageType.Text, cancellationToken); } stopwatch.Stop(); @@ -419,6 +419,62 @@ public class WebSocketTransport : IWebSocketTransport } } + /// + /// 分包发送数据 + /// + private async Task SendDataWithChunkingAsync(byte[] data, WebSocketMessageType messageType, CancellationToken cancellationToken) + { + // 从配置中获取分包大小,如果没有配置则使用默认值 + var maxChunkSize = _config.MaxChunkSize ?? 64 * 1024; // 默认64KB + + if (data.Length <= maxChunkSize) + { + // 数据小于等于分包大小,直接发送 + await _connection.SendAsync(new ArraySegment(data), messageType, true, cancellationToken); + _logger.LogTrace("数据大小 {Size} bytes 小于等于 {MaxChunkSize} bytes,直接发送", data.Length, maxChunkSize); + } + else + { + // 数据大于分包大小,需要分包发送 + var totalChunks = (int)Math.Ceiling((double)data.Length / maxChunkSize); + _logger.LogInformation("数据大小 {Size} bytes 大于 {MaxChunkSize} bytes,将分 {TotalChunks} 个包发送", + data.Length, maxChunkSize, totalChunks); + + var sentChunks = 0; + try + { + for (int i = 0; i < totalChunks; i++) + { + var offset = i * maxChunkSize; + var chunkSize = Math.Min(maxChunkSize, data.Length - offset); + var chunk = new ArraySegment(data, offset, chunkSize); + + // 最后一个包设置 endOfMessage 为 true + var isLastChunk = i == totalChunks - 1; + + _logger.LogTrace("发送第 {ChunkIndex}/{TotalChunks} 个包,大小: {ChunkSize} bytes,是否最后一个包: {IsLastChunk}", + i + 1, totalChunks, chunkSize, isLastChunk); + + await _connection.SendAsync(chunk, messageType, isLastChunk, cancellationToken); + sentChunks++; + + // 如果不是最后一个包,添加小延迟避免发送过快 + if (!isLastChunk) + { + await Task.Delay(_config.ChunkDelayMs ?? 1, cancellationToken); + } + } + + _logger.LogInformation("分包发送完成,共发送 {TotalChunks} 个包", totalChunks); + } + catch (Exception ex) + { + _logger.LogError(ex, "分包发送失败,已发送 {SentChunks}/{TotalChunks} 个包", sentChunks, totalChunks); + throw; + } + } + } + /// /// 接收循环 - 自动接收数据并推送到接收通道 ///