using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using Newtonsoft.Json; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using WebSocket4Net; namespace CoreAgent.ProtocolClient.Managers.WebSocketMgr { public partial class WebSocketMessageManager { #region 私有方法 /// /// 检查对象是否已释放 /// private void ThrowIfDisposed() { if (_disposed) throw new ObjectDisposedException(nameof(WebSocketMessageManager)); } /// /// WebSocket连接打开事件处理器 /// private void OnSocketOpened(object? sender, EventArgs e) { try { _logger.LogInformation($"[{_clientName}] WebSocket连接已建立"); ConnectionOpened?.Invoke(this, EventArgs.Empty); } catch (Exception ex) { _logger.LogError(ex, $"[{_clientName}] 处理连接打开事件异常: {ex.Message}"); } } /// /// WebSocket连接关闭事件处理器 /// private void OnSocketClosed(object? sender, EventArgs e) { try { _logger.LogInformation($"[{_clientName}] WebSocket连接已关闭"); ConnectionClosed?.Invoke(this, EventArgs.Empty); } catch (Exception ex) { _logger.LogError(ex, $"[{_clientName}] 处理连接关闭事件异常: {ex.Message}"); } } /// /// WebSocket消息接收事件处理器 /// private void OnSocketMessageReceived(object? sender, MessageReceivedEventArgs e) { try { var messageText = e.Message; _logger.LogDebug($"[{_clientName}] 接收到消息: {messageText}"); // 解析消息 JObject? message = null; try { message = JObject.Parse(messageText); } catch (JsonException ex) { _logger.LogError(ex, $"[{_clientName}] 消息解析失败: {messageText}"); ConnectionError?.Invoke(this, $"消息解析失败: {ex.Message}"); return; } // 触发MessageReceived事件 MessageReceived?.Invoke(this, message); // 处理消息 HandleReceivedMessage(message, error => ConnectionError?.Invoke(this, error)); } catch (Exception ex) { _logger.LogError(ex, $"[{_clientName}] 处理接收消息异常: {ex.Message}"); ConnectionError?.Invoke(this, $"处理接收消息异常: {ex.Message}"); } } /// /// WebSocket错误事件处理器 /// private void OnSocketError(object? sender, SuperSocket.ClientEngine.ErrorEventArgs e) { try { var errorMessage = e.Exception?.Message ?? "WebSocket连接错误"; _logger.LogError(e.Exception, $"[{_clientName}] WebSocket错误: {errorMessage}"); ConnectionError?.Invoke(this, errorMessage); } catch (Exception ex) { _logger.LogError(ex, $"[{_clientName}] 处理WebSocket错误事件异常: {ex.Message}"); } } /// /// 启动消息延迟发送定时器 /// private void StartMessageDeferTimer() { Timer? timer = null; timer = new Timer(_ => { try { OnMessageDeferTimer(null); } finally { timer?.Dispose(); // 用完即销毁 } }, null, 1, Timeout.Infinite); } /// /// 停止消息延迟发送定时器 /// private void StopMessageDeferTimer() { // 新实现下无需手动停止定时器,方法保留兼容性 } /// /// 消息延迟发送定时器回调 /// private void OnMessageDeferTimer(object? state) { try { // 批量发送消息 var messages = new List(); var count = 0; const int batchSize = 100; // 从队列中取出消息 while (count < batchSize && _messageFifo.TryTake(out var message)) { messages.Add(message); count++; } if (messages.Count > 0) { // 发送消息 SendMessageNow(messages); } // 如果队列中还有消息,继续启动定时器 if (_messageFifo.Count > 0) { StartMessageDeferTimer(); } } catch (Exception ex) { _logger.LogError(ex, $"[{_clientName}] 消息延迟发送定时器异常: {ex.Message}"); } } /// /// 立即发送消息 /// private void SendMessageNow(List messages) { if (messages == null || messages.Count == 0) return; if (!IsConnected) { _logger.LogWarning($"[{_clientName}] WebSocket未连接,无法发送消息"); return; } try { foreach (var message in messages) { var messageText = JsonConvert.SerializeObject(message); _webSocket?.Send(messageText); // 触发MessageSent事件 MessageSent?.Invoke(this, message); _logger.LogDebug($"[{_clientName}] 消息已发送: {messageText}"); } } catch (Exception ex) { _logger.LogError(ex, $"[{_clientName}] 发送消息异常: {ex.Message}"); ConnectionError?.Invoke(this, $"发送消息异常: {ex.Message}"); } } /// /// 清空消息队列 /// private void ClearMessageQueue() { var count = 0; while (_messageFifo.TryTake(out _)) { count++; } if (count > 0) { _logger.LogInformation($"[{_clientName}] 清空消息队列,丢弃 {count} 条消息"); } } #endregion } }