using CoreAgent.Domain.Interfaces.CustomWSClient;
using CoreAgent.Domain.Interfaces.Network;
using CoreAgent.Domain.Models.Protocol;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers
{
/// <summary>
/// WebSocket message handler for the RAN (radio access network) protocol.
/// Incoming messages are placed on an in-memory queue and processed one at a time by a
/// background task. Depending on the message type the handler replies with
/// <c>config_get</c>, <c>config_set</c>, <c>log_get</c> or <c>stats</c> requests.
/// </summary>
public class RanLogMessageHandler : ICustomMessageHandler, IDisposable
{
    // Keys stripped from the received configuration before it is sent back inside config_set.
    private static readonly string[] ExcludedLogKeys = { "rotate", "path", "count" };

    private readonly ILogger _logger;
    private readonly Action<string> _messageCallback;
    private readonly ICellularNetworkContext _networkContext;
    private readonly BlockingCollection<(string MessageData, IObserverCustomWebSocketClient Observer)> _messageQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly Task _processTask;

    // Counter used to number outgoing requests; only touched by the queue-processing task.
    private int _messageId = 0;

    // Id of the last request whose response is matched in the config_get / log_get handlers.
    private string _currentMessageId = string.Empty;

    // Read by HandleMessage (any thread) and written by Dispose, hence volatile.
    private volatile bool _disposed;

    /// <summary>
    /// Creates the handler and starts the background task that drains the message queue.
    /// </summary>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <param name="context">Cellular network context; supplies the RAN layer log configuration.</param>
    /// <param name="action">Callback invoked with the JSON text of every <c>log_get</c> message.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public RanLogMessageHandler(ILogger logger, ICellularNetworkContext context, Action<string> action)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _messageCallback = action ?? throw new ArgumentNullException(nameof(action));
        _networkContext = context ?? throw new ArgumentNullException(nameof(context));
        _messageQueue = new BlockingCollection<(string, IObserverCustomWebSocketClient)>();
        _cancellationTokenSource = new CancellationTokenSource();
        _processTask = Task.Run(ProcessMessageQueue);
        _logger.LogInformation("RAN协议消息处理器初始化完成,消息队列已启动");
    }

    /// <summary>
    /// Accepts a received WebSocket message and enqueues it for asynchronous processing.
    /// Never throws: invalid input, a disposed handler and enqueue failures are only logged.
    /// </summary>
    /// <param name="messageData">Raw message text (expected to be a JSON object).</param>
    /// <param name="observer">Client used to send replies for this message.</param>
    public void HandleMessage(string messageData, IObserverCustomWebSocketClient observer)
    {
        if (_disposed)
        {
            _logger.LogWarning("消息处理器已释放,忽略收到的消息");
            return;
        }

        if (string.IsNullOrWhiteSpace(messageData) || observer is null)
        {
            _logger.LogWarning("收到空消息或观察者为空,已忽略");
            return;
        }

        try
        {
            _logger.LogDebug("将消息加入处理队列: {MessageData}", messageData);
            _messageQueue.Add((messageData, observer));
        }
        catch (Exception ex)
        {
            // Covers the race with Dispose (queue completed or disposed between the check and Add).
            _logger.LogError(ex, "将消息加入队列时发生错误: {MessageData}", messageData);
        }
    }

    /// <summary>
    /// Queue consumer loop: takes messages until the queue is completed or the token is cancelled,
    /// processing them strictly in order. A failure on one message does not stop the loop.
    /// </summary>
    private async Task ProcessMessageQueue()
    {
        try
        {
            _logger.LogInformation("开始处理消息队列");
            foreach (var (messageData, observer) in _messageQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
            {
                try
                {
                    await ProcessMessageAsync(messageData, observer);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "处理队列中的消息时发生错误: {MessageData}", messageData);
                }
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("消息队列处理已取消");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "消息队列处理过程中发生错误");
        }
    }

    /// <summary>
    /// Parses a single message, reads its <c>message</c> field and dispatches on it.
    /// Messages without a <c>message</c> field are logged and dropped.
    /// </summary>
    /// <param name="messageData">Raw JSON text of the message.</param>
    /// <param name="observer">Client used to send replies.</param>
    private async Task ProcessMessageAsync(string messageData, IObserverCustomWebSocketClient observer)
    {
        try
        {
            _logger.LogDebug("开始处理RAN协议消息: {MessageData}", messageData);
            var jsonData = JObject.Parse(messageData);

            var messageType = jsonData["message"]?.ToString();
            if (string.IsNullOrEmpty(messageType))
            {
                _logger.LogWarning("RAN协议消息缺少message字段: {MessageData}", messageData);
                return;
            }

            _logger.LogInformation("收到RAN协议消息类型: {MessageType}", messageType);
            await ProcessMessageByTypeAsync(messageType, jsonData, observer);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理RAN协议消息时发生错误: {MessageData}", messageData);
        }
    }

    /// <summary>
    /// Routes a parsed message to the handler for its type; unknown types are logged as warnings.
    /// </summary>
    /// <param name="messageType">Value of the message's <c>message</c> field.</param>
    /// <param name="data">Parsed message.</param>
    /// <param name="observer">Client used to send replies.</param>
    private async Task ProcessMessageByTypeAsync(string messageType, JObject data, IObserverCustomWebSocketClient observer)
    {
        _logger.LogDebug("开始处理RAN协议消息类型: {MessageType}", messageType);
        switch (messageType)
        {
            case "ready":
                await HandleReadyMessageAsync(observer);
                break;
            case "config_get":
                await HandleConfigGetMessageAsync(data, observer);
                break;
            case "config_set":
                await HandleConfigSetMessageAsync(observer);
                break;
            case "log_get":
                await HandleLogGetMessageAsync(data, observer);
                break;
            case "stats":
                await HandleStatsMessageAsync(observer);
                break;
            default:
                _logger.LogWarning("收到未知的RAN协议消息类型: {MessageType}", messageType);
                break;
        }
    }

    /// <summary>
    /// Handles <c>ready</c>: sends a <c>config_get</c> request and remembers its id so the
    /// response can be matched later.
    /// </summary>
    /// <param name="observer">Client used to send the request.</param>
    private async Task HandleReadyMessageAsync(IObserverCustomWebSocketClient observer)
    {
        _messageId++;
        string readyResponse = JObject.FromObject(new { message = "config_get", message_id = _messageId }).ToString();
        _logger.LogInformation("发送ready响应: {Response}", readyResponse);
        await Task.Run(() => observer.SendMessage(readyResponse));
        _currentMessageId = _messageId.ToString();
    }

    /// <summary>
    /// Handles <c>config_get</c>: when the message id matches the pending request, sends a JSON
    /// array containing a <c>stats</c> request and a <c>config_set</c> request built from the
    /// received data. A missing or different id is logged and the message is ignored.
    /// </summary>
    /// <param name="data">Parsed <c>config_get</c> message.</param>
    /// <param name="observer">Client used to send the requests.</param>
    private async Task HandleConfigGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer)
    {
        var receivedId = data["message_id"]?.ToString();
        if (!string.Equals(_currentMessageId, receivedId, StringComparison.Ordinal))
        {
            _logger.LogWarning("config_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}",
                receivedId, _currentMessageId);
            return;
        }

        _logger.LogInformation("处理config_get请求");
        var responseArray = new JArray();

        _messageId++;
        var statsConfig = new { message = "stats", message_id = _messageId, rf = false, samples = false };
        responseArray.Add(JObject.FromObject(statsConfig));

        // The config_set entry gets its own id; ConfigureBaseLayerLogs reads _messageId.
        _messageId++;
        string baseLayerConfig = ConfigureBaseLayerLogs(data);
        responseArray.Add(JObject.Parse(baseLayerConfig));

        string payload = responseArray.ToString();
        _logger.LogInformation("发送config_get响应: {Response}", payload);
        await Task.Run(() => observer.SendMessage(payload));
    }

    /// <summary>
    /// Handles <c>config_set</c>: sends a <c>log_get</c> request with headers included and
    /// records its id as the pending request id.
    /// </summary>
    /// <param name="observer">Client used to send the request.</param>
    private async Task HandleConfigSetMessageAsync(IObserverCustomWebSocketClient observer)
    {
        _messageId++;
        string configResponse = ConfigureLayerLogs(true);
        await Task.Run(() => observer.SendMessage(configResponse));
        _currentMessageId = _messageId.ToString();
        _logger.LogInformation("发送config_set响应: {Response}", configResponse);
    }

    /// <summary>
    /// Handles <c>log_get</c>: when the message id matches the pending request, sends the next
    /// <c>log_get</c> request (without headers). The callback is invoked with the message JSON
    /// whether or not the id matched.
    /// </summary>
    /// <param name="data">Parsed <c>log_get</c> message.</param>
    /// <param name="observer">Client used to send the request.</param>
    private async Task HandleLogGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer)
    {
        var receivedId = data["message_id"]?.ToString();
        if (string.Equals(_currentMessageId, receivedId, StringComparison.Ordinal))
        {
            _messageId++;
            string logResponse = ConfigureLayerLogs(false);
            await Task.Run(() => observer.SendMessage(logResponse));
            _currentMessageId = _messageId.ToString();
            _logger.LogInformation("发送log_get响应: {Response}", logResponse);
        }
        else
        {
            _logger.LogWarning("log_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}",
                receivedId, _currentMessageId);
        }

        await Task.Run(() => _messageCallback.Invoke(data.ToString()));
    }

    /// <summary>
    /// Handles <c>stats</c>: sends a new <c>stats</c> request with <c>rf</c> and
    /// <c>samples</c> disabled.
    /// </summary>
    /// <param name="observer">Client used to send the request.</param>
    private async Task HandleStatsMessageAsync(IObserverCustomWebSocketClient observer)
    {
        _messageId++;
        string statsResponse = JObject.FromObject(new
        {
            message = "stats",
            message_id = _messageId,
            rf = false,
            samples = false
        }).ToString();
        await Task.Run(() => observer.SendMessage(statsResponse));
        _logger.LogInformation("发送stats响应: {Response}", statsResponse);
    }

    /// <summary>
    /// Builds the <c>config_set</c> request. The passed object is modified in place: the
    /// <c>rotate</c>, <c>path</c> and <c>count</c> keys are removed and a fixed set of log
    /// switches is overwritten, then the object is embedded as the <c>logs</c> field.
    /// </summary>
    /// <param name="keyValues">Configuration object to adjust and embed.</param>
    /// <param name="isCloseSystemInfo">Value written to the bcch, cch, cell_meas and csi switches.</param>
    /// <returns>JSON text of the <c>config_set</c> request, using the current message id.</returns>
    private string ConfigureBaseLayerLogs(JObject keyValues, bool isCloseSystemInfo = false)
    {
        _logger.LogDebug("开始配置基础层日志: {KeyValues}", keyValues.ToString());

        // Remove every excluded key independently; chaining the calls with && would stop at the
        // first key that is absent and leave the remaining ones in place.
        var removedKeys = new List<string>();
        foreach (var key in ExcludedLogKeys)
        {
            if (keyValues.Remove(key))
            {
                removedKeys.Add(key);
            }
        }

        if (removedKeys.Count > 0)
        {
            _logger.LogDebug("已移除配置项: {RemovedKeys}", string.Join(", ", removedKeys));
        }

        // System-information related switches follow the caller's flag.
        keyValues["bcch"] = isCloseSystemInfo;
        keyValues["cch"] = isCloseSystemInfo;
        keyValues["cell_meas"] = isCloseSystemInfo;
        keyValues["csi"] = isCloseSystemInfo;

        // These switches are always turned off.
        keyValues["dci_size"] = false;
        keyValues["mib"] = false;
        keyValues["rep"] = false;
        keyValues["signal"] = false;

        var configMessage = new
        {
            message = "config_set",
            logs = keyValues,
            message_id = _messageId,
        };
        string response = JObject.FromObject(configMessage).ToString();
        _logger.LogInformation("基础层日志配置完成: {Response}", response);
        return response;
    }

    /// <summary>
    /// Builds a <c>log_get</c> request from the RAN layer configuration held by the network context.
    /// </summary>
    /// <param name="includeHeaders">
    /// Whether headers are requested; also selects the timeout (0 when true, 1 when false).
    /// </param>
    /// <returns>JSON text of the <c>log_get</c> request, using the current message id.</returns>
    private string ConfigureLayerLogs(bool includeHeaders = false)
    {
        _logger.LogDebug("开始配置层日志: IncludeHeaders={IncludeHeaders}", includeHeaders);
        var logConfig = new BaseNetworkLog
        {
            Timeout = includeHeaders ? 0 : 1,
            MinLogCount = 64,
            MaxLogCount = 2048,
            LayerConfig = _networkContext.NetworkLogs.RanLog,
            Message = "log_get",
            IncludeHeaders = includeHeaders,
            MessageId = _messageId,
        };
        string response = JObject.FromObject(logConfig).ToString();
        _logger.LogInformation("层日志配置完成: {Response}", response);
        return response;
    }

    /// <summary>
    /// Stops the background task and releases the queue and cancellation source.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Core dispose logic. Cancels processing, completes the queue and waits up to five seconds
    /// for the worker to stop. Safe to call more than once.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; managed resources are released only then.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        // Flag first so HandleMessage stops accepting messages while shutdown is in progress.
        _disposed = true;

        if (!disposing)
        {
            return;
        }

        _cancellationTokenSource.Cancel();
        _messageQueue.CompleteAdding();

        bool workerStopped;
        try
        {
            workerStopped = _processTask.Wait(TimeSpan.FromSeconds(5));
        }
        catch (AggregateException ex)
        {
            // Wait only throws once the task has finished (faulted or cancelled).
            _logger.LogError(ex, "等待消息队列处理完成时发生错误");
            workerStopped = true;
        }

        if (!workerStopped)
        {
            // The worker may still be using the queue and the token; disposing them now would
            // make it fail with ObjectDisposedException, so they are left to the GC instead.
            _logger.LogWarning("消息队列处理任务未在5秒内结束,跳过队列资源释放");
            return;
        }

        _messageQueue.Dispose();
        _cancellationTokenSource.Dispose();
    }
}
}