diff --git a/CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs b/CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs index 993f6a4..760a56c 100644 --- a/CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs +++ b/CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs @@ -62,9 +62,9 @@ namespace CoreAgent.Infrastructure.Services.Network Info = log.Info, Message = log.Message }); - + ProtocolMessage message = new ProtocolMessage(webSocketLogs.ToArray()); // 尝试写入通道并跟踪结果 - var writeSuccess = _ChannelManager.SendChannel.TryWrite(webSocketLogs); + var writeSuccess = _ChannelManager.SendChannel.TryWrite(message); var processingTime = DateTime.UtcNow - startTime; if (writeSuccess) diff --git a/CoreAgent.WebSocketTransport/Examples/ChannelManagerTestExample.cs b/CoreAgent.WebSocketTransport/Examples/ChannelManagerTestExample.cs deleted file mode 100644 index 3cfe289..0000000 --- a/CoreAgent.WebSocketTransport/Examples/ChannelManagerTestExample.cs +++ /dev/null @@ -1,435 +0,0 @@ -using CoreAgent.WebSocketTransport.Interfaces; -using CoreAgent.WebSocketTransport.Services; -using Microsoft.Extensions.Logging; - -namespace CoreAgent.WebSocketTransport.Examples; - -/// -/// 消息通道管理器测试示例 -/// 验证 MessageChannelManager 和 ChannelMessageChannel 的修复 -/// -public class ChannelManagerTestExample -{ - private readonly IMessageChannelManager _channelManager; - private readonly ILogger _logger; - - public ChannelManagerTestExample(IMessageChannelManager channelManager, ILogger logger) - { - _channelManager = channelManager; - _logger = logger; - } - - /// - /// 测试基本功能 - /// - public async Task TestBasicFunctionalityAsync() - { - _logger.LogInformation("开始测试基本功能"); - - try - { - // 测试发送消息 - await _channelManager.SendChannel.WriteAsync("Test message 1"); - await _channelManager.SendChannel.WriteAsync("Test message 2"); - //await _channelManager.PriorityChannel.WriteAsync("Priority message"); - - _logger.LogInformation("消息发送成功"); - - // 测试读取消息 - if (_channelManager.SendChannel.TryRead(out var message1)) - { - _logger.LogInformation($"读取到发送消息: {message1}"); - } - - if (_channelManager.PriorityChannel.TryRead(out var priorityMessage)) - { - _logger.LogInformation($"读取到优先级消息: {priorityMessage}"); - } - - // 测试状态信息 - var statusInfo = _channelManager.GetStatusInfo(); - _logger.LogInformation($"通道状态: 发送={statusInfo.SendChannelCount}, 接收={statusInfo.ReceiveChannelCount}, 优先级={statusInfo.PriorityChannelCount}"); - - _logger.LogInformation("基本功能测试通过"); - } - catch (Exception ex) - { - _logger.LogError(ex, "基本功能测试失败"); - throw; - } - } - - /// - /// 测试异步操作 - /// - public async Task TestAsyncOperationsAsync() - { - _logger.LogInformation("开始测试异步操作"); - - try - { - // 启动异步接收任务 - var receiveTask = Task.Run(async () => - { - try - { - // 等待消息可读 - var hasData = await _channelManager.ReceiveChannel.WaitToReadAsync(CancellationToken.None); - if (hasData) - { - var message = await _channelManager.ReceiveChannel.ReadAsync(CancellationToken.None); - _logger.LogInformation($"异步接收到消息: {message}"); - } - } - catch (OperationCanceledException) - { - _logger.LogInformation("异步接收被取消"); - } - }); - - // 发送消息到接收通道 - await _channelManager.ReceiveChannel.WriteAsync("Async test message"); - - // 等待接收任务完成 - await receiveTask; - - _logger.LogInformation("异步操作测试通过"); - } - catch (Exception ex) - { - _logger.LogError(ex, "异步操作测试失败"); - throw; - } - } - - /// - /// 测试批量操作 - /// - public async Task TestBatchOperationsAsync() - { - _logger.LogInformation("开始测试批量操作"); - - try - { - // 批量发送消息 - for (int i = 0; i < 10; i++) - { - await _channelManager.SendChannel.WriteAsync($"Batch message {i}"); - } - - // 检查状态 - var statusBefore = _channelManager.GetStatusInfo(); - _logger.LogInformation($"批量发送后状态: 发送队列={statusBefore.SendChannelCount}"); - - // 清空通道 - _channelManager.ClearAllChannels(); - - // 检查清空后状态 - var statusAfter = _channelManager.GetStatusInfo(); - _logger.LogInformation($"清空后状态: 发送队列={statusAfter.SendChannelCount}"); - - if (statusAfter.SendChannelCount == 0) - { - _logger.LogInformation("批量操作测试通过"); - } - else - { - throw new Exception("清空操作失败"); - } - } - catch (Exception ex) - { - _logger.LogError(ex, "批量操作测试失败"); - throw; - } - } - - /// - /// 测试错误处理 - /// - public async Task TestErrorHandlingAsync() - { - _logger.LogInformation("开始测试错误处理"); - - try - { - // 测试空消息 - try - { - await _channelManager.SendChannel.WriteAsync(null!); - throw new Exception("应该抛出 ArgumentNullException"); - } - catch (ArgumentNullException) - { - _logger.LogInformation("空消息检查通过"); - } - - // 测试已释放的通道 - _channelManager.Dispose(); - - try - { - await _channelManager.SendChannel.WriteAsync("test"); - throw new Exception("应该抛出 ObjectDisposedException"); - } - catch (ObjectDisposedException) - { - _logger.LogInformation("已释放检查通过"); - } - - _logger.LogInformation("错误处理测试通过"); - } - catch (Exception ex) - { - _logger.LogError(ex, "错误处理测试失败"); - throw; - } - } - - /// - /// 测试性能 - /// - public async Task TestPerformanceAsync() - { - _logger.LogInformation("开始测试性能"); - - try - { - const int messageCount = 1000; - const int batchSize = 100; - - // 清空通道,确保测试环境干净 - _channelManager.ClearAllChannels(); - - var stopwatch = System.Diagnostics.Stopwatch.StartNew(); - var memoryBefore = GC.GetTotalMemory(false); - - // 批量发送 - 使用更高效的方式 - var sendTasks = new List(batchSize); - for (int i = 0; i < messageCount; i++) - { - sendTasks.Add(_channelManager.SendChannel.WriteAsync($"Performance test message {i}")); - - // 每 batchSize 个任务等待一次,避免创建过多任务 - if (sendTasks.Count >= batchSize || i == messageCount - 1) - { - // 等待所有 ValueTask 完成 - foreach (var task in sendTasks) - { - await task; - } - sendTasks.Clear(); - } - } - - stopwatch.Stop(); - var sendTime = stopwatch.ElapsedMilliseconds; - var memoryAfterSend = GC.GetTotalMemory(false); - - _logger.LogInformation($"发送 {messageCount} 条消息耗时: {sendTime}ms, 内存增长: {memoryAfterSend - memoryBefore} bytes"); - - // 验证发送的消息数量 - var statusAfterSend = _channelManager.GetStatusInfo(); - if (statusAfterSend.SendChannelCount != messageCount) - { - throw new Exception($"发送后消息数量不匹配: 期望={messageCount}, 实际={statusAfterSend.SendChannelCount}"); - } - - // 批量读取 - 使用更高效的方式 - stopwatch.Restart(); - var readCount = 0; - var readMessages = new List(batchSize); - - while (readCount < messageCount) - { - // 批量读取 - while (readMessages.Count < batchSize && _channelManager.SendChannel.TryRead(out var message)) - { - readMessages.Add(message); - readCount++; - } - - // 处理批量读取的消息 - if (readMessages.Count > 0) - { - // 这里可以添加消息处理逻辑 - readMessages.Clear(); - } - - // 如果还有消息但当前批次已满,继续下一轮 - if (readCount < messageCount && readMessages.Count == 0) - { - await Task.Delay(1); // 短暂延迟,避免CPU占用过高 - } - } - - stopwatch.Stop(); - var readTime = stopwatch.ElapsedMilliseconds; - var memoryAfterRead = GC.GetTotalMemory(false); - - _logger.LogInformation($"读取 {readCount} 条消息耗时: {readTime}ms, 内存增长: {memoryAfterRead - memoryAfterSend} bytes"); - - // 验证读取后的状态 - var statusAfterRead = _channelManager.GetStatusInfo(); - if (statusAfterRead.SendChannelCount != 0) - { - throw new Exception($"读取后通道应该为空: 实际={statusAfterRead.SendChannelCount}"); - } - - // 性能指标计算 - var totalTime = sendTime + readTime; - var messagesPerSecond = (messageCount * 1000.0) / totalTime; - var totalMemoryGrowth = memoryAfterRead - memoryBefore; - - _logger.LogInformation($"性能测试结果:"); - _logger.LogInformation($" 总耗时: {totalTime}ms"); - _logger.LogInformation($" 发送耗时: {sendTime}ms ({sendTime * 100.0 / totalTime:F1}%)"); - _logger.LogInformation($" 读取耗时: {readTime}ms ({readTime * 100.0 / totalTime:F1}%)"); - _logger.LogInformation($" 消息吞吐量: {messagesPerSecond:F0} 消息/秒"); - _logger.LogInformation($" 总内存增长: {totalMemoryGrowth} bytes"); - _logger.LogInformation($" 平均每条消息内存: {totalMemoryGrowth / (double)messageCount:F1} bytes"); - - if (readCount == messageCount) - { - _logger.LogInformation("性能测试通过"); - } - else - { - throw new Exception($"消息数量不匹配: 期望={messageCount}, 实际={readCount}"); - } - } - catch (Exception ex) - { - _logger.LogError(ex, "性能测试失败"); - throw; - } - } - - /// - /// 测试通道完成 - /// - public async Task TestChannelCompletionAsync() - { - _logger.LogInformation("开始测试通道完成"); - - try - { - // 发送一些消息 - await _channelManager.SendChannel.WriteAsync("Message before completion"); - await _channelManager.ReceiveChannel.WriteAsync("Receive message before completion"); - - // 完成通道 - _channelManager.CompleteAllChannels(); - - // 检查完成状态 - var statusInfo = _channelManager.GetStatusInfo(); - if (statusInfo.SendChannelCompleted && statusInfo.ReceiveChannelCompleted && statusInfo.PriorityChannelCompleted) - { - _logger.LogInformation("通道完成测试通过"); - } - else - { - throw new Exception("通道完成状态不正确"); - } - - // 尝试发送消息到已完成的通道 - try - { - await _channelManager.SendChannel.WriteAsync("Message after completion"); - throw new Exception("应该抛出 InvalidOperationException"); - } - catch (InvalidOperationException) - { - _logger.LogInformation("已完成通道写入检查通过"); - } - } - catch (Exception ex) - { - _logger.LogError(ex, "通道完成测试失败"); - throw; - } - } - - /// - /// 测试 WaitToReadAsync 方法 - /// - public async Task TestWaitToReadAsyncAsync() - { - _logger.LogInformation("开始测试 WaitToReadAsync 方法"); - - try - { - // 测试空通道的等待 - var waitTask = _channelManager.SendChannel.WaitToReadAsync(CancellationToken.None); - - // 等待一小段时间,应该返回 false(没有数据) - await Task.Delay(100); - - if (waitTask.IsCompleted) - { - var hasData = await waitTask; - if (!hasData) - { - _logger.LogInformation("空通道等待测试通过"); - } - else - { - throw new Exception("空通道应该返回 false"); - } - } - - // 测试有数据的等待 - var sendTask = _channelManager.SendChannel.WriteAsync("Test message for WaitToReadAsync"); - var waitTask2 = _channelManager.SendChannel.WaitToReadAsync(CancellationToken.None); - - await sendTask; - var hasData2 = await waitTask2; - - if (hasData2) - { - _logger.LogInformation("有数据通道等待测试通过"); - } - else - { - throw new Exception("有数据的通道应该返回 true"); - } - - // 清理 - _channelManager.SendChannel.TryRead(out _); - - _logger.LogInformation("WaitToReadAsync 测试通过"); - } - catch (Exception ex) - { - _logger.LogError(ex, "WaitToReadAsync 测试失败"); - throw; - } - } - - /// - /// 运行所有测试 - /// - public async Task RunAllTestsAsync() - { - _logger.LogInformation("开始运行所有测试"); - - try - { - await TestBasicFunctionalityAsync(); - await TestAsyncOperationsAsync(); - await TestWaitToReadAsyncAsync(); - await TestBatchOperationsAsync(); - await TestPerformanceAsync(); - await TestChannelCompletionAsync(); - // 注意:TestErrorHandlingAsync 会释放通道管理器,所以放在最后 - await TestErrorHandlingAsync(); - - _logger.LogInformation("所有测试通过!"); - } - catch (Exception ex) - { - _logger.LogError(ex, "测试失败"); - throw; - } - } -} \ No newline at end of file diff --git a/CoreAgent.WebSocketTransport/Examples/CompositionPatternExample.cs b/CoreAgent.WebSocketTransport/Examples/CompositionPatternExample.cs deleted file mode 100644 index 90588ef..0000000 --- a/CoreAgent.WebSocketTransport/Examples/CompositionPatternExample.cs +++ /dev/null @@ -1,85 +0,0 @@ -using CoreAgent.WebSocketTransport.Interfaces; -using CoreAgent.WebSocketTransport.Services; - -namespace CoreAgent.WebSocketTransport.Examples; - -/// -/// 组合模式使用示例 -/// 展示如何使用 IWebSocketTransport 和 IMessageChannelManager 的组合 -/// -public class CompositionPatternExample -{ - private readonly IWebSocketTransport _webSocketTransport; - private readonly IMessageChannelManager _channelManager; - - public CompositionPatternExample(IWebSocketTransport webSocketTransport, IMessageChannelManager channelManager) - { - _webSocketTransport = webSocketTransport; - _channelManager = channelManager; - } - - /// - /// 示例:基本使用流程 - /// - public async Task BasicUsageAsync() - { - // 1. 连接服务器 - await _webSocketTransport.ConnectAsync(); - Console.WriteLine($"连接状态: {_webSocketTransport.IsConnected}"); - - // 2. 发送消息 - 通过通道管理器 - await _channelManager.SendChannel.WriteAsync("Hello World"); - await _channelManager.SendChannel.WriteAsync(new { type = "chat", message = "Hello" }); - - // 3. 接收消息 - 通过通道管理器 - _ = Task.Run(async () => - { - while (!_channelManager.ReceiveChannel.IsCompleted) - { - if (_channelManager.ReceiveChannel.TryRead(out var message)) - { - Console.WriteLine($"收到消息: {message}"); - } - await Task.Delay(100); - } - }); - - // 4. 关闭连接 - await _webSocketTransport.CloseAsync(); - } - - /// - /// 示例:优先级消息处理 - /// - public async Task PriorityMessageExampleAsync() - { - await _webSocketTransport.ConnectAsync(); - - // 发送普通消息 - await _channelManager.SendChannel.WriteAsync("Normal message"); - - // 发送优先级消息 - //await _channelManager.PriorityChannel.WriteAsync("Priority message"); - - // 等待消息发送完成 - await Task.Delay(1000); - - await _webSocketTransport.CloseAsync(); - } - - /// - /// 示例:连接状态监控 - /// - public async Task ConnectionMonitoringExampleAsync() - { - await _webSocketTransport.ConnectAsync(); - - // 监控连接状态和心跳 - while (_webSocketTransport.IsConnected) - { - Console.WriteLine($"连接状态: {_webSocketTransport.IsConnected}"); - Console.WriteLine($"最后心跳: {_webSocketTransport.LastHeartbeat}"); - await Task.Delay(5000); - } - } -} \ No newline at end of file diff --git a/CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs b/CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs index 07a66d2..efddc8f 100644 --- a/CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs +++ b/CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs @@ -11,7 +11,7 @@ public interface IMessageChannelManager : IDisposable /// /// 获取发送消息通道 /// - IMessageChannel SendChannel { get; } + IMessageChannel SendChannel { get; } /// /// 获取接收消息通道 diff --git a/CoreAgent.WebSocketTransport/Models/ProtocolMessage.cs b/CoreAgent.WebSocketTransport/Models/ProtocolMessage.cs index 7655872..d8372e5 100644 --- a/CoreAgent.WebSocketTransport/Models/ProtocolMessage.cs +++ b/CoreAgent.WebSocketTransport/Models/ProtocolMessage.cs @@ -6,27 +6,53 @@ using System.Threading.Tasks; namespace CoreAgent.WebSocketTransport.Models { - partial class ProtocolMessage + /// + /// 协议消息模型 + /// 用于封装和传输协议相关的消息数据 + /// + public class ProtocolMessage { /// - /// 消息类型 + /// 消息类型标识 + /// 默认为 "Protocol",表示协议类型消息 /// public string Type { get; set; } = "Protocol"; /// - /// 消息载荷 + /// 消息载荷数据 + /// 包含具体的协议日志数据 + /// + public ProtocolPayload Payload { get; set; } + + /// + /// 初始化协议消息的新实例 /// - public HeartbeatPayload Payload { get; set; } = new HeartbeatPayload(); + /// 协议日志消息数组 + public ProtocolMessage(MessageTransferProtocolLog[] messages) + { + Payload = new ProtocolPayload(messages); + } } /// - /// 心跳消息载荷 + /// 协议消息载荷 + /// 包含具体的协议日志数据内容 /// public class ProtocolPayload { /// - /// 心跳消息内容 + /// 协议日志消息数组 + /// 存储要传输的协议日志数据 + /// + public MessageTransferProtocolLog[] Message { get; set; } = Array.Empty(); + + /// + /// 初始化协议载荷的新实例 /// - public string Message { get; set; } = "ping"; + /// 协议日志消息数组 + public ProtocolPayload(MessageTransferProtocolLog[] messages) + { + Message = messages; + } } -} +} \ No newline at end of file diff --git a/CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs b/CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs index 7e2fe3c..b9f57f3 100644 --- a/CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs +++ b/CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs @@ -14,7 +14,7 @@ public class MessageChannelManager : IMessageChannelManager private volatile bool _disposed; private readonly object _disposeLock = new object(); - public IMessageChannel SendChannel { get; } + public IMessageChannel SendChannel { get; } public IMessageChannel ReceiveChannel { get; } public IMessageChannel PriorityChannel { get; } @@ -35,7 +35,7 @@ public class MessageChannelManager : IMessageChannelManager try { - SendChannel = new ChannelMessageChannel(sendChannelCapacity); + SendChannel = new ChannelMessageChannel(sendChannelCapacity); ReceiveChannel = new ChannelMessageChannel(receiveChannelCapacity); PriorityChannel = new ChannelMessageChannel(priorityChannelCapacity); diff --git a/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs b/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs index ae49c2e..adbbd4b 100644 --- a/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs +++ b/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs @@ -329,7 +329,7 @@ public class WebSocketTransport : IWebSocketTransport if (priorityMessage is null) continue; _logger.LogTrace("处理优先级消息 #{PriorityCount}: {MessageType}", priorityMessageCount, priorityMessage?.GetType().Name ?? "null"); - await SendSingleMessageAsync(JsonSerializer.Serialize(priorityMessage), cancellationToken); + await SendSingleMessageAsync(priorityMessage, cancellationToken); } // 处理普通消息 diff --git a/modify.md b/modify.md index 8b0542f..ab3fad4 100644 --- a/modify.md +++ b/modify.md @@ -89,11 +89,13 @@ - **错误处理**:完善的异常处理和日志记录 - **Bug修复**:修复空引用检查和多次枚举的性能问题 - **边界处理**:添加空集合检查,避免处理空集合 + - **代码规范**:优化ProtocolMessage模型注释,提高代码可读性 **影响范围**: - WebSocket传输层协议日志处理 - 协议日志观察者模式实现 - 跨项目类型转换逻辑 +- 协议消息模型注释优化 ### CellularNetworkService.StartNetworkAsync 方法添加协议客户端配置创建