From 4d32386f13ffdc6220e7ff7594ae7ee2c2cfec2c Mon Sep 17 00:00:00 2001
From: root <295172551@qq.com>
Date: Sat, 26 Jul 2025 01:33:18 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ProtocolMessage=E6=A8=A1?=
=?UTF-8?q?=E5=9E=8B=E6=B3=A8=E9=87=8A=EF=BC=8C=E5=AE=8C=E5=96=84=E4=BB=A3?=
=?UTF-8?q?=E7=A0=81=E6=96=87=E6=A1=A3=E5=92=8C=E5=91=BD=E5=90=8D=E8=A7=84?=
=?UTF-8?q?=E8=8C=83?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Network/NetworkProtocolLogObserver.cs | 4 +-
.../Examples/ChannelManagerTestExample.cs | 435 ------------------
.../Examples/CompositionPatternExample.cs | 85 ----
.../Interfaces/IMessageChannelManager.cs | 2 +-
.../Models/ProtocolMessage.cs | 42 +-
.../Services/MessageChannelManager.cs | 4 +-
.../Services/WebSocketTransport.cs | 2 +-
modify.md | 2 +
8 files changed, 42 insertions(+), 534 deletions(-)
delete mode 100644 CoreAgent.WebSocketTransport/Examples/ChannelManagerTestExample.cs
delete mode 100644 CoreAgent.WebSocketTransport/Examples/CompositionPatternExample.cs
diff --git a/CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs b/CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs
index 993f6a4..760a56c 100644
--- a/CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs
+++ b/CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs
@@ -62,9 +62,9 @@ namespace CoreAgent.Infrastructure.Services.Network
Info = log.Info,
Message = log.Message
});
-
+ ProtocolMessage message = new ProtocolMessage(webSocketLogs.ToArray());
// 尝试写入通道并跟踪结果
- var writeSuccess = _ChannelManager.SendChannel.TryWrite(webSocketLogs);
+ var writeSuccess = _ChannelManager.SendChannel.TryWrite(message);
var processingTime = DateTime.UtcNow - startTime;
if (writeSuccess)
diff --git a/CoreAgent.WebSocketTransport/Examples/ChannelManagerTestExample.cs b/CoreAgent.WebSocketTransport/Examples/ChannelManagerTestExample.cs
deleted file mode 100644
index 3cfe289..0000000
--- a/CoreAgent.WebSocketTransport/Examples/ChannelManagerTestExample.cs
+++ /dev/null
@@ -1,435 +0,0 @@
-using CoreAgent.WebSocketTransport.Interfaces;
-using CoreAgent.WebSocketTransport.Services;
-using Microsoft.Extensions.Logging;
-
-namespace CoreAgent.WebSocketTransport.Examples;
-
-///
-/// 消息通道管理器测试示例
-/// 验证 MessageChannelManager 和 ChannelMessageChannel 的修复
-///
-public class ChannelManagerTestExample
-{
- private readonly IMessageChannelManager _channelManager;
- private readonly ILogger _logger;
-
- public ChannelManagerTestExample(IMessageChannelManager channelManager, ILogger logger)
- {
- _channelManager = channelManager;
- _logger = logger;
- }
-
- ///
- /// 测试基本功能
- ///
- public async Task TestBasicFunctionalityAsync()
- {
- _logger.LogInformation("开始测试基本功能");
-
- try
- {
- // 测试发送消息
- await _channelManager.SendChannel.WriteAsync("Test message 1");
- await _channelManager.SendChannel.WriteAsync("Test message 2");
- //await _channelManager.PriorityChannel.WriteAsync("Priority message");
-
- _logger.LogInformation("消息发送成功");
-
- // 测试读取消息
- if (_channelManager.SendChannel.TryRead(out var message1))
- {
- _logger.LogInformation($"读取到发送消息: {message1}");
- }
-
- if (_channelManager.PriorityChannel.TryRead(out var priorityMessage))
- {
- _logger.LogInformation($"读取到优先级消息: {priorityMessage}");
- }
-
- // 测试状态信息
- var statusInfo = _channelManager.GetStatusInfo();
- _logger.LogInformation($"通道状态: 发送={statusInfo.SendChannelCount}, 接收={statusInfo.ReceiveChannelCount}, 优先级={statusInfo.PriorityChannelCount}");
-
- _logger.LogInformation("基本功能测试通过");
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "基本功能测试失败");
- throw;
- }
- }
-
- ///
- /// 测试异步操作
- ///
- public async Task TestAsyncOperationsAsync()
- {
- _logger.LogInformation("开始测试异步操作");
-
- try
- {
- // 启动异步接收任务
- var receiveTask = Task.Run(async () =>
- {
- try
- {
- // 等待消息可读
- var hasData = await _channelManager.ReceiveChannel.WaitToReadAsync(CancellationToken.None);
- if (hasData)
- {
- var message = await _channelManager.ReceiveChannel.ReadAsync(CancellationToken.None);
- _logger.LogInformation($"异步接收到消息: {message}");
- }
- }
- catch (OperationCanceledException)
- {
- _logger.LogInformation("异步接收被取消");
- }
- });
-
- // 发送消息到接收通道
- await _channelManager.ReceiveChannel.WriteAsync("Async test message");
-
- // 等待接收任务完成
- await receiveTask;
-
- _logger.LogInformation("异步操作测试通过");
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "异步操作测试失败");
- throw;
- }
- }
-
- ///
- /// 测试批量操作
- ///
- public async Task TestBatchOperationsAsync()
- {
- _logger.LogInformation("开始测试批量操作");
-
- try
- {
- // 批量发送消息
- for (int i = 0; i < 10; i++)
- {
- await _channelManager.SendChannel.WriteAsync($"Batch message {i}");
- }
-
- // 检查状态
- var statusBefore = _channelManager.GetStatusInfo();
- _logger.LogInformation($"批量发送后状态: 发送队列={statusBefore.SendChannelCount}");
-
- // 清空通道
- _channelManager.ClearAllChannels();
-
- // 检查清空后状态
- var statusAfter = _channelManager.GetStatusInfo();
- _logger.LogInformation($"清空后状态: 发送队列={statusAfter.SendChannelCount}");
-
- if (statusAfter.SendChannelCount == 0)
- {
- _logger.LogInformation("批量操作测试通过");
- }
- else
- {
- throw new Exception("清空操作失败");
- }
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "批量操作测试失败");
- throw;
- }
- }
-
- ///
- /// 测试错误处理
- ///
- public async Task TestErrorHandlingAsync()
- {
- _logger.LogInformation("开始测试错误处理");
-
- try
- {
- // 测试空消息
- try
- {
- await _channelManager.SendChannel.WriteAsync(null!);
- throw new Exception("应该抛出 ArgumentNullException");
- }
- catch (ArgumentNullException)
- {
- _logger.LogInformation("空消息检查通过");
- }
-
- // 测试已释放的通道
- _channelManager.Dispose();
-
- try
- {
- await _channelManager.SendChannel.WriteAsync("test");
- throw new Exception("应该抛出 ObjectDisposedException");
- }
- catch (ObjectDisposedException)
- {
- _logger.LogInformation("已释放检查通过");
- }
-
- _logger.LogInformation("错误处理测试通过");
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "错误处理测试失败");
- throw;
- }
- }
-
- ///
- /// 测试性能
- ///
- public async Task TestPerformanceAsync()
- {
- _logger.LogInformation("开始测试性能");
-
- try
- {
- const int messageCount = 1000;
- const int batchSize = 100;
-
- // 清空通道,确保测试环境干净
- _channelManager.ClearAllChannels();
-
- var stopwatch = System.Diagnostics.Stopwatch.StartNew();
- var memoryBefore = GC.GetTotalMemory(false);
-
- // 批量发送 - 使用更高效的方式
- var sendTasks = new List(batchSize);
- for (int i = 0; i < messageCount; i++)
- {
- sendTasks.Add(_channelManager.SendChannel.WriteAsync($"Performance test message {i}"));
-
- // 每 batchSize 个任务等待一次,避免创建过多任务
- if (sendTasks.Count >= batchSize || i == messageCount - 1)
- {
- // 等待所有 ValueTask 完成
- foreach (var task in sendTasks)
- {
- await task;
- }
- sendTasks.Clear();
- }
- }
-
- stopwatch.Stop();
- var sendTime = stopwatch.ElapsedMilliseconds;
- var memoryAfterSend = GC.GetTotalMemory(false);
-
- _logger.LogInformation($"发送 {messageCount} 条消息耗时: {sendTime}ms, 内存增长: {memoryAfterSend - memoryBefore} bytes");
-
- // 验证发送的消息数量
- var statusAfterSend = _channelManager.GetStatusInfo();
- if (statusAfterSend.SendChannelCount != messageCount)
- {
- throw new Exception($"发送后消息数量不匹配: 期望={messageCount}, 实际={statusAfterSend.SendChannelCount}");
- }
-
- // 批量读取 - 使用更高效的方式
- stopwatch.Restart();
- var readCount = 0;
- var readMessages = new List