using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using CoreAgent.ProtocolClient.BuildProtocolParser;
using CoreAgent.ProtocolClient.Context;
using CoreAgent.ProtocolClient.Managers.WebSocketMgr;
using CoreAgent.ProtocolClient.Models;
using Microsoft.Extensions.Logging;
namespace CoreAgent.ProtocolClient.ProtocolEngineCore
{
///
/// 协议日志处理器
/// 职责:
/// 1. 管理协议日志队列和消费任务
/// 2. 协调协议日志解析和数据转换
/// 3. 提供统一的协议日志处理接口
/// 4. 处理协议栈各层日志(RRC、NAS、SIP等)
///
public class ProtocolLogProcessor
{
private readonly ProtocolClientConfig _config;
private readonly ProtocolClientContext _context;
private readonly ILogger _logger;
private readonly WebSocketMessageManager _messageManager;
private readonly BlockingCollection _logQueue = new BlockingCollection();
private readonly Task _logConsumerTask;
private readonly CancellationToken _cancellationToken;
private readonly LogDetailProcessor _logDetailProcessor;
private readonly ProtocolContextParser? _contextParser;
private readonly LogMessageHandler _messageHandler;
private readonly LogDataConverter _dataConverter;
private readonly IProtocolLogObserver _protocolLogObserver;
public ProtocolLogProcessor(ProtocolClientConfig config, ProtocolClientContext context, ILogger logger, WebSocketMessageManager messageManager, IProtocolLogObserver protocolLogObserver, CancellationToken cancellationToken = default)
{
_config = config;
_context = context;
_logger = logger;
_messageManager = messageManager;
_cancellationToken = cancellationToken;
_logDetailProcessor = new LogDetailProcessor(_context, logger);
_contextParser = new ProtocolContextParser(context);
_messageHandler = new LogMessageHandler(config, context, logger, messageManager);
_dataConverter = new LogDataConverter(context, logger);
_protocolLogObserver = protocolLogObserver ?? throw new ArgumentNullException(nameof(protocolLogObserver));
// 订阅日志接收事件
_messageHandler.OnLogReceived += HandleLogReceived;
// 启动日志消费任务
_logConsumerTask = Task.Run(() => ConsumeLogQueue(), _cancellationToken);
}
///
/// 处理从消息处理器接收到的日志
///
/// 接收到的日志
private void HandleLogReceived(SourceProtocolLog log)
{
_logQueue.Add(log);
}
///
/// 持续消费日志队列,支持超时与取消。
///
private void ConsumeLogQueue()
{
while (!_logQueue.IsCompleted && !_cancellationToken.IsCancellationRequested)
{
SourceProtocolLog? log;
if (_logQueue.TryTake(out log, TimeSpan.FromSeconds(1)))
{
ProcessLog(log);
}
}
}
///
/// 处理单个日志
///
/// 协议日志JSON
private void ProcessLog(SourceProtocolLog log)
{
if (log.Logs == null)
return;
// 1. 统一转换日志明细
var details = _logDetailProcessor.ConvertLogsToDetails(log.Logs);
// 2. 处理 Headers
if (log.Headers != null && log.Headers.Length > 0)
{
_context.CellParameterManager.SetHeaders(log.Headers, _context.BasicInfo);
}
// 3. 日志明细转换与解析
var logList = _logDetailProcessor.ConvertDetailsToProtocolLogs(details);
var parsedLogList = ParseLogList(logList);
// 4. 转换为TransferProtocolLog并处理
var logDetails = _dataConverter.ConvertToProtocolLogDetails(parsedLogList);
ProcessLogDetails(logDetails);
}
///
/// 解析日志列表
///
/// 日志列表
/// 解析后的日志列表
private List ParseLogList(IEnumerable logList)
{
if (_contextParser == null) return logList.ToList();
var logs = logList as List ?? logList.ToList();
var parsedLogs = new List();
for (int i = 0; i < logs.Count; i++)
{
try
{
var log = logs[i];
log.Message = log.Message + "";
var parser = _contextParser.GetParserByKey(log.Layer);
parser.GeneralParse(ref log);
parsedLogs.Add(log);
}
catch (Exception ex)
{
_logger.LogError(ex, $"日志解析异常,Layer: {logs[i].Layer}, Message: {logs[i].Message}");
// 解析失败时,添加原始日志
parsedLogs.Add(logs[i]);
}
}
return parsedLogs;
}
///
/// 处理转换后的日志详情
///
/// 日志详情列表
private void ProcessLogDetails(List logDetails)
{
foreach (var detail in logDetails)
{
try
{
Console.WriteLine($"处理日志详情:Time={detail.Time} Layer={detail.LayerType}, UEID={detail.UEID}, IMSI={detail.IMSI},PLMN={detail.PLMN},INFO={detail.Info},Messsage={detail.Message}");
// 这里可以添加具体的业务处理逻辑
// 例如:保存到数据库、发送到其他服务等
_logger.LogDebug($"处理日志详情: Layer={detail.LayerType}, UEID={detail.UEID}, IMSI={detail.IMSI}");
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理日志详情失败: Layer={detail.LayerType}, UEID={detail.UEID}");
}
}
// 通知协议日志观察者处理转换后的数据
try
{
// 过滤掉Info为空或无效的记录
var filteredLogDetails = logDetails.Where(detail =>
!string.IsNullOrWhiteSpace(detail.Info)
).ToList();
if (filteredLogDetails.Any())
{
_protocolLogObserver.OnProtocolLogsReceived(filteredLogDetails);
}
else
{
_logger.LogDebug("过滤后没有有效的日志记录,跳过观察者通知");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "通知协议日志观察者失败");
}
}
#region 公共接口 - 委托给消息处理器
///
/// 处理接收到的WebSocket消息
///
/// 接收到的消息
public void HandleMessage(Newtonsoft.Json.Linq.JObject message)
{
_messageHandler.HandleMessage(message);
}
///
/// 发起日志获取请求
///
/// 请求参数
public void GetLogs(Dictionary? parameters = null)
{
_messageHandler.GetLogs(parameters);
}
///
/// 重置日志
///
public void ResetLogs()
{
_messageHandler.ResetLogs();
}
///
/// 设置日志配置
///
/// 日志配置
/// 是否保存配置
public void SetLogsConfig(ProtocolClientLogsConfig logsConfig, bool save = false)
{
_messageHandler.SetLogsConfig(logsConfig, save);
}
#endregion
///
/// 停止日志消费任务,安全关闭队列。
///
public void Stop()
{
_logQueue.CompleteAdding();
_messageHandler.OnLogReceived -= HandleLogReceived;
}
}
}