You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

19 KiB

WebSocketMiddleware 修改记录

1. 初始分析

  • 分析了 _channelPool 的使用情况
  • 发现 Channel 对象池的实现和潜在问题

2. 问题识别

  • Channel 对象频繁创建和销毁,可能导致 GC 压力
  • 缺乏对 Channel 状态的验证
  • 没有监控机制来跟踪对象池的效率

3. 初步修复

  • 实现了 ChannelPooledObjectPolicy 来管理 Channel 对象池
  • 添加了状态验证和错误处理
  • 创建了 ChannelPoolMonitor 来监控对象池性能

4. 并发问题修复

  • 识别并修复了快速重连时的竞态条件
  • 引入了 ConcurrentDictionary<string, ChannelInfo> _activeChannels 来跟踪活跃的 Channel
  • 确保每个连接 ID 只有一个活跃的 Channel

5. 异步任务生命周期问题

  • 分析了 StartMessageProcessingLoop 异步任务的取消机制
  • 发现异步任务可能在使用中的 Channel 被归还后仍在运行

6. 进一步优化

  • 增强了 ChannelInfo 类,包含 CancellationTokenSourceProcessingTask
  • 改进了 ReturnMessageChannelAsync 方法,确保任务完成后再归还 Channel

7. 代码重构优化

  • 提取了 GetMessageChannelReturnMessageChannelAsync 方法
  • 改进了代码结构和可读性
  • 添加了详细的日志记录

8. Channel 状态分析

  • 深入分析了 Channel 的完成状态和重用限制
  • 确认了 .NET Channel 一旦关闭就无法重用的设计

9. 最终修复方案

  • 完善了 Channel 对象池的归还逻辑
  • 确保了异步任务的正确取消和等待
  • 优化了资源管理和清理流程

10. CancellationTokenSource 统一修复

  • 问题识别:发现 HandleWebSocketConnectionGetMessageChannel 创建了两个不同的 CancellationTokenSource
  • 风险分析
    • StartMessageProcessingLoop 使用 HandleWebSocketConnection 中的 cts.Token
    • ReturnMessageChannelAsync 取消的是 ChannelInfo.CancellationTokenSource
    • 两个不同的实例导致任务无法正确取消
  • 修复方案
    • 修改 HandleWebSocketConnection 使用 ChannelInfo.CancellationTokenSource.Token
    • 确保所有异步任务使用同一个 CancellationTokenSource
    • 正确设置 channelInfo.ProcessingTask = processTask
  • 修复效果
    • 统一了 CancellationTokenSource 的使用
    • 确保任务可以被正确取消
    • 避免了资源泄漏风险
    • 提高了代码的一致性和可维护性

11. Channel 状态验证修复

  • 问题识别:发现 ObjectPool.Get() 可能返回已关闭的 Channel(Items = 0, Closed = true)
  • 问题根源
    • ObjectPool.Return() 方法返回 void,不关心 IPooledObjectPolicy.Return() 的返回值
    • 即使 ChannelPooledObjectPolicy.Return() 返回 false,ObjectPool 仍可能将 Channel 放回池中
    • 下次 ObjectPool.Get() 时,可能返回一个已关闭的 Channel
  • 修复方案
    • GetMessageChannel 中添加 IsChannelUsable() 验证方法
    • 实现重试机制(最多3次),避免获取到不可用的 Channel
    • 如果对象池获取失败或获取的 Channel 不可用,创建新的 Channel
    • 将不可用的 Channel 归还到池中(虽然它不会被重用)
  • 验证逻辑
    private bool IsChannelUsable(Channel<WebSocketMessage> channel)
    {
        // 检查 Channel 是否已关闭
        if (channel.Reader.Completion.IsCompleted) return false;
    
        // 检查是否有未处理的消息
        if (channel.Reader.TryPeek(out _)) return false;
    
        // 检查写入器状态
        if (!channel.Writer.TryComplete()) return false;
    
        return true;
    }
    
  • 修复效果
    • 确保不会使用已关闭的 Channel
    • 提高了对象池的可靠性
    • 添加了重试机制,提高成功率
    • 增强了错误处理和日志记录

12. Channel 对象池根本问题修复 (2024-12-19)

  • 问题识别:用户发现 messageChannel = _channelPool.Get() 返回的 Channel 状态为 Items = 0, Closed = true,无法使用
  • 问题根源分析
    • _channelPool.Return(messageChannel) 执行完成后,Channel 被标记为关闭状态
    • 当新的连接请求到达时,对象池可能返回一个已经关闭的 Channel
    • ChannelPooledObjectPolicy.Return() 方法中的逻辑有问题,没有正确处理 Channel 的重用
  • 修复方案
    1. 改进 ChannelPooledObjectPolicy.Return() 方法

      • 添加异常处理,确保方法稳定性
      • 改进 Channel 状态检查逻辑
      • 只有当 Channel 未关闭时才返回 true(可重用)
    2. 添加 Channel 获取验证机制

      • 新增 GetValidMessageChannel() 方法,实现重试机制
      • 新增 IsChannelValid() 方法,验证 Channel 状态
      • 新增 ReturnMessageChannel() 方法,安全地返回 Channel 到对象池
    3. 重试机制

      • 最多重试3次获取有效 Channel
      • 如果对象池无法提供有效 Channel,创建新的
      • 添加详细的日志记录,便于问题排查
  • 修复效果
    • 确保获取到的 Channel 始终可用
    • 避免了使用已关闭 Channel 的问题
    • 提高了对象池的可靠性
    • 增强了错误处理和恢复机制
    • 添加了详细的日志记录

13. 当前状态

  • Channel 对象池正常工作
  • 并发问题已解决
  • 异步任务生命周期管理正确
  • CancellationTokenSource 统一使用
  • Channel 状态验证完善
  • 资源管理和清理完善
  • 监控和日志记录完整
  • Channel 对象池根本问题已修复

14. 剩余风险评估

修复后的系统现在具有以下特点:

已解决的风险

  1. CancellationTokenSource 统一:所有异步任务使用同一个 CancellationTokenSource
  2. 任务取消正确ReturnMessageChannelAsync 可以正确取消所有相关任务
  3. Channel 状态验证:确保不会使用已关闭的 Channel
  4. 资源管理完善:Channel 和 CancellationTokenSource 都被正确释放
  5. 并发安全:使用 ConcurrentDictionary 确保线程安全
  6. 对象池可靠性:Channel 对象池现在能够正确处理重用和状态验证

⚠️ 仍需关注的风险

  1. 性能影响:Channel 状态验证和重试机制可能增加少量性能开销
  2. 重试延迟:在极端情况下,重试机制可能导致连接建立延迟
  3. 配置调优:可能需要根据实际使用情况调整重试次数和超时时间

🎯 建议

  1. 监控 Channel 状态验证的成功率和重试次数
  2. 根据实际使用情况调整重试参数(当前设置为3次重试)
  3. 定期审查对象池的性能指标
  4. 监控连接建立时间,确保重试机制不会显著影响用户体验

15. ChannelPooledObjectPolicy 日志跟踪功能 (2024-12-19)

  • 功能添加:为 ChannelPooledObjectPolicy 添加了详细的跟踪日志
  • 日志内容
    1. Create() 方法:记录新 Channel 实例的创建,包含 Channel ID(HashCode)
    2. Return() 方法:记录 Channel 重用状态检查的详细过程
      • Channel 完全关闭时的日志
      • Channel 仍然可用时的日志
      • Channel 已关闭时的日志
      • 异常情况的错误日志
  • 技术实现
    • 添加了 ILogger<ChannelPooledObjectPolicy> 依赖注入
    • 修改了 WebSocketMiddleware 构造函数,添加 ILoggerFactory 参数
    • 使用 loggerFactory.CreateLogger<ChannelPooledObjectPolicy>() 创建专用 logger
  • 日志级别
    • Debug 级别:Channel 创建和状态检查的详细信息
    • Error 级别:异常情况的错误信息
  • 监控价值
    • 可以跟踪 Channel 对象的生命周期
    • 监控对象池的重用效率
    • 快速定位 Channel 状态问题
    • 分析对象池性能瓶颈

16. IsChannelValid 方法日志跟踪功能 (2024-12-19)

  • 功能添加:为 IsChannelValid 方法添加了详细的跟踪日志
  • 日志内容
    1. 方法开始:记录开始检查 Channel 有效性的日志,包含 Channel ID
    2. 读取器状态检查:记录 Channel 读取器的完成状态
    3. 写入器状态检查:记录 Channel 写入器的完成状态
    4. 检查结果:记录 Channel 是否有效的最终结果
    5. 异常处理:记录检查过程中发生的异常
  • 技术实现
    • 使用 channel.GetHashCode() 作为 Channel ID 进行标识
    • 分别检查 channel.Reader.Completion.IsCompletedchannel.Writer.TryComplete()
    • 在每个检查步骤都添加了详细的 Debug 级别日志
    • 改进了异常处理,捕获具体的异常信息
  • 日志级别
    • Debug 级别:Channel 状态检查的详细信息
    • Error 级别:检查过程中发生的异常
  • 监控价值
    • 可以详细跟踪 Channel 状态验证过程
    • 快速定位 Channel 无效的具体原因
    • 监控 Channel 对象池的验证成功率
    • 帮助分析 Channel 重用失败的原因

17. Channel 对象池简化移除 (2024-12-19)

  • 问题识别:用户指出既然每次都是创建新的 Channel,应该简化移除 _channelPool
  • 问题分析
    1. 对象池无意义:每次 GetValidMessageChannel() 最终都是创建新的 Channel
    2. 逻辑复杂:对象池的重试机制和验证逻辑增加了复杂性
    3. 性能开销:对象池的获取、验证、返回操作增加了不必要的开销
  • 解决方案:完全移除 Channel 对象池,直接创建新的 Channel
  • 修复内容
    1. 移除对象池相关代码
      • 删除 _channelPool 字段
      • 删除 ChannelPooledObjectPolicy
      • 删除 GetValidMessageChannel() 方法
      • 删除 IsChannelValid() 方法
      • 删除 ReturnMessageChannel() 方法
    2. 简化 Channel 创建
      • 新增 CreateNewMessageChannel() 方法
      • 直接创建新的 Channel 实例,无需验证和重试
      • 移除对象池相关的复杂逻辑
    3. 清理依赖
      • 移除 Microsoft.Extensions.ObjectPool 引用
      • 移除 ILoggerFactory 参数(不再需要创建 ChannelPooledObjectPolicy logger)
  • 修复效果
    • 简化了代码逻辑,提高了可维护性
    • 消除了对象池的复杂性和潜在问题
    • 减少了不必要的性能开销
    • 确保每个连接都有全新的 Channel
    • 解决了重连客户端无法使用的问题
  • 性能影响
    • 轻微的性能开销:每次连接创建新的 Channel
    • 但避免了对象池的复杂性和验证开销
    • 总体而言,简化带来的好处大于性能开销

18. WebSocket Channel 管理重构 (2024-12-19)

  • 问题识别:用户要求将 CreateNewMessageChannel 提取为单独的类来管理 Channel 的创建、记录和释放
  • 命名优化:将通用的 ChannelManager 重命名为 WebSocketChannelManager,避免命名冲突
  • 文件结构优化:创建了清晰的目录结构 src/X1.WebSocket/Services/ChannelManagement/
  • 功能增强
    1. Channel 生命周期管理
      • 创建:CreateNewMessageChannel(string connectionId)
      • 释放:ReleaseMessageChannel(Channel<WebSocketMessage> channel, string connectionId)
      • 统计:GetStatistics() 返回总创建数、总释放数、活跃数
    2. 详细统计信息
      • 每个 Channel 的消息处理数量
      • 每个 Channel 的总字节处理量
      • Channel 的创建时间和释放时间
      • Channel 的存活时间统计
    3. 监控功能
      • 每创建100个 Channel 自动记录统计信息
      • 提供活跃 Channel 列表查询
      • 支持按连接ID查找 Channel 信息
      • 清理已释放的 Channel 信息
  • 技术实现
    1. 线程安全:使用 ConcurrentDictionary<int, WebSocketChannelInfo> 管理活跃 Channel
    2. 原子操作:使用 Interlocked 进行计数器的线程安全操作
    3. 字段优化:将 MessageCountTotalBytesProcessed 改为字段以支持 ref 操作
    4. 异常处理:完善的异常处理和日志记录
  • 代码重构
    1. 分离关注点:将 Channel 管理逻辑从 WebSocketMiddleware 中分离
    2. 依赖注入:通过构造函数注入 ILogger
    3. 命名空间清晰:使用 CellularManagement.WebSocket.Services.ChannelManagement 命名空间
    4. 接口设计:提供清晰的公共接口,便于扩展和维护
  • 日志增强
    1. 创建日志:记录 Channel ID、连接ID、总创建数
    2. 释放日志:记录存活时间、处理消息数、总字节数
    3. 统计日志:定期记录 Channel 统计信息
    4. 错误日志:详细的异常处理和错误记录
  • 修复效果
    • 代码结构更清晰,职责分离明确
    • 避免了命名冲突,使用更具体的类名
    • 提供了完整的 Channel 生命周期管理
    • 增强了监控和统计功能
    • 改进了日志记录和错误处理
    • 便于后续扩展和维护

2024-12-19 - WebSocket Channel管理器优化

修改文件

  • src/X1.WebSocket/Services/ChannelManagement/WebSocketChannelManager.cs

主要改进

1. 连接级别的锁机制

  • 添加了 ConcurrentDictionary<string, SemaphoreSlim> _connectionLocks 来管理连接级别的锁
  • 确保同一连接的Channel操作串行化,避免并发问题
  • 实现了 CleanupConnectionLock 方法来清理连接锁资源

2. 异步操作统一

  • 移除了同步版本的Channel操作方法
  • 统一使用异步方法 CreateNewMessageChannelReleaseMessageChannel
  • 提高了系统的响应性和并发处理能力

3. 增强的Channel生命周期管理

  • 改进了Channel创建时的重复检查逻辑
  • 添加了超时机制,避免死锁情况
  • 增强了Channel释放时的错误处理和日志记录

4. 统计信息增强

  • 添加了连接状态检查方法 GetConnectionStatus
  • 新增连接统计方法 GetConnectionStatistics
  • 提供了更详细的Channel和连接管理信息

5. 资源清理优化

  • 实现了 CleanupReleasedChannels 方法来清理已释放的Channel信息
  • 添加了连接锁的自动清理机制
  • 改进了内存管理和资源释放

技术特性

  • 使用 ConcurrentDictionary 确保线程安全
  • 实现了原子操作来更新统计信息
  • 添加了详细的日志记录用于调试和监控
  • 支持Channel的存活时间统计和消息处理统计

性能优化

  • 使用连接级别的锁减少锁竞争
  • 实现了Channel的批量统计记录(每100个记录一次)
  • 优化了Channel查找和清理算法

监控和调试

  • 提供了丰富的统计信息接口
  • 添加了详细的日志记录
  • 支持连接状态和Channel状态的实时查询

2024-12-19 WebSocket超时判断逻辑分析

问题分析

检查 WebSocketMiddleware.cs 中第320-321行的超时判断逻辑:

// 检查消息是否超时
if (IsMessageTimeout(messageStartTime))

发现的问题

  1. 超时判断逻辑不合理

    • messageStartTimeProcessWebSocketMessages 方法开始时设置为连接开始时间
    • 在每次处理有效消息后,messageStartTime 被更新为当前时间(第350行)
    • 但是超时判断使用的是 MessageSendTimeout(30秒),这个配置项名称与实际用途不符
  2. 配置项命名错误

    • MessageSendTimeout 实际上用于控制消息接收超时,而不是发送超时
    • 应该重命名为 MessageReceiveTimeoutMessageProcessingTimeout
  3. 超时逻辑缺陷

    • 当前逻辑会在每次消息处理后重置 messageStartTime
    • 这意味着如果客户端在30秒内发送任何消息,超时计时器就会重置
    • 这可能导致连接永远不会因为超时而关闭,即使客户端长时间不活跃
  4. 阻塞接收问题

    • webSocket.ReceiveAsync() 是阻塞等待,如果客户端长时间不发送消息,代码会一直阻塞
    • 超时检查永远不会执行,导致连接永远不会被关闭
    • 存在严重的资源泄漏风险

已完成的修复

  1. 重命名配置项

    • WebSocketOptions.MessageSendTimeout 重命名为 MessageReceiveTimeout
    • 更新了配置项的注释说明
  2. 修改超时判断逻辑

    • 分离了连接超时和消息接收超时的检查
    • 添加了 IsConnectionTimeout() 方法检查整个连接的超时(5分钟)
    • 添加了 IsMessageReceiveTimeout() 方法检查消息接收超时(30秒)
    • 使用 lastMessageTime 来跟踪最后一次接收消息的时间
  3. 更新相关方法

    • 修改了 ProcessWebSocketMessages 方法签名,添加了 lastMessageTime 参数
    • 修改了 ProcessMessage 方法签名,移除了不必要的 messageStartTime 参数
    • 添加了 HandleConnectionTimeout()HandleMessageReceiveTimeout() 方法
    • 更新了 WebSocketMessageHandlerAdapter 中的超时配置引用
  4. 改进的超时机制

    • 连接超时:基于连接开始时间,超过5分钟关闭连接
    • 消息接收超时:基于最后消息时间,超过30秒没有新消息则关闭连接
    • 这样可以确保连接在真正超时时能够正确关闭
  5. 修复阻塞接收问题

    • 添加了 ReceiveMessageWithTimeout() 方法,实现带超时的消息接收
    • 使用 CancellationTokenSource 创建带超时的取消令牌
    • 动态计算剩余超时时间,使用较小的超时值
    • 在超时时正确关闭连接并返回null
    • 确保即使客户端长时间不发送消息,连接也能在超时后正确关闭

移除超时判断的风险分析

低风险场景

  • 正常的长连接应用:如聊天、实时监控、推送通知
  • 客户端主动管理连接:客户端会定期发送心跳或主动关闭
  • 网络环境稳定:连接质量好,很少出现网络问题

高风险场景

  • 僵尸连接:客户端异常断开但服务器不知道
  • 资源泄漏:大量无效连接占用服务器资源
  • 网络异常:网络中断但连接状态未更新
  • 客户端崩溃:客户端进程异常退出

具体风险

  1. 资源泄漏风险 ⚠️

    • 每个连接占用内存缓冲区、文件描述符、线程资源
    • 大量僵尸连接可能导致服务器内存耗尽
  2. 性能下降风险 ⚠️

    • 服务器需要维护连接列表、心跳检测、状态同步
    • 无效连接越多,性能越差
  3. 连接数限制风险 ⚠️

    • 僵尸连接占用连接数,新连接无法建立
    • 影响系统的可扩展性

建议的解决方案

  1. 保留超时机制:但使用更合理的超时时间(如5-10分钟)
  2. 实现心跳机制:客户端定期发送心跳包
  3. 连接状态检查:定期检查WebSocket连接的实际状态
  4. 应用层超时:在应用层处理业务超时,而不是传输层

当前配置值

  • MessageReceiveTimeout: 30秒
  • ConnectionTimeout: 5分钟

结论

已成功修复了超时判断逻辑的设计缺陷和阻塞接收问题,现在连接能够正确地在超时后关闭,避免了资源泄漏问题。新的实现确保了:

  • 连接超时检查正常工作
  • 消息接收超时检查正常工作
  • 不会因为阻塞等待而忽略超时
  • 资源能够及时释放

建议保留超时机制,但可以根据实际业务需求调整超时时间,或者实现更智能的心跳机制。