You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

21 KiB

WebSocketMessageManager.cs (自动转换为Markdown)

// 以下内容为原始C#代码,含详细注释
// 文件原路径:Managers/WebSocketMgr/WebSocketMessageManager.cs

using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using WebSocket4Net;

namespace CoreAgent.ProtocolClient.Managers.WebSocketMgr
{
    /// <summary>
    /// WebSocket消息管理器 - 专门处理WebSocket的收发业务
    /// 
    /// 重构说明:
    /// 1. 对应LTEClientWebSocket中的WebSocket连接和消息传输功能
    /// 2. 集成MessageIdManager统一管理消息ID和回调
    /// 3. 移除_sentMessages和_receivedMessages消息缓存,专注传输
    /// 4. 移除业务逻辑功能(统计更新、日志解析等),实现职责分离
    /// 
    /// 主要功能:
    /// - WebSocket连接管理(对应LTEClientWebSocket.Start()和Stop())
    /// - 消息发送和接收(对应LTEClientWebSocket.SendMessage()和OnSocketMessage())
    /// - 消息队列和批量发送(对应LTEClientWebSocket._messageFifo和SendMessageNow())
    /// - 事件通知(对应LTEClientWebSocket的事件系统)
    /// 
    /// 与LTEClientWebSocket的详细对应关系:
    /// 
    /// 1. 连接管理对应关系:
    ///    - Connect() 对应 Start() 方法中的WebSocket连接建立逻辑
    ///    - Disconnect() 对应 Stop() 方法中的WebSocket关闭逻辑
    ///    - OnSocketOpened/OnSocketClosed/OnSocketError 对应原始的事件处理器
    /// 
    /// 2. 消息发送对应关系:
    ///    - SendMessage() 对应 SendMessage() 方法的核心逻辑
    ///    - SendLogGetMessage() 对应 LogGet() 方法中的消息发送部分
    ///    - _messageFifo 对应原始的 _messageFifo 队列
    ///    - SendMessageNow() 对应原始的 SendMessageNow() 方法
    ///    - StartMessageDeferTimer() 对应原始的定时器启动逻辑
    /// 
    /// 3. 消息接收对应关系:
    ///    - OnSocketMessageReceived() 对应 OnSocketMessage() 方法
    ///    - HandleReceivedMessage() 对应 OnSocketMessage() 中的消息处理逻辑
    ///    - MessageReceived 事件对应原始的 MessageReceived 事件
    /// 
    /// 4. 消息ID管理对应关系:
    ///    - MessageIdManager 替代原始的 _messageId 和 _logGetId 字段
    ///    - GenerateGeneralMessageId() 对应 Interlocked.Increment(ref _messageId)
    ///    - GenerateLogGetMessageId() 对应 _logGetId 的管理逻辑
    /// 
    /// 5. 事件系统对应关系:
    ///    - ConnectionOpened 对应原始的 ConnectionOpened 事件
    ///    - ConnectionClosed 对应原始的 ConnectionClosed 事件
    ///    - ConnectionError 对应原始的 ConnectionError 事件
    ///    - MessageReceived 对应原始的 MessageReceived 事件
    ///    - MessageSent 新增事件,提供更完整的消息生命周期通知
    /// 
    /// 6. 资源管理对应关系:
    ///    - Dispose() 对应原始的 Dispose() 方法
    ///    - _disposed 字段对应原始的 _disposed 字段
    ///    - 定时器管理对应原始的定时器清理逻辑
    /// 
    /// 重构优势:
    /// 1. 职责分离:专注WebSocket传输,移除业务逻辑
    /// 2. 代码复用:可在多个地方复用WebSocket管理器
    /// 3. 测试友好:更容易进行单元测试
    /// 4. 维护简单:更清晰的代码结构
    /// 5. 功能增强:通过MessageIdManager提供更好的消息管理
    /// </summary>
    public partial class WebSocketMessageManager : IDisposable
    {
        #region 私有字段

        /// <summary>
        /// WebSocket实例 - 对应LTEClientWebSocket._webSocket
        /// 负责底层的WebSocket连接和通信
        /// 
        /// 对应关系:
        /// - 创建:Connect()方法中创建,对应Start()方法中的_webSocket = new WebSocket(url)
        /// - 配置:EnableAutoSendPing = false,对应原始实现
        /// - 事件绑定:绑定Opened/Closed/MessageReceived/Error事件,对应原始事件绑定
        /// - 连接:调用Open()方法,对应原始的_webSocket.Open()
        /// - 关闭:调用Close()方法,对应原始的_webSocket.Close()
        /// - 清理:在Disconnect()中设置为null,对应原始的资源清理
        /// </summary>
        private WebSocket? _webSocket;

        /// <summary>
        /// 消息ID管理器 - 对应LTEClientWebSocket._messageId和_logGetId
        /// 统一管理通用消息ID和日志获取消息ID,提供更好的消息跟踪和回调管理
        /// 
        /// 对应关系:
        /// - _messageId 对应 MessageIdManager.GenerateGeneralMessageId()
        /// - _logGetId 对应 MessageIdManager.GenerateLogGetMessageId()
        /// - _messageHandlers 对应 MessageIdManager的内部处理器管理
        /// - _messageHandlersByName 对应 MessageIdManager的按名称处理器管理
        /// 
        /// 功能增强:
        /// - 线程安全的ID生成,替代原始的Interlocked.Increment
        /// - 统一的处理器管理,提供更好的回调跟踪
        /// - 自动清理过期处理器,防止内存泄漏
        /// - 事件通知机制,提供ID变化通知
        /// </summary>
        private readonly MessageIdManager _messageIdManager;

        /// <summary>
        /// 日志记录器 - 对应LTEClientWebSocket._logger
        /// 用于记录WebSocket操作和错误信息
        /// 
        /// 对应关系:
        /// - 构造函数参数:对应LTEClientWebSocket构造函数中的logger参数
        /// - 日志记录:对应原始实现中的所有_logger.LogXXX调用
        /// - 日志格式:保持与原始实现一致的日志格式
        /// 
        /// 功能增强:
        /// - 更详细的错误日志记录
        /// - 更好的异常堆栈跟踪
        /// - 统一的日志格式和级别
        /// </summary>
        private readonly ILogger _logger;

        /// <summary>
        /// 客户端名称 - 对应LTEClientWebSocket._config.Name
        /// 用于日志记录和事件标识
        /// 
        /// 对应关系:
        /// - 构造函数参数:对应LTEClientWebSocket构造函数中的config.Name
        /// - 日志前缀:对应原始实现中所有日志的[{_config.Name}]前缀
        /// - 事件标识:用于区分不同客户端的事件
        /// 
        /// 功能增强:
        /// - 参数验证:确保clientName不为null
        /// - 统一标识:在所有日志和事件中使用一致的客户端标识
        /// </summary>
        private readonly string _clientName;

        /// <summary>
        /// 消息队列 - 对应LTEClientWebSocket._messageFifo
        /// 线程安全的阻塞集合,用于批量发送优化
        /// 优化说明:从ConcurrentQueue改为BlockingCollection,提供更好的线程安全性和阻塞能力
        /// 
        /// 对应关系:
        /// - 队列类型:BlockingCollection<JObject>,优化后的线程安全集合
        /// - 入队操作:Add(message),对应原始的_messageFifo.Enqueue(message)
        /// - 出队操作:TryTake(out message),对应原始的队列处理逻辑
        /// - 批量处理:支持批量消息发送,对应原始的批处理逻辑
        /// 
        /// 功能增强:
        /// - 线程安全:使用BlockingCollection保证线程安全
        /// - 阻塞能力:支持阻塞式出队操作,提高性能
        /// - 批量优化:支持批量发送减少网络开销
        /// - 延迟发送:配合_messageDeferTimer实现延迟发送
        /// - 资源管理:自动处理集合的完成状态
        /// 
        /// 重构改进:
        /// - 移除了消息缓存功能,专注传输
        /// - 优化了队列操作逻辑,提供更好的性能
        /// - 增强了线程安全性和资源管理
        /// </summary>
        private readonly BlockingCollection<JObject> _messageFifo;

        /// <summary>
        /// 消息延迟发送定时器 - 对应LTEClientWebSocket._messageDeferTimer
        /// 用于实现消息的批量发送和延迟发送机制
        /// 保持与原始实现完全一致的逻辑
        /// 
        /// 对应关系:
        /// - 定时器类型:Timer,与原始实现完全一致
        /// - 启动逻辑:StartMessageDeferTimer(),对应原始的定时器启动
        /// - 停止逻辑:StopMessageDeferTimer(),对应原始的定时器停止
        /// - 延迟策略:1毫秒延迟,与原始实现完全一致
        /// - 批处理大小:100条消息,与原始实现完全一致
        /// 
        /// 功能保持:
        /// - 批量发送:当队列中消息少于100条时,延迟1毫秒发送
        /// - 立即发送:当队列中消息达到100条时,立即发送
        /// - 资源管理:在Disconnect()和Dispose()中正确释放
        /// 
        /// 重构改进:
        /// - 更清晰的定时器管理逻辑
        /// - 更好的异常处理
        /// - 保持了完全一致的批处理策略
        /// </summary>
        private Timer? _messageDeferTimer;

        /// <summary>
        /// 释放标志 - 对应LTEClientWebSocket._disposed
        /// 防止重复释放和已释放对象的操作
        /// 
        /// 对应关系:
        /// - 字段类型:bool,对应原始的_disposed字段
        /// - 返回值:true表示已释放,false表示未释放,对应原始实现
        /// - 使用场景:外部检查对象释放状态,对应原始实现
        /// - 线程安全:直接返回_disposed字段值,对应原始实现
        /// 
        /// 功能保持:
        /// - 释放状态检查:外部可以检查对象是否已释放
        /// - 资源保护:防止对已释放对象的操作
        /// - 状态查询:提供对象状态的查询接口
        /// 
        /// 重构改进:
        /// - 保持了完全一致的检查逻辑
        /// - 保持了完全一致的返回值语义
        /// - 保持了完全一致的使用场景
        /// </summary>
        private bool _disposed;

        /// <summary>
        /// 同步锁对象
        /// 用于确保线程安全的操作
        /// 
        /// 对应关系:
        /// - 新增功能:原始实现中没有显式的同步锁
        /// - 用途:确保关键操作的线程安全
        /// - 使用场景:在需要线程安全的地方使用lock语句
        /// 
        /// 功能增强:
        /// - 线程安全:确保关键操作的原子性
        /// - 死锁预防:使用细粒度锁避免死锁
        /// - 性能优化:最小化锁的持有时间
        /// </summary>
        private readonly object _lockObject = new object();

        #endregion

        #region 事件

        /// <summary>
        /// 连接打开事件 - 对应LTEClientWebSocket.ConnectionOpened
        /// 当WebSocket连接成功建立时触发
        /// 
        /// 对应关系:
        /// - 事件类型:EventHandler,与原始实现完全一致
        /// - 触发时机:在OnSocketOpened()中触发,对应原始的OnSocketOpened事件处理
        /// - 触发条件:WebSocket连接成功建立时
        /// - 事件参数:无参数,与原始实现完全一致
        /// 
        /// 功能保持:
        /// - 连接状态通知:通知外部连接已建立
        /// - 事件订阅:支持多个订阅者
        /// - 异步触发:事件触发不会阻塞WebSocket操作
        /// 
        /// 重构改进:
        /// - 更清晰的触发时机
        /// - 更好的错误处理
        /// - 保持了完全一致的事件接口
        /// </summary>
        public event EventHandler? ConnectionOpened;

        /// <summary>
        /// 连接关闭事件 - 对应LTEClientWebSocket.ConnectionClosed
        /// 当WebSocket连接关闭时触发
        /// 
        /// 对应关系:
        /// - 事件类型:EventHandler,与原始实现完全一致
        /// - 触发时机:在OnSocketClosed()中触发,对应原始的OnSocketClosed事件处理
        /// - 触发条件:WebSocket连接关闭时
        /// - 事件参数:无参数,与原始实现完全一致
        /// 
        /// 功能保持:
        /// - 连接状态通知:通知外部连接已关闭
        /// - 事件订阅:支持多个订阅者
        /// - 异步触发:事件触发不会阻塞WebSocket操作
        /// 
        /// 重构改进:
        /// - 更清晰的触发时机
        /// - 更好的资源清理
        /// - 保持了完全一致的事件接口
        /// </summary>
        public event EventHandler? ConnectionClosed;

        /// <summary>
        /// 连接错误事件 - 对应LTEClientWebSocket.ConnectionError
        /// 当WebSocket连接发生错误时触发
        /// 
        /// 对应关系:
        /// - 事件类型:EventHandler<string>,与原始实现完全一致
        /// - 触发时机:在OnSocketError()和异常处理中触发,对应原始的错误处理
        /// - 触发条件:WebSocket连接错误、消息处理错误等
        /// - 事件参数:错误信息字符串,与原始实现完全一致
        /// 
        /// 功能保持:
        /// - 错误通知:通知外部连接或处理错误
        /// - 事件订阅:支持多个订阅者
        /// - 异步触发:事件触发不会阻塞WebSocket操作
        /// 
        /// 重构改进:
        /// - 更详细的错误信息
        /// - 更好的异常处理
        /// - 保持了完全一致的事件接口
        /// </summary>
        public event EventHandler<string>? ConnectionError;

        /// <summary>
        /// 消息接收事件 - 对应LTEClientWebSocket.MessageReceived
        /// 当接收到WebSocket消息时触发
        /// 
        /// 对应关系:
        /// - 事件类型:EventHandler<JObject>,与原始实现完全一致
        /// - 触发时机:在OnSocketMessageReceived()中触发,对应原始的OnSocketMessage事件处理
        /// - 触发条件:接收到WebSocket消息并解析成功后
        /// - 事件参数:解析后的JObject消息,与原始实现完全一致
        /// 
        /// 功能保持:
        /// - 消息通知:通知外部接收到新消息
        /// - 事件订阅:支持多个订阅者
        /// - 异步触发:事件触发不会阻塞消息处理
        /// - 触发顺序:在消息处理开始时就触发,与原始实现完全一致
        /// 
        /// 重构改进:
        /// - 更清晰的触发时机
        /// - 更好的消息解析
        /// - 保持了完全一致的事件接口和触发顺序
        /// </summary>
        public event EventHandler<JObject>? MessageReceived;

        /// <summary>
        /// 消息发送事件 - 新增功能,LTEClientWebSocket中没有对应事件
        /// 当消息成功发送时触发,提供更完整的消息生命周期通知
        /// 
        /// 对应关系:
        /// - 事件类型:EventHandler<JObject>,与MessageReceived保持一致
        /// - 触发时机:在SendMessageNow()中触发,对应消息发送成功时
        /// - 触发条件:消息成功发送到WebSocket时
        /// - 事件参数:发送的JObject消息,与MessageReceived保持一致
        /// 
        /// 功能增强:
        /// - 消息生命周期:提供完整的消息发送通知
        /// - 调试支持:便于调试消息发送流程
        /// - 监控支持:便于监控消息发送状态
        /// - 事件订阅:支持多个订阅者
        /// 
        /// 重构优势:
        /// - 更完整的消息生命周期管理
        /// - 更好的调试和监控支持
        /// - 与MessageReceived事件形成对称的事件系统
        /// </summary>
        public event EventHandler<JObject>? MessageSent;

        #endregion

        #region 属性

        /// <summary>
        /// 是否已连接 - 对应LTEClientWebSocket.IsConnected
        /// 检查WebSocket连接状态
        /// 
        /// 对应关系:
        /// - 属性类型:bool,与原始实现完全一致
        /// - 检查逻辑:_webSocket?.State == WebSocketState.Open,与原始实现完全一致
        /// - 使用场景:在SendMessage()中检查连接状态,对应原始实现
        /// - 返回值:true表示已连接,false表示未连接,与原始实现完全一致
        /// 
        /// 功能保持:
        /// - 连接状态检查:快速检查WebSocket连接状态
        /// - 空安全:使用?.操作符避免空引用异常
        /// - 实时状态:反映WebSocket的实时连接状态
        /// 
        /// 重构改进:
        /// - 保持了完全一致的检查逻辑
        /// - 保持了完全一致的返回值语义
        /// - 保持了完全一致的使用场景
        /// </summary>
        public bool IsConnected => _webSocket?.State == WebSocketState.Open;

        /// <summary>
        /// WebSocket状态 - 对应LTEClientWebSocket._webSocket?.State
        /// 获取详细的WebSocket连接状态
        /// 
        /// 对应关系:
        /// - 属性类型:WebSocketState,对应原始的_webSocket?.State
        /// - 返回值:WebSocket的详细状态,对应原始实现
        /// - 空安全:使用??操作符提供默认值,对应原始实现
        /// - 使用场景:提供更详细的连接状态信息
        /// 
        /// 功能保持:
        /// - 详细状态:提供WebSocket的详细连接状态
        /// - 空安全:当_webSocket为null时返回WebSocketState.None
        /// - 实时状态:反映WebSocket的实时状态
        /// 
        /// 重构改进:
        /// - 保持了完全一致的状态获取逻辑
        /// - 保持了完全一致的默认值处理
        /// - 保持了完全一致的使用场景
        /// </summary>
        public WebSocketState State => _webSocket?.State ?? WebSocketState.None;

        /// <summary>
        /// 是否已释放 - 对应LTEClientWebSocket._disposed
        /// 检查对象是否已被释放
        /// 
        /// 对应关系:
        /// - 属性类型:bool,对应原始的_disposed字段
        /// - 返回值:true表示已释放,false表示未释放,对应原始实现
        /// - 使用场景:外部检查对象释放状态,对应原始实现
        /// - 线程安全:直接返回_disposed字段值,对应原始实现
        /// 
        /// 功能保持:
        /// - 释放状态检查:外部可以检查对象是否已释放
        /// - 资源保护:防止对已释放对象的操作
        /// - 状态查询:提供对象状态的查询接口
        /// 
        /// 重构改进:
        /// - 保持了完全一致的检查逻辑
        /// - 保持了完全一致的返回值语义
        /// - 保持了完全一致的使用场景
        /// </summary>
        public bool IsDisposed => _disposed;

        /// <summary>
        /// 消息队列数量 - 对应LTEClientWebSocket._messageFifo.Count
        /// 获取当前待发送消息的数量
        /// 
        /// 对应关系:
        /// - 属性类型:int,对应原始的_messageFifo.Count
        /// - 返回值:队列中待发送消息的数量,对应原始实现
        /// - 使用场景:监控消息队列状态,对应原始实现
        /// - 线程安全:BlockingCollection.Count是线程安全的,对应原始实现
        /// 
        /// 功能保持:
        /// - 队列监控:监控当前待发送消息的数量
        /// - 性能监控:便于监控消息发送性能
        /// - 调试支持:便于调试消息队列状态
        /// 
        /// 重构改进:
        /// - 保持了完全一致的计数逻辑
        /// - 保持了完全一致的返回值语义
        /// - 保持了完全一致的使用场景
        /// - 优化:使用BlockingCollection提供更好的线程安全性
        /// </summary>
        public int MessageQueueCount => _messageFifo.Count;

        /// <summary>
        /// 消息ID管理器 - 提供对MessageIdManager的访问
        /// 允许外部访问消息ID管理功能
        /// 
        /// 对应关系:
        /// - 属性类型:MessageIdManager,对应原始的_messageId和_logGetId管理
        /// - 返回值:内部的消息ID管理器实例,对应原始实现
        /// - 使用场景:外部访问消息ID管理功能,对应原始实现
        /// - 封装性:提供对内部MessageIdManager的访问,对应原始实现
        /// 
        /// 功能保持:
        /// - 功能访问:外部可以访问消息ID管理功能
        /// - 封装性:保持内部实现的封装性
        /// - 扩展性:支持外部扩展消息ID管理功能
        /// 
        /// 重构改进:
        /// - 更统一的消息ID管理接口
        /// - 更好的功能封装
        /// - 保持了完全一致的功能访问方式
        /// </summary>
        public MessageIdManager MessageIdManager => _messageIdManager;

        #endregion
    }
}