using CoreAgent.Domain.Interfaces.CustomWSClient; using CoreAgent.Domain.Interfaces.Network; using CoreAgent.Domain.Models.Protocol; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers { /// /// RAN协议WebSocket消息处理器 /// 负责处理无线接入网(RAN)相关的WebSocket消息,包括配置获取、日志获取等功能 /// public class RanLogMessageHandler : ICustomMessageHandler, IDisposable { private readonly ILogger _logger; private int _messageId = 0; private string _currentMessageId = string.Empty; private readonly Action _messageCallback; private readonly ICellularNetworkContext _networkContext; private readonly BlockingCollection<(string MessageData, IObserverCustomWebSocketClient Observer)> _messageQueue; private readonly CancellationTokenSource _cancellationTokenSource; private readonly Task _processTask; private bool _disposed; /// /// 初始化RAN协议消息处理器 /// /// 日志记录器 /// 蜂窝网络上下文 /// 消息回调处理函数 public RanLogMessageHandler(ILogger logger, ICellularNetworkContext context, Action action) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _messageCallback = action ?? throw new ArgumentNullException(nameof(action)); _networkContext = context ?? throw new ArgumentNullException(nameof(context)); _messageQueue = new BlockingCollection<(string, IObserverCustomWebSocketClient)>(); _cancellationTokenSource = new CancellationTokenSource(); _processTask = Task.Run(ProcessMessageQueue); _logger.LogInformation("RAN协议消息处理器初始化完成,消息队列已启动"); } /// /// 处理接收到的WebSocket消息 /// 将消息加入处理队列,由队列处理器异步处理 /// /// 接收到的消息数据 /// WebSocket客户端观察者 public void HandleMessage(string messageData, IObserverCustomWebSocketClient observer) { try { _logger.LogDebug("将消息加入处理队列: {MessageData}", messageData); _messageQueue.Add((messageData, observer)); } catch (Exception ex) { _logger.LogError(ex, "将消息加入队列时发生错误: {MessageData}", messageData); } } /// /// 处理消息队列中的消息 /// 持续从队列中获取消息并处理,直到队列关闭或取消 /// private async Task ProcessMessageQueue() { try { _logger.LogInformation("开始处理消息队列"); foreach (var (messageData, observer) in _messageQueue.GetConsumingEnumerable(_cancellationTokenSource.Token)) { try { await ProcessMessageAsync(messageData, observer); } catch (Exception ex) { _logger.LogError(ex, "处理队列中的消息时发生错误: {MessageData}", messageData); } } } catch (OperationCanceledException) { _logger.LogInformation("消息队列处理已取消"); } catch (Exception ex) { _logger.LogError(ex, "消息队列处理过程中发生错误"); } } /// /// 处理单条消息 /// 解析消息类型并分发到对应的处理方法 /// /// 消息数据 /// WebSocket客户端观察者 private async Task ProcessMessageAsync(string messageData, IObserverCustomWebSocketClient observer) { try { _logger.LogDebug("开始处理RAN协议消息: {MessageData}", messageData); var jsonData = JObject.Parse(messageData); string messageType = jsonData["message"]!.ToString(); _logger.LogInformation("收到RAN协议消息类型: {MessageType}", messageType); await ProcessMessageByTypeAsync(messageType, jsonData, observer); } catch (Exception ex) { _logger.LogError(ex, "处理RAN协议消息时发生错误: {MessageData}", messageData); } } /// /// 根据消息类型分发到对应的处理方法 /// /// 消息类型 /// 消息数据 /// WebSocket客户端观察者 private async Task ProcessMessageByTypeAsync(string messageType, JObject data, IObserverCustomWebSocketClient observer) { _logger.LogDebug("开始处理RAN协议消息类型: {MessageType}", messageType); switch (messageType) { case "ready": await HandleReadyMessageAsync(observer); break; case "config_get": await HandleConfigGetMessageAsync(data, observer); break; case "config_set": await HandleConfigSetMessageAsync(observer); break; case "log_get": await HandleLogGetMessageAsync(data, observer); break; case "stats": await HandleStatsMessageAsync(observer); break; default: _logger.LogWarning("收到未知的RAN协议消息类型: {MessageType}", messageType); break; } } /// /// 处理ready消息 /// 发送config_get请求,准备获取配置信息 /// /// WebSocket客户端观察者 private async Task HandleReadyMessageAsync(IObserverCustomWebSocketClient observer) { _messageId++; string readyResponse = JObject.FromObject(new { message = "config_get", message_id = _messageId }).ToString(); _logger.LogInformation("发送ready响应: {Response}", readyResponse); await Task.Run(() => observer.SendMessage(readyResponse)); _currentMessageId = _messageId.ToString(); } /// /// 处理config_get消息 /// 发送统计信息和基础层日志配置 /// /// 消息数据 /// WebSocket客户端观察者 private async Task HandleConfigGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer) { if (_currentMessageId == data["message_id"]!.ToString()) { _logger.LogInformation("处理config_get请求"); var responseArray = new JArray(); _messageId++; var statsConfig = new { message = "stats", message_id = _messageId, rf = false, samples = false }; responseArray.Add(JObject.FromObject(statsConfig)); _messageId++; string baseLayerConfig = ConfigureBaseLayerLogs(data); responseArray.Add(JObject.Parse(baseLayerConfig)); _logger.LogInformation("发送config_get响应: {Response}", responseArray.ToString()); await Task.Run(() => observer.SendMessage(responseArray.ToString())); } else { _logger.LogWarning("config_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}", data["message_id"]!.ToString(), _currentMessageId); } } /// /// 处理config_set消息 /// 发送层日志配置 /// /// WebSocket客户端观察者 private async Task HandleConfigSetMessageAsync(IObserverCustomWebSocketClient observer) { _messageId++; string configResponse = ConfigureLayerLogs(true); await Task.Run(() => observer.SendMessage(configResponse)); _currentMessageId = _messageId.ToString(); _logger.LogInformation("发送config_set响应: {Response}", configResponse); } /// /// 处理log_get消息 /// 发送日志配置并触发回调 /// /// 消息数据 /// WebSocket客户端观察者 private async Task HandleLogGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer) { if (_currentMessageId == data["message_id"]!.ToString()) { _messageId++; string logResponse = ConfigureLayerLogs(false); await Task.Run(() => observer.SendMessage(logResponse)); _currentMessageId = _messageId.ToString(); _logger.LogInformation("发送log_get响应: {Response}", logResponse); } else { _logger.LogWarning("log_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}", data["message_id"]!.ToString(), _currentMessageId); } await Task.Run(() => _messageCallback.Invoke(data.ToString())); } /// /// 处理stats消息 /// 发送统计信息请求 /// /// WebSocket客户端观察者 private async Task HandleStatsMessageAsync(IObserverCustomWebSocketClient observer) { _messageId++; string statsResponse = JObject.FromObject(new { message = "stats", message_id = _messageId, rf = false, samples = false }).ToString(); await Task.Run(() => observer.SendMessage(statsResponse)); _logger.LogInformation("发送stats响应: {Response}", statsResponse); } /// /// 配置基础层日志 /// 设置各种日志级别的开关状态 /// /// 配置键值对 /// 是否关闭系统信息 /// 配置后的JSON字符串 private string ConfigureBaseLayerLogs(JObject keyValues, bool isCloseSystemInfo = false) { _logger.LogDebug("开始配置基础层日志: {KeyValues}", keyValues.ToString()); // 移除不需要的配置项 if (keyValues.Remove("rotate") && keyValues.Remove("path") && keyValues.Remove("count")) { _logger.LogDebug("已移除rotate、path和count配置项"); } // 设置系统信息相关配置 keyValues["bcch"] = isCloseSystemInfo; keyValues["cch"] = isCloseSystemInfo; keyValues["cell_meas"] = isCloseSystemInfo; keyValues["csi"] = isCloseSystemInfo; // 设置其他配置项 keyValues["dci_size"] = false; keyValues["mib"] = false; keyValues["rep"] = false; keyValues["signal"] = false; var configMessage = new { message = "config_set", logs = keyValues, message_id = _messageId, }; string response = JObject.FromObject(configMessage).ToString(); _logger.LogInformation("基础层日志配置完成: {Response}", response); return response; } /// /// 配置层日志 /// 设置日志超时、计数等参数 /// /// 是否包含头部信息 /// 配置后的JSON字符串 private string ConfigureLayerLogs(bool includeHeaders = false) { _logger.LogDebug("开始配置层日志: IncludeHeaders={IncludeHeaders}", includeHeaders); var logConfig = new BaseNetworkLog { Timeout = includeHeaders ? 0 : 1, MinLogCount = 64, MaxLogCount = 2048, LayerConfig = _networkContext.NetworkLogs.RanLog, Message = "log_get", IncludeHeaders = includeHeaders, MessageId = _messageId, }; string response = JObject.FromObject(logConfig).ToString(); _logger.LogInformation("层日志配置完成: {Response}", response); return response; } /// /// 释放资源 /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// 释放资源的具体实现 /// /// 是否正在释放托管资源 protected virtual void Dispose(bool disposing) { if (!_disposed) { if (disposing) { _cancellationTokenSource.Cancel(); _messageQueue.CompleteAdding(); try { _processTask.Wait(TimeSpan.FromSeconds(5)); } catch (AggregateException ex) { _logger.LogError(ex, "等待消息队列处理完成时发生错误"); } _messageQueue.Dispose(); _cancellationTokenSource.Dispose(); } _disposed = true; } } /// /// 析构函数 /// ~RanLogMessageHandler() { Dispose(false); } } }