You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
435 lines
14 KiB
435 lines
14 KiB
using CoreAgent.WebSocketTransport.Interfaces;
|
|
using CoreAgent.WebSocketTransport.Services;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace CoreAgent.WebSocketTransport.Examples;
|
|
|
|
/// <summary>
|
|
/// 消息通道管理器测试示例
|
|
/// 验证 MessageChannelManager 和 ChannelMessageChannel 的修复
|
|
/// </summary>
|
|
public class ChannelManagerTestExample
|
|
{
|
|
private readonly IMessageChannelManager _channelManager;
|
|
private readonly ILogger<ChannelManagerTestExample> _logger;
|
|
|
|
public ChannelManagerTestExample(IMessageChannelManager channelManager, ILogger<ChannelManagerTestExample> logger)
|
|
{
|
|
_channelManager = channelManager;
|
|
_logger = logger;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 测试基本功能
|
|
/// </summary>
|
|
public async Task TestBasicFunctionalityAsync()
|
|
{
|
|
_logger.LogInformation("开始测试基本功能");
|
|
|
|
try
|
|
{
|
|
// 测试发送消息
|
|
await _channelManager.SendChannel.WriteAsync("Test message 1");
|
|
await _channelManager.SendChannel.WriteAsync("Test message 2");
|
|
//await _channelManager.PriorityChannel.WriteAsync("Priority message");
|
|
|
|
_logger.LogInformation("消息发送成功");
|
|
|
|
// 测试读取消息
|
|
if (_channelManager.SendChannel.TryRead(out var message1))
|
|
{
|
|
_logger.LogInformation($"读取到发送消息: {message1}");
|
|
}
|
|
|
|
if (_channelManager.PriorityChannel.TryRead(out var priorityMessage))
|
|
{
|
|
_logger.LogInformation($"读取到优先级消息: {priorityMessage}");
|
|
}
|
|
|
|
// 测试状态信息
|
|
var statusInfo = _channelManager.GetStatusInfo();
|
|
_logger.LogInformation($"通道状态: 发送={statusInfo.SendChannelCount}, 接收={statusInfo.ReceiveChannelCount}, 优先级={statusInfo.PriorityChannelCount}");
|
|
|
|
_logger.LogInformation("基本功能测试通过");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "基本功能测试失败");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 测试异步操作
|
|
/// </summary>
|
|
public async Task TestAsyncOperationsAsync()
|
|
{
|
|
_logger.LogInformation("开始测试异步操作");
|
|
|
|
try
|
|
{
|
|
// 启动异步接收任务
|
|
var receiveTask = Task.Run(async () =>
|
|
{
|
|
try
|
|
{
|
|
// 等待消息可读
|
|
var hasData = await _channelManager.ReceiveChannel.WaitToReadAsync(CancellationToken.None);
|
|
if (hasData)
|
|
{
|
|
var message = await _channelManager.ReceiveChannel.ReadAsync(CancellationToken.None);
|
|
_logger.LogInformation($"异步接收到消息: {message}");
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
_logger.LogInformation("异步接收被取消");
|
|
}
|
|
});
|
|
|
|
// 发送消息到接收通道
|
|
await _channelManager.ReceiveChannel.WriteAsync("Async test message");
|
|
|
|
// 等待接收任务完成
|
|
await receiveTask;
|
|
|
|
_logger.LogInformation("异步操作测试通过");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "异步操作测试失败");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 测试批量操作
|
|
/// </summary>
|
|
public async Task TestBatchOperationsAsync()
|
|
{
|
|
_logger.LogInformation("开始测试批量操作");
|
|
|
|
try
|
|
{
|
|
// 批量发送消息
|
|
for (int i = 0; i < 10; i++)
|
|
{
|
|
await _channelManager.SendChannel.WriteAsync($"Batch message {i}");
|
|
}
|
|
|
|
// 检查状态
|
|
var statusBefore = _channelManager.GetStatusInfo();
|
|
_logger.LogInformation($"批量发送后状态: 发送队列={statusBefore.SendChannelCount}");
|
|
|
|
// 清空通道
|
|
_channelManager.ClearAllChannels();
|
|
|
|
// 检查清空后状态
|
|
var statusAfter = _channelManager.GetStatusInfo();
|
|
_logger.LogInformation($"清空后状态: 发送队列={statusAfter.SendChannelCount}");
|
|
|
|
if (statusAfter.SendChannelCount == 0)
|
|
{
|
|
_logger.LogInformation("批量操作测试通过");
|
|
}
|
|
else
|
|
{
|
|
throw new Exception("清空操作失败");
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "批量操作测试失败");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 测试错误处理
|
|
/// </summary>
|
|
public async Task TestErrorHandlingAsync()
|
|
{
|
|
_logger.LogInformation("开始测试错误处理");
|
|
|
|
try
|
|
{
|
|
// 测试空消息
|
|
try
|
|
{
|
|
await _channelManager.SendChannel.WriteAsync(null!);
|
|
throw new Exception("应该抛出 ArgumentNullException");
|
|
}
|
|
catch (ArgumentNullException)
|
|
{
|
|
_logger.LogInformation("空消息检查通过");
|
|
}
|
|
|
|
// 测试已释放的通道
|
|
_channelManager.Dispose();
|
|
|
|
try
|
|
{
|
|
await _channelManager.SendChannel.WriteAsync("test");
|
|
throw new Exception("应该抛出 ObjectDisposedException");
|
|
}
|
|
catch (ObjectDisposedException)
|
|
{
|
|
_logger.LogInformation("已释放检查通过");
|
|
}
|
|
|
|
_logger.LogInformation("错误处理测试通过");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "错误处理测试失败");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 测试性能
|
|
/// </summary>
|
|
public async Task TestPerformanceAsync()
|
|
{
|
|
_logger.LogInformation("开始测试性能");
|
|
|
|
try
|
|
{
|
|
const int messageCount = 1000;
|
|
const int batchSize = 100;
|
|
|
|
// 清空通道,确保测试环境干净
|
|
_channelManager.ClearAllChannels();
|
|
|
|
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
|
|
var memoryBefore = GC.GetTotalMemory(false);
|
|
|
|
// 批量发送 - 使用更高效的方式
|
|
var sendTasks = new List<ValueTask>(batchSize);
|
|
for (int i = 0; i < messageCount; i++)
|
|
{
|
|
sendTasks.Add(_channelManager.SendChannel.WriteAsync($"Performance test message {i}"));
|
|
|
|
// 每 batchSize 个任务等待一次,避免创建过多任务
|
|
if (sendTasks.Count >= batchSize || i == messageCount - 1)
|
|
{
|
|
// 等待所有 ValueTask 完成
|
|
foreach (var task in sendTasks)
|
|
{
|
|
await task;
|
|
}
|
|
sendTasks.Clear();
|
|
}
|
|
}
|
|
|
|
stopwatch.Stop();
|
|
var sendTime = stopwatch.ElapsedMilliseconds;
|
|
var memoryAfterSend = GC.GetTotalMemory(false);
|
|
|
|
_logger.LogInformation($"发送 {messageCount} 条消息耗时: {sendTime}ms, 内存增长: {memoryAfterSend - memoryBefore} bytes");
|
|
|
|
// 验证发送的消息数量
|
|
var statusAfterSend = _channelManager.GetStatusInfo();
|
|
if (statusAfterSend.SendChannelCount != messageCount)
|
|
{
|
|
throw new Exception($"发送后消息数量不匹配: 期望={messageCount}, 实际={statusAfterSend.SendChannelCount}");
|
|
}
|
|
|
|
// 批量读取 - 使用更高效的方式
|
|
stopwatch.Restart();
|
|
var readCount = 0;
|
|
var readMessages = new List<object>(batchSize);
|
|
|
|
while (readCount < messageCount)
|
|
{
|
|
// 批量读取
|
|
while (readMessages.Count < batchSize && _channelManager.SendChannel.TryRead(out var message))
|
|
{
|
|
readMessages.Add(message);
|
|
readCount++;
|
|
}
|
|
|
|
// 处理批量读取的消息
|
|
if (readMessages.Count > 0)
|
|
{
|
|
// 这里可以添加消息处理逻辑
|
|
readMessages.Clear();
|
|
}
|
|
|
|
// 如果还有消息但当前批次已满,继续下一轮
|
|
if (readCount < messageCount && readMessages.Count == 0)
|
|
{
|
|
await Task.Delay(1); // 短暂延迟,避免CPU占用过高
|
|
}
|
|
}
|
|
|
|
stopwatch.Stop();
|
|
var readTime = stopwatch.ElapsedMilliseconds;
|
|
var memoryAfterRead = GC.GetTotalMemory(false);
|
|
|
|
_logger.LogInformation($"读取 {readCount} 条消息耗时: {readTime}ms, 内存增长: {memoryAfterRead - memoryAfterSend} bytes");
|
|
|
|
// 验证读取后的状态
|
|
var statusAfterRead = _channelManager.GetStatusInfo();
|
|
if (statusAfterRead.SendChannelCount != 0)
|
|
{
|
|
throw new Exception($"读取后通道应该为空: 实际={statusAfterRead.SendChannelCount}");
|
|
}
|
|
|
|
// 性能指标计算
|
|
var totalTime = sendTime + readTime;
|
|
var messagesPerSecond = (messageCount * 1000.0) / totalTime;
|
|
var totalMemoryGrowth = memoryAfterRead - memoryBefore;
|
|
|
|
_logger.LogInformation($"性能测试结果:");
|
|
_logger.LogInformation($" 总耗时: {totalTime}ms");
|
|
_logger.LogInformation($" 发送耗时: {sendTime}ms ({sendTime * 100.0 / totalTime:F1}%)");
|
|
_logger.LogInformation($" 读取耗时: {readTime}ms ({readTime * 100.0 / totalTime:F1}%)");
|
|
_logger.LogInformation($" 消息吞吐量: {messagesPerSecond:F0} 消息/秒");
|
|
_logger.LogInformation($" 总内存增长: {totalMemoryGrowth} bytes");
|
|
_logger.LogInformation($" 平均每条消息内存: {totalMemoryGrowth / (double)messageCount:F1} bytes");
|
|
|
|
if (readCount == messageCount)
|
|
{
|
|
_logger.LogInformation("性能测试通过");
|
|
}
|
|
else
|
|
{
|
|
throw new Exception($"消息数量不匹配: 期望={messageCount}, 实际={readCount}");
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "性能测试失败");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 测试通道完成
|
|
/// </summary>
|
|
public async Task TestChannelCompletionAsync()
|
|
{
|
|
_logger.LogInformation("开始测试通道完成");
|
|
|
|
try
|
|
{
|
|
// 发送一些消息
|
|
await _channelManager.SendChannel.WriteAsync("Message before completion");
|
|
await _channelManager.ReceiveChannel.WriteAsync("Receive message before completion");
|
|
|
|
// 完成通道
|
|
_channelManager.CompleteAllChannels();
|
|
|
|
// 检查完成状态
|
|
var statusInfo = _channelManager.GetStatusInfo();
|
|
if (statusInfo.SendChannelCompleted && statusInfo.ReceiveChannelCompleted && statusInfo.PriorityChannelCompleted)
|
|
{
|
|
_logger.LogInformation("通道完成测试通过");
|
|
}
|
|
else
|
|
{
|
|
throw new Exception("通道完成状态不正确");
|
|
}
|
|
|
|
// 尝试发送消息到已完成的通道
|
|
try
|
|
{
|
|
await _channelManager.SendChannel.WriteAsync("Message after completion");
|
|
throw new Exception("应该抛出 InvalidOperationException");
|
|
}
|
|
catch (InvalidOperationException)
|
|
{
|
|
_logger.LogInformation("已完成通道写入检查通过");
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "通道完成测试失败");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 测试 WaitToReadAsync 方法
|
|
/// </summary>
|
|
public async Task TestWaitToReadAsyncAsync()
|
|
{
|
|
_logger.LogInformation("开始测试 WaitToReadAsync 方法");
|
|
|
|
try
|
|
{
|
|
// 测试空通道的等待
|
|
var waitTask = _channelManager.SendChannel.WaitToReadAsync(CancellationToken.None);
|
|
|
|
// 等待一小段时间,应该返回 false(没有数据)
|
|
await Task.Delay(100);
|
|
|
|
if (waitTask.IsCompleted)
|
|
{
|
|
var hasData = await waitTask;
|
|
if (!hasData)
|
|
{
|
|
_logger.LogInformation("空通道等待测试通过");
|
|
}
|
|
else
|
|
{
|
|
throw new Exception("空通道应该返回 false");
|
|
}
|
|
}
|
|
|
|
// 测试有数据的等待
|
|
var sendTask = _channelManager.SendChannel.WriteAsync("Test message for WaitToReadAsync");
|
|
var waitTask2 = _channelManager.SendChannel.WaitToReadAsync(CancellationToken.None);
|
|
|
|
await sendTask;
|
|
var hasData2 = await waitTask2;
|
|
|
|
if (hasData2)
|
|
{
|
|
_logger.LogInformation("有数据通道等待测试通过");
|
|
}
|
|
else
|
|
{
|
|
throw new Exception("有数据的通道应该返回 true");
|
|
}
|
|
|
|
// 清理
|
|
_channelManager.SendChannel.TryRead(out _);
|
|
|
|
_logger.LogInformation("WaitToReadAsync 测试通过");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "WaitToReadAsync 测试失败");
|
|
throw;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 运行所有测试
|
|
/// </summary>
|
|
public async Task RunAllTestsAsync()
|
|
{
|
|
_logger.LogInformation("开始运行所有测试");
|
|
|
|
try
|
|
{
|
|
await TestBasicFunctionalityAsync();
|
|
await TestAsyncOperationsAsync();
|
|
await TestWaitToReadAsyncAsync();
|
|
await TestBatchOperationsAsync();
|
|
await TestPerformanceAsync();
|
|
await TestChannelCompletionAsync();
|
|
// 注意:TestErrorHandlingAsync 会释放通道管理器,所以放在最后
|
|
await TestErrorHandlingAsync();
|
|
|
|
_logger.LogInformation("所有测试通过!");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "测试失败");
|
|
throw;
|
|
}
|
|
}
|
|
}
|