You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

787 lines
24 KiB

# CoreAgent.WebSocketTransport 项目修改记录
## 2024-12-19 - 项目结构说明文档生成
### 新增文件
- **项目结构说明.md**: 详细描述了 CoreAgent.WebSocketTransport 项目的整体架构、组件说明、数据流转、核心特性等
### 文档内容概览
1. **项目概述**: 基于 .NET 8.0 的 WebSocket 传输层组件,主要负责转发协议数据给上层应用
2. **核心设计理念**: 单一职责原则、依赖注入、异步编程
3. **项目结构**: 详细的分层架构说明(Interfaces、Services、Models、Middleware、Extensions、Examples)
4. **组件说明**: 每个接口和实现类的详细职责和功能说明
5. **数据流转架构**: 从上层应用到 WebSocket 服务器的完整数据流
6. **核心特性**: 高性能设计、可靠性保障、可扩展性、易用性
7. **使用方式**: 服务注册和基本使用的代码示例
8. **技术栈**: 使用的 .NET 技术栈说明
### 文档价值
- 为开发人员提供完整的项目理解
- 便于新成员快速上手
- 记录设计决策和架构思路
- 提供最佳实践参考
## 2024-12-19 - 项目设计分析
### 设计优点
1. **良好的架构分层**
- 清晰的接口定义(Interfaces 目录)
- 实现与接口分离(Services 目录)
- 中间件模式支持扩展性(Middleware 目录)
- 配置模型独立(Models 目录)
2. **依赖注入支持**
- 完整的 DI 容器集成
- 服务注册扩展方法
- 配置绑定支持
3. **异步编程模式**
- 全面使用 async/await
- 支持 CancellationToken
- 使用 System.Threading.Channels 进行消息队列
4. **中间件架构**
- 支持消息处理管道
- 可扩展的消息处理逻辑
- 支持发送和接收消息的独立处理
5. **连接管理**
- 自动重连机制
- 心跳检测
- 连接状态管理
### 设计缺陷和问题
#### 1. **接口设计问题**
**问题:** `IWebSocketTransport` 接口过于简单,缺少关键功能
```csharp
// 当前接口只提供连接管理,缺少消息发送/接收功能
public interface IWebSocketTransport : IDisposable
{
bool IsConnected { get; }
DateTime? LastHeartbeat { get; }
Task ConnectAsync(CancellationToken cancellationToken = default);
Task CloseAsync(CancellationToken cancellationToken = default);
}
```
**建议:** 添加消息发送/接收方法
```csharp
public interface IWebSocketTransport : IDisposable
{
bool IsConnected { get; }
DateTime? LastHeartbeat { get; }
IMessageChannelManager ChannelManager { get; }
Task ConnectAsync(CancellationToken cancellationToken = default);
Task CloseAsync(CancellationToken cancellationToken = default);
Task SendAsync<T>(T message, CancellationToken cancellationToken = default);
Task<T> ReceiveAsync<T>(CancellationToken cancellationToken = default);
}
```
#### 2. **消息类型安全问题**
**问题:** 所有消息通道都使用 `object` 类型,缺乏类型安全
```csharp
public interface IMessageChannelManager
{
IMessageChannel<object> SendChannel { get; }
IMessageChannel<object> ReceiveChannel { get; }
IMessageChannel<object> PriorityChannel { get; }
}
```
**建议:** 使用泛型或强类型消息
```csharp
public interface IMessageChannelManager
{
IMessageChannel<T> GetChannel<T>(string channelName = "default");
IMessageChannel<T> GetPriorityChannel<T>();
}
```
#### 3. **错误处理不完善**
**问题:** 缺少统一的错误处理机制
- 重连失败后没有通知上层应用
- 消息发送失败没有重试机制
- 异常信息不够详细
**建议:** 添加事件机制和错误处理
```csharp
public interface IWebSocketTransport
{
event EventHandler<ConnectionEventArgs> ConnectionChanged;
event EventHandler<MessageEventArgs> MessageReceived;
event EventHandler<ErrorEventArgs> ErrorOccurred;
}
```
#### 4. **配置验证不足**
**问题:** 配置验证只在模型层面,运行时验证不够
```csharp
// 当前只有数据注解验证
[Required(ErrorMessage = "WebSocket URL 不能为空")]
[Url(ErrorMessage = "WebSocket URL 格式不正确")]
public string Url { get; set; } = "wss://example.com/ws";
```
**建议:** 添加运行时配置验证
```csharp
public class WebSocketConfigValidator
{
public static ValidationResult Validate(WebSocketConfig config)
{
// 运行时验证逻辑
}
}
```
#### 5. **资源管理问题** ✅ 已修复
**问题:** 资源释放不够彻底
- 后台任务可能没有正确等待完成
- 内存池使用不当
- 连接关闭时没有清理所有资源
**修复内容:**
1. **添加了 _disposed 标志**
- 防止重复释放资源
- 确保 Dispose 模式正确实现
2. **改进了任务等待机制**
- 详细记录每个任务的状态
- 添加超时处理和结果检查
- 提供任务完成状态的详细日志
3. **改进了信号量处理**
- 添加超时处理
- 防止死锁情况
- 详细的状态跟踪
4. **增强了异常处理**
- 分离托管资源释放的异常处理
- 提供更详细的错误信息
- 确保资源释放的可靠性
5. **添加了详细的日志记录**
- 资源释放过程的完整跟踪
- 任务状态和完成情况记录
- 异常情况的详细记录
**修复后的 Dispose 模式:**
```csharp
private bool _disposed = false;
public void Dispose()
{
if (_disposed) return;
_logger?.LogInformation("开始释放 WebSocket 传输资源");
try
{
// 取消所有操作
_cancellationTokenSource?.Cancel();
_logger?.LogDebug("已取消所有操作令牌");
// 等待信号量释放
if (_connectionSemaphore != null)
{
try
{
_connectionSemaphore.Wait(TimeSpan.FromSeconds(5));
_logger?.LogDebug("已获取连接信号量");
}
catch (TimeoutException)
{
_logger?.LogWarning("等待连接信号量超时");
}
}
// 收集需要等待的任务
var tasks = new List<Task>();
if (_sendTask != null && !_sendTask.IsCompleted)
{
tasks.Add(_sendTask);
_logger?.LogDebug("添加发送任务到等待列表");
}
// ... 其他任务处理
// 等待所有任务完成
if (tasks.Count > 0)
{
_logger?.LogInformation("等待 {TaskCount} 个后台任务完成", tasks.Count);
var waitResult = Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(5));
if (waitResult)
{
_logger?.LogInformation("所有后台任务已成功完成");
}
else
{
_logger?.LogWarning("部分后台任务未在超时时间内完成");
}
}
}
catch (Exception ex)
{
_logger?.LogError(ex, "释放资源过程中发生异常");
}
finally
{
try
{
// 释放托管资源
_cancellationTokenSource?.Dispose();
_connectionSemaphore?.Dispose();
_logger?.LogDebug("已释放托管资源");
}
catch (Exception ex)
{
_logger?.LogError(ex, "释放托管资源时发生异常");
}
_disposed = true;
GC.SuppressFinalize(this);
_logger?.LogInformation("WebSocket 传输资源释放完成");
}
}
```
#### 6. **性能问题**
**问题:** 存在性能瓶颈
- 消息序列化每次都创建新实例
- 缓冲区大小固定(4096字节)
- 没有消息压缩支持
- 心跳间隔固定(30秒)
**建议:** 性能优化
```csharp
// 使用对象池
private readonly ObjectPool<byte[]> _bufferPool;
// 动态缓冲区大小
private int _bufferSize = 4096;
// 消息压缩
public interface IMessageCompressor
{
byte[] Compress(byte[] data);
byte[] Decompress(byte[] data);
}
```
#### 7. **测试覆盖不足**
**问题:** 缺少单元测试和集成测试
- 只有示例代码,没有正式测试
- 缺少边界条件测试
- 缺少并发测试
**建议:** 添加完整的测试套件
```csharp
[TestClass]
public class WebSocketTransportTests
{
[TestMethod]
public async Task ConnectAsync_ValidUrl_ShouldConnect()
{
// 测试连接功能
}
[TestMethod]
public async Task SendAsync_Message_ShouldBeReceived()
{
// 测试消息发送接收
}
[TestMethod]
public async Task Reconnect_ConnectionLost_ShouldReconnect()
{
// 测试重连功能
}
}
```
#### 8. **日志记录不够详细** ✅ 已修复
**问题:** 日志信息不够详细,难以调试
- 缺少关键操作的日志
- 没有性能指标日志
- 错误日志信息不够详细
**修复内容:**
1. **添加了详细的连接日志**
- 连接开始、成功、失败都有详细日志
- 包含配置信息和超时时间
- 后台任务启动状态跟踪
2. **添加了性能监控日志**
- 消息发送/接收耗时统计
- 使用 Stopwatch 精确计时
- 消息类型和大小记录
3. **添加了结构化日志**
- 使用参数化日志记录
- 包含消息计数和统计信息
- 中间件处理过程跟踪
4. **改进了错误日志**
- 异常信息更详细
- 包含上下文信息
- 操作状态和结果记录
5. **添加了调试级别日志**
- Trace 级别用于详细跟踪
- Debug 级别用于关键操作
- 资源管理状态跟踪
**示例改进:**
```csharp
// 连接日志
_logger.LogInformation("开始连接 WebSocket 服务器: {Url}, 超时时间: {TimeoutMs}ms", _config.Url, _config.TimeoutMs);
// 性能日志
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
// ... 处理逻辑
_logger.LogDebug("消息发送成功: {MessageType}, 耗时: {ElapsedMs}ms", messageType, stopwatch.ElapsedMilliseconds);
// 统计日志
_logger.LogInformation("发送循环正常结束,共处理 {MessageCount} 条普通消息,{PriorityCount} 条优先级消息",
messageCount, priorityMessageCount);
```
#### 9. **缺少监控和指标**
**问题:** 没有性能监控和健康检查
- 缺少连接状态监控
- 没有消息吞吐量统计
- 缺少错误率统计
**建议:** 添加监控指标
```csharp
public interface IWebSocketMetrics
{
long MessagesSent { get; }
long MessagesReceived { get; }
long Errors { get; }
TimeSpan AverageLatency { get; }
double SuccessRate { get; }
}
```
#### 10. **安全性考虑不足**
**问题:** 缺少安全相关功能
- 没有消息加密
- 没有身份验证
- 没有消息签名验证
**建议:** 添加安全功能
```csharp
public interface IMessageSecurity
{
byte[] Encrypt(byte[] data);
byte[] Decrypt(byte[] data);
bool VerifySignature(byte[] data, byte[] signature);
}
```
### 改进建议优先级
1. **高优先级**:错误处理、类型安全、配置验证
2. **中优先级**:性能优化、监控指标
3. **低优先级**:安全功能、测试覆盖
**已修复项目:**
- ✅ 资源管理问题(Dispose 模式)
- ✅ 日志记录不够详细
### 总结
CoreAgent.WebSocketTransport 项目整体架构设计良好,采用了现代 .NET 开发模式,但在接口设计、类型安全、错误处理等方面存在改进空间。建议按照优先级逐步改进,特别是完善接口设计和错误处理机制。
## 2024-12-19 - WebSocket关闭处理严重Bug修复
### 问题描述
**严重Bug:** `ReceiveLoopAsync` 方法中处理 `WebSocketMessageType.Close` 消息时存在严重问题,导致无法二次连接。
**具体问题:**
1. 当收到 `WebSocketMessageType.Close` 消息时,WebSocket 连接会进入 `CloseReceived` 状态
2.`CloseReceived` 状态下,无法再次调用 `ConnectAsync` 方法
3. 原有的处理逻辑只是简单 `break`,没有正确处理连接状态
4. 导致后续重连失败,连接无法恢复
### 修复内容
#### 1. **WebSocketConnection 类改进** ✅ 已修复
**问题:** 缺少对 `CloseReceived` 状态的处理
**修复:** 添加连接状态检查和强制关闭功能
```csharp
public class WebSocketConnection : IWebSocketConnection
{
private ClientWebSocket _webSocket;
private readonly ILogger<WebSocketConnection> _logger;
private readonly object _lock = new object();
public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken = default)
{
lock (_lock)
{
// 如果连接已关闭或处于CloseReceived状态,需要重新创建WebSocket实例
if (_webSocket.State == WebSocketState.Closed ||
_webSocket.State == WebSocketState.CloseReceived ||
_webSocket.State == WebSocketState.CloseSent)
{
_logger.LogInformation("WebSocket 处于关闭状态,重新创建连接实例");
_webSocket?.Dispose();
_webSocket = new ClientWebSocket();
}
if (_webSocket.State == WebSocketState.Open)
{
_logger.LogInformation("WebSocket 已连接");
return;
}
}
_logger.LogInformation("正在连接 WebSocket 服务器: {Uri}", uri);
await _webSocket.ConnectAsync(uri, cancellationToken);
_logger.LogInformation("WebSocket 连接成功");
}
/// <summary>
/// 强制关闭连接并重新创建WebSocket实例
/// </summary>
public void ForceClose()
{
lock (_lock)
{
_logger.LogInformation("强制关闭 WebSocket 连接并重新创建实例");
_webSocket?.Dispose();
_webSocket = new ClientWebSocket();
}
}
}
```
#### 2. **IWebSocketConnection 接口扩展** ✅ 已修复
**添加:** `ForceClose` 方法到接口定义
```csharp
public interface IWebSocketConnection : IDisposable
{
// ... 现有方法
/// <summary>
/// 强制关闭连接并重新创建WebSocket实例
/// 用于处理CloseReceived状态后无法重连的问题
/// </summary>
void ForceClose();
}
```
#### 3. **ReceiveLoopAsync 方法修复** ✅ 已修复
**问题:** Close 消息处理不当
**修复:** 正确处理 Close 消息并触发重连
```csharp
else if (result.MessageType == WebSocketMessageType.Close)
{
_logger.LogInformation("收到 WebSocket 关闭消息,准备处理连接关闭");
// 收到关闭消息时,需要强制关闭连接并重新创建WebSocket实例
// 这样可以确保后续能够重新连接
_connection.ForceClose();
_isConnected = false;
_logger.LogInformation("WebSocket 连接已强制关闭,准备触发重连");
// 确保在触发重连之前,当前接收循环能够正常退出
// 重连任务会在后台启动,不会阻塞当前循环的退出
TriggerReconnect();
break;
}
```
#### 4. **CloseAsync 方法改进** ✅ 已修复
**问题:** 关闭时没有处理重连任务和异常情况
**修复:** 完善关闭逻辑
```csharp
// 等待任务完成
var closeTasks = new List<Task>();
if (_sendTask != null && !_sendTask.IsCompleted)
{
closeTasks.Add(_sendTask);
_logger.LogDebug("等待发送任务完成");
}
if (_receiveTask != null && !_receiveTask.IsCompleted)
{
closeTasks.Add(_receiveTask);
_logger.LogDebug("等待接收任务完成");
}
if (_heartbeatTask != null && !_heartbeatTask.IsCompleted)
{
closeTasks.Add(_heartbeatTask);
_logger.LogDebug("等待心跳任务完成");
}
if (_reconnectTask != null && !_reconnectTask.IsCompleted)
{
closeTasks.Add(_reconnectTask);
_logger.LogDebug("等待重连任务完成");
}
// 关闭连接
try
{
await _connection.CloseAsync(WebSocketCloseStatus.NormalClosure, "正常关闭", cancellationToken);
_logger.LogDebug("WebSocket 连接已正常关闭");
}
catch (Exception ex)
{
_logger.LogWarning(ex, "正常关闭连接失败,强制关闭连接");
_connection.ForceClose();
}
```
#### 5. **重连逻辑改进** ✅ 已修复
**问题:** 重连任务管理不够严谨
**修复:** 完善重连任务管理和状态处理
```csharp
private void TriggerReconnect()
{
lock (_reconnectLock)
{
if (_reconnectTask != null && !_reconnectTask.IsCompleted)
{
_logger.LogDebug("重连任务已在运行,跳过重复触发");
return; // 重连任务已在运行
}
_logger.LogInformation("启动重连任务");
_reconnectTask = Task.Run(() => ReconnectLoopAsync(_cancellationTokenSource.Token));
_logger.LogDebug("重连任务已启动: {TaskId}", _reconnectTask.Id);
}
}
private async Task ReconnectLoopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("重连循环开始,当前重连次数: {Attempts}", _reconnectAttempts);
_isConnected = false;
while (_reconnectAttempts < _config.MaxReconnectAttempts && !cancellationToken.IsCancellationRequested)
{
_reconnectAttempts++;
var delaySeconds = Math.Min(Math.Pow(2, _reconnectAttempts - 1), 30);
var delay = TimeSpan.FromSeconds(delaySeconds);
_logger.LogWarning("WebSocket 连接断开,{DelaySeconds}秒后进行第{Attempt}次重连",
delaySeconds, _reconnectAttempts);
await Task.Delay(delay, cancellationToken);
try
{
_logger.LogInformation("开始第{Attempt}次重连尝试", _reconnectAttempts);
await ConnectInternalAsync(cancellationToken);
_logger.LogInformation("WebSocket 重连成功,重连次数: {Attempts}", _reconnectAttempts);
return;
}
catch (Exception ex)
{
_logger.LogError(ex, "WebSocket 重连失败,尝试次数: {Attempt}", _reconnectAttempts);
// 重连失败时,确保连接状态正确
_isConnected = false;
_connection.ForceClose();
}
}
_logger.LogError("WebSocket 重连失败,已达到最大尝试次数: {MaxAttempts}", _config.MaxReconnectAttempts);
_isConnected = false;
}
```
#### 6. **连接状态管理改进** ✅ 已修复
**问题:** 连接失败时状态没有正确重置
**修复:** 确保连接状态的一致性
```csharp
private async Task ConnectInternalAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("正在连接 WebSocket 服务器: {Url}, 超时时间: {TimeoutMs}ms", _config.Url, _config.TimeoutMs);
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(_config.TimeoutMs);
try
{
await _connection.ConnectAsync(new Uri(_config.Url), timeoutCts.Token);
_logger.LogDebug("WebSocket 连接建立成功");
}
catch (Exception ex)
{
_logger.LogError(ex, "WebSocket 连接建立失败: {Url}", _config.Url);
// 连接失败时,确保状态正确
_isConnected = false;
_connection.ForceClose();
throw;
}
_isConnected = true;
_reconnectAttempts = 0;
UpdateHeartbeat();
_logger.LogDebug("连接状态已更新,重连次数重置为 0");
// 启动后台任务
_logger.LogDebug("启动后台任务");
_sendTask = Task.Run(() => SendLoopAsync(_cancellationTokenSource.Token));
_receiveTask = Task.Run(() => ReceiveLoopAsync(_cancellationTokenSource.Token));
_heartbeatTask = Task.Run(() => HeartbeatLoopAsync(_cancellationTokenSource.Token));
_logger.LogDebug("后台任务启动完成: 发送={SendTaskId}, 接收={ReceiveTaskId}, 心跳={HeartbeatTaskId}",
_sendTask?.Id, _receiveTask?.Id, _heartbeatTask?.Id);
_logger.LogInformation("WebSocket 连接成功建立,所有后台任务已启动");
}
```
### 修复效果
1. **解决了 CloseReceived 状态问题**
- 收到 Close 消息时正确强制关闭连接
- 重新创建 WebSocket 实例,确保可以重连
2. **改进了连接状态管理**
- 连接失败时正确重置状态
- 重连失败时确保状态一致性
3. **完善了任务管理**
- 关闭时等待所有后台任务完成
- 重连任务管理更加严谨
4. **增强了错误处理**
- 连接关闭异常时的降级处理
- 更详细的日志记录
5. **提高了系统稳定性**
- 支持多次重连
- 连接状态更加可靠
### 测试建议
1. **连接断开测试**
- 模拟服务器主动关闭连接
- 验证重连机制是否正常工作
2. **网络异常测试**
- 模拟网络中断
- 验证连接恢复能力
3. **多次重连测试**
- 连续多次断开连接
- 验证重连次数限制和延迟策略
4. **并发测试**
- 多线程环境下的连接管理
- 验证线程安全性
### 总结
此次修复解决了 WebSocket 连接管理中的严重 Bug,确保了连接在收到 Close 消息后能够正确重连。修复涉及多个层面的改进,包括连接状态管理、任务管理、错误处理等,大大提高了系统的稳定性和可靠性。
## 2024年修改记录
### WebSocketTransportExtensions 清理优化
**修改时间**: 2024年
**修改文件**:
- `CoreAgent.WebSocketTransport/Extensions/WebSocketTransportExtensions.cs`
**优化内容**:
1. **代码重构**
- 消除重复代码,将公共逻辑提取到私有方法中
- 添加参数验证,确保输入参数不为空
- 改进代码结构和可读性
2. **方法拆分**
- `RegisterCoreServices()` - 注册核心服务组件
- `RegisterDefaultMiddleware()` - 注册默认中间件
- 提高代码的可维护性和可测试性
3. **错误处理改进**
- 添加 `ArgumentNullException` 检查
- 确保配置对象正确获取
- 改进依赖注入的健壮性
4. **配置管理优化**
- 统一使用 `IOptions<WebSocketConfig>` 获取配置
- 消除配置获取的重复代码
- 确保配置一致性
5. **移除冗余方法**
- 移除委托配置重载方法 `AddWebSocketTransport(Action<WebSocketConfig>)`
- 简化 API 设计,减少维护成本
- 统一使用基于 `IConfiguration` 的配置方式
### Startup 集成实现
**修改时间**: 2024年
**修改文件**:
- `CoreAgent.API/Startup.cs`
- `CoreAgent.API/Configurations/websocket.json`
- `CoreAgent.API/Configurations/websocket.Development.json`
**集成内容**:
1. **Startup.cs 修改**
- 添加 `using CoreAgent.WebSocketTransport.Extensions;`
-`ConfigureServices` 中注册 WebSocket 传输服务
- 在配置加载中添加 WebSocket 配置文件
2. **配置文件创建**
- `websocket.json` - 生产环境配置
- `websocket.Development.json` - 开发环境配置
- 包含所有 WebSocketConfig 类的属性配置项
3. **配置项说明**
- `Url`: WebSocket 服务器地址
- `TimeoutMs`: 连接超时时间(毫秒)
- `BatchTimeoutMs`: 批量发送时间窗口(毫秒)
- `MaxBatchSize`: 最大批量大小(条消息)
- `MaxReconnectAttempts`: 最大重连尝试次数
- `QueueCapacity`: 消息队列容量
- `CacheTtlMinutes`: 缓存消息 TTL(分钟)
4. **环境差异配置**
- 开发环境:较小的队列容量和批量大小,较短的超时时间,较少的重连次数
- 生产环境:较大的队列容量和批量大小,较长的超时时间,较多的重连次数
**设计优势**:
- 代码结构清晰,易于维护
- 配置灵活,支持不同环境
- 错误处理完善,提高系统健壮性
- 遵循依赖注入最佳实践
- 支持热重载配置
- API 设计简洁,减少冗余方法
- 配置文件与模型类完全匹配,确保配置绑定正确
**影响范围**:
- WebSocket 传输服务注册
- 配置管理
- 中间件集成
- 应用程序启动流程