You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

25 KiB

CoreAgent.WebSocketTransport 项目修改记录

2024-12-19 - 项目结构说明文档生成

新增文件

  • 项目结构说明.md: 详细描述了 CoreAgent.WebSocketTransport 项目的整体架构、组件说明、数据流转、核心特性等

文档内容概览

  1. 项目概述: 基于 .NET 8.0 的 WebSocket 传输层组件,主要负责转发协议数据给上层应用
  2. 核心设计理念: 单一职责原则、依赖注入、异步编程
  3. 项目结构: 详细的分层架构说明(Interfaces、Services、Models、Middleware、Extensions、Examples)
  4. 组件说明: 每个接口和实现类的详细职责和功能说明
  5. 数据流转架构: 从上层应用到 WebSocket 服务器的完整数据流
  6. 核心特性: 高性能设计、可靠性保障、可扩展性、易用性
  7. 使用方式: 服务注册和基本使用的代码示例
  8. 技术栈: 使用的 .NET 技术栈说明

文档价值

  • 为开发人员提供完整的项目理解
  • 便于新成员快速上手
  • 记录设计决策和架构思路
  • 提供最佳实践参考

2024-12-19 - 项目设计分析

设计优点

  1. 良好的架构分层

    • 清晰的接口定义(Interfaces 目录)
    • 实现与接口分离(Services 目录)
    • 中间件模式支持扩展性(Middleware 目录)
    • 配置模型独立(Models 目录)
  2. 依赖注入支持

    • 完整的 DI 容器集成
    • 服务注册扩展方法
    • 配置绑定支持
  3. 异步编程模式

    • 全面使用 async/await
    • 支持 CancellationToken
    • 使用 System.Threading.Channels 进行消息队列
  4. 中间件架构

    • 支持消息处理管道
    • 可扩展的消息处理逻辑
    • 支持发送和接收消息的独立处理
  5. 连接管理

    • 自动重连机制
    • 心跳检测
    • 连接状态管理

设计缺陷和问题

1. 接口设计问题

问题: IWebSocketTransport 接口过于简单,缺少关键功能

// 当前接口只提供连接管理,缺少消息发送/接收功能
public interface IWebSocketTransport : IDisposable
{
    bool IsConnected { get; }
    DateTime? LastHeartbeat { get; }
    Task ConnectAsync(CancellationToken cancellationToken = default);
    Task CloseAsync(CancellationToken cancellationToken = default);
}

建议: 添加消息发送/接收方法

public interface IWebSocketTransport : IDisposable
{
    bool IsConnected { get; }
    DateTime? LastHeartbeat { get; }
    IMessageChannelManager ChannelManager { get; }
    
    Task ConnectAsync(CancellationToken cancellationToken = default);
    Task CloseAsync(CancellationToken cancellationToken = default);
    Task SendAsync<T>(T message, CancellationToken cancellationToken = default);
    Task<T> ReceiveAsync<T>(CancellationToken cancellationToken = default);
}

2. 消息类型安全问题

问题: 所有消息通道都使用 object 类型,缺乏类型安全

public interface IMessageChannelManager
{
    IMessageChannel<object> SendChannel { get; }
    IMessageChannel<object> ReceiveChannel { get; }
    IMessageChannel<object> PriorityChannel { get; }
}

建议: 使用泛型或强类型消息

public interface IMessageChannelManager
{
    IMessageChannel<T> GetChannel<T>(string channelName = "default");
    IMessageChannel<T> GetPriorityChannel<T>();
}

3. 错误处理不完善

问题: 缺少统一的错误处理机制

  • 重连失败后没有通知上层应用
  • 消息发送失败没有重试机制
  • 异常信息不够详细

建议: 添加事件机制和错误处理

public interface IWebSocketTransport
{
    event EventHandler<ConnectionEventArgs> ConnectionChanged;
    event EventHandler<MessageEventArgs> MessageReceived;
    event EventHandler<ErrorEventArgs> ErrorOccurred;
}

4. 配置验证不足

问题: 配置验证只在模型层面,运行时验证不够

// 当前只有数据注解验证
[Required(ErrorMessage = "WebSocket URL 不能为空")]
[Url(ErrorMessage = "WebSocket URL 格式不正确")]
public string Url { get; set; } = "wss://example.com/ws";

建议: 添加运行时配置验证

public class WebSocketConfigValidator
{
    public static ValidationResult Validate(WebSocketConfig config)
    {
        // 运行时验证逻辑
    }
}

5. 资源管理问题 已修复

问题: 资源释放不够彻底

  • 后台任务可能没有正确等待完成
  • 内存池使用不当
  • 连接关闭时没有清理所有资源

修复内容:

  1. 添加了 _disposed 标志

    • 防止重复释放资源
    • 确保 Dispose 模式正确实现
  2. 改进了任务等待机制

    • 详细记录每个任务的状态
    • 添加超时处理和结果检查
    • 提供任务完成状态的详细日志
  3. 改进了信号量处理

    • 添加超时处理
    • 防止死锁情况
    • 详细的状态跟踪
  4. 增强了异常处理

    • 分离托管资源释放的异常处理
    • 提供更详细的错误信息
    • 确保资源释放的可靠性
  5. 添加了详细的日志记录

    • 资源释放过程的完整跟踪
    • 任务状态和完成情况记录
    • 异常情况的详细记录

修复后的 Dispose 模式:

private bool _disposed = false;

public void Dispose()
{
    if (_disposed) return;
    
    _logger?.LogInformation("开始释放 WebSocket 传输资源");
    
    try
    {
        // 取消所有操作
        _cancellationTokenSource?.Cancel();
        _logger?.LogDebug("已取消所有操作令牌");
        
        // 等待信号量释放
        if (_connectionSemaphore != null)
        {
            try
            {
                _connectionSemaphore.Wait(TimeSpan.FromSeconds(5));
                _logger?.LogDebug("已获取连接信号量");
            }
            catch (TimeoutException)
            {
                _logger?.LogWarning("等待连接信号量超时");
            }
        }
        
        // 收集需要等待的任务
        var tasks = new List<Task>();
        if (_sendTask != null && !_sendTask.IsCompleted) 
        {
            tasks.Add(_sendTask);
            _logger?.LogDebug("添加发送任务到等待列表");
        }
        // ... 其他任务处理
        
        // 等待所有任务完成
        if (tasks.Count > 0)
        {
            _logger?.LogInformation("等待 {TaskCount} 个后台任务完成", tasks.Count);
            var waitResult = Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(5));
            if (waitResult)
            {
                _logger?.LogInformation("所有后台任务已成功完成");
            }
            else
            {
                _logger?.LogWarning("部分后台任务未在超时时间内完成");
            }
        }
    }
    catch (Exception ex)
    {
        _logger?.LogError(ex, "释放资源过程中发生异常");
    }
    finally
    {
        try
        {
            // 释放托管资源
            _cancellationTokenSource?.Dispose();
            _connectionSemaphore?.Dispose();
            _logger?.LogDebug("已释放托管资源");
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "释放托管资源时发生异常");
        }
        
        _disposed = true;
        GC.SuppressFinalize(this);
        _logger?.LogInformation("WebSocket 传输资源释放完成");
    }
}

6. 性能问题

问题: 存在性能瓶颈

  • 消息序列化每次都创建新实例
  • 缓冲区大小固定(4096字节)
  • 没有消息压缩支持
  • 心跳间隔固定(30秒)

建议: 性能优化

// 使用对象池
private readonly ObjectPool<byte[]> _bufferPool;

// 动态缓冲区大小
private int _bufferSize = 4096;

// 消息压缩
public interface IMessageCompressor
{
    byte[] Compress(byte[] data);
    byte[] Decompress(byte[] data);
}

7. 测试覆盖不足

问题: 缺少单元测试和集成测试

  • 只有示例代码,没有正式测试
  • 缺少边界条件测试
  • 缺少并发测试

建议: 添加完整的测试套件

[TestClass]
public class WebSocketTransportTests
{
    [TestMethod]
    public async Task ConnectAsync_ValidUrl_ShouldConnect()
    {
        // 测试连接功能
    }
    
    [TestMethod]
    public async Task SendAsync_Message_ShouldBeReceived()
    {
        // 测试消息发送接收
    }
    
    [TestMethod]
    public async Task Reconnect_ConnectionLost_ShouldReconnect()
    {
        // 测试重连功能
    }
}

8. 日志记录不够详细 已修复

问题: 日志信息不够详细,难以调试

  • 缺少关键操作的日志
  • 没有性能指标日志
  • 错误日志信息不够详细

修复内容:

  1. 添加了详细的连接日志

    • 连接开始、成功、失败都有详细日志
    • 包含配置信息和超时时间
    • 后台任务启动状态跟踪
  2. 添加了性能监控日志

    • 消息发送/接收耗时统计
    • 使用 Stopwatch 精确计时
    • 消息类型和大小记录
  3. 添加了结构化日志

    • 使用参数化日志记录
    • 包含消息计数和统计信息
    • 中间件处理过程跟踪
  4. 改进了错误日志

    • 异常信息更详细
    • 包含上下文信息
    • 操作状态和结果记录
  5. 添加了调试级别日志

    • Trace 级别用于详细跟踪
    • Debug 级别用于关键操作
    • 资源管理状态跟踪

示例改进:

// 连接日志
_logger.LogInformation("开始连接 WebSocket 服务器: {Url}, 超时时间: {TimeoutMs}ms", _config.Url, _config.TimeoutMs);

// 性能日志
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
// ... 处理逻辑
_logger.LogDebug("消息发送成功: {MessageType}, 耗时: {ElapsedMs}ms", messageType, stopwatch.ElapsedMilliseconds);

// 统计日志
_logger.LogInformation("发送循环正常结束,共处理 {MessageCount} 条普通消息,{PriorityCount} 条优先级消息", 
    messageCount, priorityMessageCount);

9. 缺少监控和指标

问题: 没有性能监控和健康检查

  • 缺少连接状态监控
  • 没有消息吞吐量统计
  • 缺少错误率统计

建议: 添加监控指标

public interface IWebSocketMetrics
{
    long MessagesSent { get; }
    long MessagesReceived { get; }
    long Errors { get; }
    TimeSpan AverageLatency { get; }
    double SuccessRate { get; }
}

10. 安全性考虑不足

问题: 缺少安全相关功能

  • 没有消息加密
  • 没有身份验证
  • 没有消息签名验证

建议: 添加安全功能

public interface IMessageSecurity
{
    byte[] Encrypt(byte[] data);
    byte[] Decrypt(byte[] data);
    bool VerifySignature(byte[] data, byte[] signature);
}

改进建议优先级

  1. 高优先级:错误处理、类型安全、配置验证
  2. 中优先级:性能优化、监控指标
  3. 低优先级:安全功能、测试覆盖

已修复项目:

  • 资源管理问题(Dispose 模式)
  • 日志记录不够详细

总结

CoreAgent.WebSocketTransport 项目整体架构设计良好,采用了现代 .NET 开发模式,但在接口设计、类型安全、错误处理等方面存在改进空间。建议按照优先级逐步改进,特别是完善接口设计和错误处理机制。

2024-12-19 - WebSocket关闭处理严重Bug修复

问题描述

严重Bug: ReceiveLoopAsync 方法中处理 WebSocketMessageType.Close 消息时存在严重问题,导致无法二次连接。

具体问题:

  1. 当收到 WebSocketMessageType.Close 消息时,WebSocket 连接会进入 CloseReceived 状态
  2. CloseReceived 状态下,无法再次调用 ConnectAsync 方法
  3. 原有的处理逻辑只是简单 break,没有正确处理连接状态
  4. 导致后续重连失败,连接无法恢复

修复内容

1. WebSocketConnection 类改进 已修复

问题: 缺少对 CloseReceived 状态的处理 修复: 添加连接状态检查和强制关闭功能

public class WebSocketConnection : IWebSocketConnection
{
    private ClientWebSocket _webSocket;
    private readonly ILogger<WebSocketConnection> _logger;
    private readonly object _lock = new object();

    public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken = default)
    {
        lock (_lock)
        {
            // 如果连接已关闭或处于CloseReceived状态,需要重新创建WebSocket实例
            if (_webSocket.State == WebSocketState.Closed || 
                _webSocket.State == WebSocketState.CloseReceived ||
                _webSocket.State == WebSocketState.CloseSent)
            {
                _logger.LogInformation("WebSocket 处于关闭状态,重新创建连接实例");
                _webSocket?.Dispose();
                _webSocket = new ClientWebSocket();
            }
            
            if (_webSocket.State == WebSocketState.Open)
            {
                _logger.LogInformation("WebSocket 已连接");
                return;
            }
        }

        _logger.LogInformation("正在连接 WebSocket 服务器: {Uri}", uri);
        await _webSocket.ConnectAsync(uri, cancellationToken);
        _logger.LogInformation("WebSocket 连接成功");
    }

    /// <summary>
    /// 强制关闭连接并重新创建WebSocket实例
    /// </summary>
    public void ForceClose()
    {
        lock (_lock)
        {
            _logger.LogInformation("强制关闭 WebSocket 连接并重新创建实例");
            _webSocket?.Dispose();
            _webSocket = new ClientWebSocket();
        }
    }
}

2. IWebSocketConnection 接口扩展 已修复

添加: ForceClose 方法到接口定义

public interface IWebSocketConnection : IDisposable
{
    // ... 现有方法
    
    /// <summary>
    /// 强制关闭连接并重新创建WebSocket实例
    /// 用于处理CloseReceived状态后无法重连的问题
    /// </summary>
    void ForceClose();
}

3. ReceiveLoopAsync 方法修复 已修复

问题: Close 消息处理不当 修复: 正确处理 Close 消息并触发重连

else if (result.MessageType == WebSocketMessageType.Close)
{
    _logger.LogInformation("收到 WebSocket 关闭消息,准备处理连接关闭");
    // 收到关闭消息时,需要强制关闭连接并重新创建WebSocket实例
    // 这样可以确保后续能够重新连接
    _connection.ForceClose();
    _isConnected = false;
    _logger.LogInformation("WebSocket 连接已强制关闭,准备触发重连");
    
    // 确保在触发重连之前,当前接收循环能够正常退出
    // 重连任务会在后台启动,不会阻塞当前循环的退出
    TriggerReconnect();
    break;
}

4. CloseAsync 方法改进 已修复

问题: 关闭时没有处理重连任务和异常情况 修复: 完善关闭逻辑

// 等待任务完成
var closeTasks = new List<Task>();
if (_sendTask != null && !_sendTask.IsCompleted) 
{
    closeTasks.Add(_sendTask);
    _logger.LogDebug("等待发送任务完成");
}
if (_receiveTask != null && !_receiveTask.IsCompleted) 
{
    closeTasks.Add(_receiveTask);
    _logger.LogDebug("等待接收任务完成");
}
if (_heartbeatTask != null && !_heartbeatTask.IsCompleted) 
{
    closeTasks.Add(_heartbeatTask);
    _logger.LogDebug("等待心跳任务完成");
}
if (_reconnectTask != null && !_reconnectTask.IsCompleted) 
{
    closeTasks.Add(_reconnectTask);
    _logger.LogDebug("等待重连任务完成");
}

// 关闭连接
try
{
    await _connection.CloseAsync(WebSocketCloseStatus.NormalClosure, "正常关闭", cancellationToken);
    _logger.LogDebug("WebSocket 连接已正常关闭");
}
catch (Exception ex)
{
    _logger.LogWarning(ex, "正常关闭连接失败,强制关闭连接");
    _connection.ForceClose();
}

5. 重连逻辑改进 已修复

问题: 重连任务管理不够严谨 修复: 完善重连任务管理和状态处理

private void TriggerReconnect()
{
    lock (_reconnectLock)
    {
        if (_reconnectTask != null && !_reconnectTask.IsCompleted)
        {
            _logger.LogDebug("重连任务已在运行,跳过重复触发");
            return; // 重连任务已在运行
        }
        
        _logger.LogInformation("启动重连任务");
        _reconnectTask = Task.Run(() => ReconnectLoopAsync(_cancellationTokenSource.Token));
        _logger.LogDebug("重连任务已启动: {TaskId}", _reconnectTask.Id);
    }
}

private async Task ReconnectLoopAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("重连循环开始,当前重连次数: {Attempts}", _reconnectAttempts);
    _isConnected = false;
    
    while (_reconnectAttempts < _config.MaxReconnectAttempts && !cancellationToken.IsCancellationRequested)
    {
        _reconnectAttempts++;
        
        var delaySeconds = Math.Min(Math.Pow(2, _reconnectAttempts - 1), 30);
        var delay = TimeSpan.FromSeconds(delaySeconds);
        
        _logger.LogWarning("WebSocket 连接断开,{DelaySeconds}秒后进行第{Attempt}次重连", 
            delaySeconds, _reconnectAttempts);
        
        await Task.Delay(delay, cancellationToken);

        try
        {
            _logger.LogInformation("开始第{Attempt}次重连尝试", _reconnectAttempts);
            await ConnectInternalAsync(cancellationToken);
            _logger.LogInformation("WebSocket 重连成功,重连次数: {Attempts}", _reconnectAttempts);
            return;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "WebSocket 重连失败,尝试次数: {Attempt}", _reconnectAttempts);
            
            // 重连失败时,确保连接状态正确
            _isConnected = false;
            _connection.ForceClose();
        }
    }

    _logger.LogError("WebSocket 重连失败,已达到最大尝试次数: {MaxAttempts}", _config.MaxReconnectAttempts);
    _isConnected = false;
}

6. 连接状态管理改进 已修复

问题: 连接失败时状态没有正确重置 修复: 确保连接状态的一致性

private async Task ConnectInternalAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation("正在连接 WebSocket 服务器: {Url}, 超时时间: {TimeoutMs}ms", _config.Url, _config.TimeoutMs);
    
    using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    timeoutCts.CancelAfter(_config.TimeoutMs);

    try
    {
        await _connection.ConnectAsync(new Uri(_config.Url), timeoutCts.Token);
        _logger.LogDebug("WebSocket 连接建立成功");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "WebSocket 连接建立失败: {Url}", _config.Url);
        // 连接失败时,确保状态正确
        _isConnected = false;
        _connection.ForceClose();
        throw;
    }
    
    _isConnected = true;
    _reconnectAttempts = 0;
    UpdateHeartbeat();
    _logger.LogDebug("连接状态已更新,重连次数重置为 0");

    // 启动后台任务
    _logger.LogDebug("启动后台任务");
    _sendTask = Task.Run(() => SendLoopAsync(_cancellationTokenSource.Token));
    _receiveTask = Task.Run(() => ReceiveLoopAsync(_cancellationTokenSource.Token));
    _heartbeatTask = Task.Run(() => HeartbeatLoopAsync(_cancellationTokenSource.Token));
    _logger.LogDebug("后台任务启动完成: 发送={SendTaskId}, 接收={ReceiveTaskId}, 心跳={HeartbeatTaskId}", 
        _sendTask?.Id, _receiveTask?.Id, _heartbeatTask?.Id);

    _logger.LogInformation("WebSocket 连接成功建立,所有后台任务已启动");
}

修复效果

  1. 解决了 CloseReceived 状态问题

    • 收到 Close 消息时正确强制关闭连接
    • 重新创建 WebSocket 实例,确保可以重连
  2. 改进了连接状态管理

    • 连接失败时正确重置状态
    • 重连失败时确保状态一致性
  3. 完善了任务管理

    • 关闭时等待所有后台任务完成
    • 重连任务管理更加严谨
  4. 增强了错误处理

    • 连接关闭异常时的降级处理
    • 更详细的日志记录
  5. 提高了系统稳定性

    • 支持多次重连
    • 连接状态更加可靠

测试建议

  1. 连接断开测试

    • 模拟服务器主动关闭连接
    • 验证重连机制是否正常工作
  2. 网络异常测试

    • 模拟网络中断
    • 验证连接恢复能力
  3. 多次重连测试

    • 连续多次断开连接
    • 验证重连次数限制和延迟策略
  4. 并发测试

    • 多线程环境下的连接管理
    • 验证线程安全性

总结

此次修复解决了 WebSocket 连接管理中的严重 Bug,确保了连接在收到 Close 消息后能够正确重连。修复涉及多个层面的改进,包括连接状态管理、任务管理、错误处理等,大大提高了系统的稳定性和可靠性。

2024年修改记录

WebSocketTransportExtensions 清理优化

修改时间: 2024年 修改文件:

  • CoreAgent.WebSocketTransport/Extensions/WebSocketTransportExtensions.cs

优化内容:

  1. 代码重构

    • 消除重复代码,将公共逻辑提取到私有方法中
    • 添加参数验证,确保输入参数不为空
    • 改进代码结构和可读性
  2. 方法拆分

    • RegisterCoreServices() - 注册核心服务组件
    • RegisterDefaultMiddleware() - 注册默认中间件
    • 提高代码的可维护性和可测试性
  3. 错误处理改进

    • 添加 ArgumentNullException 检查
    • 确保配置对象正确获取
    • 改进依赖注入的健壮性
  4. 配置管理优化

    • 统一使用 IOptions<WebSocketConfig> 获取配置
    • 消除配置获取的重复代码
    • 确保配置一致性
  5. 移除冗余方法

    • 移除委托配置重载方法 AddWebSocketTransport(Action<WebSocketConfig>)
    • 简化 API 设计,减少维护成本
    • 统一使用基于 IConfiguration 的配置方式

Startup 集成实现

修改时间: 2024年 修改文件:

  • CoreAgent.API/Startup.cs
  • CoreAgent.API/Configurations/websocket.json
  • CoreAgent.API/Configurations/websocket.Development.json

集成内容:

  1. Startup.cs 修改

    • 添加 using CoreAgent.WebSocketTransport.Extensions;
    • ConfigureServices 中注册 WebSocket 传输服务
    • 在配置加载中添加 WebSocket 配置文件
  2. 配置文件创建

    • websocket.json - 生产环境配置
    • websocket.Development.json - 开发环境配置
    • 包含所有 WebSocketConfig 类的属性配置项
  3. 配置项说明

    • Url: WebSocket 服务器地址
    • TimeoutMs: 连接超时时间(毫秒)
    • BatchTimeoutMs: 批量发送时间窗口(毫秒)
    • MaxBatchSize: 最大批量大小(条消息)
    • MaxReconnectAttempts: 最大重连尝试次数
    • QueueCapacity: 消息队列容量
    • CacheTtlMinutes: 缓存消息 TTL(分钟)
  4. 环境差异配置

    • 开发环境:较小的队列容量和批量大小,较短的超时时间,较少的重连次数
    • 生产环境:较大的队列容量和批量大小,较长的超时时间,较多的重连次数

设计优势:

  • 代码结构清晰,易于维护
  • 配置灵活,支持不同环境
  • 错误处理完善,提高系统健壮性
  • 遵循依赖注入最佳实践
  • 支持热重载配置
  • API 设计简洁,减少冗余方法
  • 配置文件与模型类完全匹配,确保配置绑定正确

影响范围:

  • WebSocket 传输服务注册
  • 配置管理
  • 中间件集成
  • 应用程序启动流程

2024-12-19 - 心跳消息实体模型创建

新增文件

  • HeartbeatMessage.cs: 心跳消息实体模型,包含消息类型和载荷结构

模型设计

  1. HeartbeatMessage 类

    • Type 属性:消息类型,默认值为 "heartbeat"
    • Payload 属性:消息载荷,类型为 HeartbeatPayload
  2. HeartbeatPayload 类

    • Message 属性:心跳消息内容,默认值为 "ping"

设计特点

  • 符合 JSON 结构:{"type": "heartbeat", "payload": {"message": "ping"}}
  • 使用默认值初始化,简化使用
  • 包含完整的 XML 文档注释
  • 遵循 C# 命名规范

使用场景

  • WebSocket 心跳检测
  • 连接状态监控
  • 网络连通性测试
  • 服务器健康检查

技术实现

public class HeartbeatMessage
{
    public string Type { get; set; } = "heartbeat";
    public HeartbeatPayload Payload { get; set; } = new HeartbeatPayload();
}

public class HeartbeatPayload
{
    public string Message { get; set; } = "ping";
}

序列化结果

{
    "type": "heartbeat",
    "payload": {
        "message": "ping"
    }
}