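// [Web UI residue, not source code] You can not select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
//
// [Web UI residue, not source code] 235 lines
// [Web UI residue, not source code] 8.7 KiB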

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using CoreAgent.ProtocolClient.BuildProtocolParser;
using CoreAgent.ProtocolClient.Context;
using CoreAgent.ProtocolClient.Managers.WebSocketMgr;
using CoreAgent.ProtocolClient.Models;
using Microsoft.Extensions.Logging;
namespace CoreAgent.ProtocolClient.ProtocolEngineCore
{
/// <summary>
/// 协议日志处理器
/// 职责:
/// 1. 管理协议日志队列和消费任务
/// 2. 协调协议日志解析和数据转换
/// 3. 提供统一的协议日志处理接口
/// 4. 处理协议栈各层日志(RRC、NAS、SIP等)
/// </summary>
public class ProtocolLogProcessor
{
// Client configuration; handed to the message handler and the data converter in the constructor.
private readonly ProtocolClientConfig _config;
// Shared client context; passed to the helpers, and its CellParameterManager receives log headers in ProcessLog.
private readonly ProtocolClientContext _context;
// Logger used by this class and forwarded to the helpers it creates.
private readonly ILogger _logger;
// WebSocket message manager; forwarded to the LogMessageHandler in the constructor.
private readonly WebSocketMessageManager _messageManager;
// Hand-off queue: HandleLogReceived adds items, ConsumeLogQueue takes them.
private readonly BlockingCollection<SourceProtocolLog> _logQueue = new BlockingCollection<SourceProtocolLog>();
// Background task that runs ConsumeLogQueue; started in the constructor.
private readonly Task _logConsumerTask;
// Token checked by the consumer loop to decide when to stop.
private readonly CancellationToken _cancellationToken;
// Converts raw log entries to details and details to BuildProtocolLog items.
private readonly LogDetailProcessor _logDetailProcessor;
// Supplies a parser per layer (GetParserByKey). Declared nullable, although the constructor always assigns it.
private readonly ProtocolContextParser? _contextParser;
// Receives messages and raises OnLogReceived; the public request methods delegate to it.
private readonly LogMessageHandler _messageHandler;
// Converts parsed BuildProtocolLog items to TransferProtocolLog items.
private readonly LogDataConverter _dataConverter;
// Observer notified with the converted logs whose Info is not blank.
private readonly IProtocolLogObserver _protocolLogObserver;
/// <summary>
/// Creates the processor, builds its helper components, subscribes to the message handler's
/// log event and starts the background queue consumer.
/// </summary>
/// <param name="config">Client configuration forwarded to the message handler and data converter.</param>
/// <param name="context">Shared client context forwarded to the helper components.</param>
/// <param name="logger">Logger used by this class and its helpers.</param>
/// <param name="messageManager">WebSocket message manager forwarded to the message handler.</param>
/// <param name="protocolLogObserver">Observer that receives the converted logs; must not be null.</param>
/// <param name="cancellationToken">Token that stops the background consumer loop.</param>
/// <exception cref="ArgumentNullException"><paramref name="protocolLogObserver"/> is null.</exception>
public ProtocolLogProcessor(ProtocolClientConfig config, ProtocolClientContext context, ILogger logger, WebSocketMessageManager messageManager, IProtocolLogObserver protocolLogObserver, CancellationToken cancellationToken = default)
{
    // Fail fast: validate before any helper is constructed or any event is subscribed.
    _protocolLogObserver = protocolLogObserver ?? throw new ArgumentNullException(nameof(protocolLogObserver));

    _config = config;
    _context = context;
    _logger = logger;
    _messageManager = messageManager;
    _cancellationToken = cancellationToken;

    _logDetailProcessor = new LogDetailProcessor(_context, logger);
    _contextParser = new ProtocolContextParser(context);
    _messageHandler = new LogMessageHandler(config, context, logger, messageManager);
    _dataConverter = new LogDataConverter(context, config, logger);

    // Subscribe to the log-received event.
    _messageHandler.OnLogReceived += HandleLogReceived;

    // The consumer is a blocking loop that lives as long as the processor, so it is started
    // with the LongRunning hint instead of occupying a regular thread-pool worker.
    _logConsumerTask = Task.Factory.StartNew(
        ConsumeLogQueue,
        _cancellationToken,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
}
/// <summary>
/// Event handler for the message handler's log event: enqueues the log for background processing.
/// </summary>
/// <remarks>
/// A null log, or a log that arrives after the queue has been closed by <c>Stop</c>, is dropped
/// instead of throwing back into the event publisher.
/// </remarks>
/// <param name="log">The received log batch.</param>
private void HandleLogReceived(SourceProtocolLog log)
{
    if (log is null || _logQueue.IsAddingCompleted)
    {
        return;
    }

    try
    {
        _logQueue.Add(log);
    }
    catch (InvalidOperationException)
    {
        // The queue was marked complete between the check above and Add (Stop() ran concurrently).
        _logger.LogDebug("日志队列已关闭,丢弃收到的日志");
    }
}
/// <summary>
/// Consumer loop: takes logs from the queue and processes them until the queue is completed
/// or cancellation is requested.
/// </summary>
/// <remarks>
/// A failure while processing one log is logged and the loop continues; otherwise a single bad
/// log would end the consumer task while producers keep filling the queue.
/// </remarks>
private void ConsumeLogQueue()
{
    // Upper bound for one wait, so the completion check in the loop condition is re-evaluated regularly.
    const int pollIntervalMilliseconds = 1000;

    while (!_logQueue.IsCompleted && !_cancellationToken.IsCancellationRequested)
    {
        SourceProtocolLog? log;
        try
        {
            // Passing the token lets a cancellation interrupt the wait immediately.
            if (!_logQueue.TryTake(out log, pollIntervalMilliseconds, _cancellationToken))
            {
                continue;
            }
        }
        catch (OperationCanceledException)
        {
            break;
        }

        if (log is null)
        {
            continue;
        }

        try
        {
            ProcessLog(log);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理协议日志失败,已跳过该条日志");
        }
    }
}
/// <summary>
/// Runs one received log batch through the pipeline: raw entries, details, protocol logs,
/// parsed logs, transfer logs.
/// </summary>
/// <param name="log">Batch taken from the queue; nothing happens when it carries no log entries.</param>
private void ProcessLog(SourceProtocolLog log)
{
    if (log.Logs == null)
    {
        return;
    }

    // Step 1: raw entries -> details.
    var detailItems = _logDetailProcessor.ConvertLogsToDetails(log.Logs);

    // Step 2: headers, when present, are stored between the two conversion steps.
    var hasHeaders = log.Headers != null && log.Headers.Length > 0;
    if (hasHeaders)
    {
        _context.CellParameterManager.SetHeaders(log.Headers, _context.BasicInfo);
    }

    // Step 3: details -> protocol logs -> parsed protocol logs.
    var protocolLogs = _logDetailProcessor.ConvertDetailsToProtocolLogs(detailItems);
    var parsedProtocolLogs = ParseLogList(protocolLogs);

    // Step 4: parsed logs -> transfer logs, then hand them over for processing.
    var transferLogs = _dataConverter.ConvertToProtocolLogDetails(parsedProtocolLogs);
    ProcessLogDetails(transferLogs);
}
/// <summary>
/// Runs every log through the parser registered for its layer.
/// </summary>
/// <param name="logList">Logs to parse.</param>
/// <returns>
/// One entry per input log, in input order. When parsing an entry throws, the error is logged
/// and the source entry is returned in its place.
/// </returns>
private List<BuildProtocolLog> ParseLogList(IEnumerable<BuildProtocolLog> logList)
{
    if (_contextParser == null)
    {
        return logList.ToList();
    }

    // Reuse the list when the caller already passed one; otherwise materialize once.
    var source = logList as List<BuildProtocolLog>;
    if (source == null)
    {
        source = logList.ToList();
    }

    var result = new List<BuildProtocolLog>();
    foreach (var entry in source)
    {
        // A foreach variable cannot be passed by ref, so parsing works on a local.
        var current = entry;
        try
        {
            // Concatenating with "" turns a null message into an empty string.
            current.Message = current.Message + "";
            _contextParser.GetParserByKey(current.Layer).GeneralParse(ref current);
            result.Add(current);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"日志解析异常,Layer: {entry.Layer}, Message: {entry.Message}");
            // Parsing failed: keep the source entry.
            result.Add(entry);
        }
    }

    return result;
}
/// <summary>
/// Traces the converted logs at debug level and notifies the protocol log observer with the
/// entries whose <c>Info</c> is not blank.
/// </summary>
/// <remarks>
/// Exceptions raised while tracing an entry or while notifying the observer are logged and not
/// rethrown. The per-entry details go through the logger only (no direct console output), so
/// they follow the configured log level and sinks.
/// </remarks>
/// <param name="logDetails">Converted logs; a null list is ignored.</param>
private void ProcessLogDetails(List<TransferProtocolLog> logDetails)
{
    if (logDetails is null)
    {
        return;
    }

    // Skip the per-entry loop entirely when debug logging is off.
    if (_logger.IsEnabled(LogLevel.Debug))
    {
        foreach (var detail in logDetails)
        {
            try
            {
                // Extension point: per-entry business handling (persisting, forwarding, ...) can be added here.
                _logger.LogDebug(
                    "处理日志详情: Time={Time}, Layer={Layer}, UEID={UEID}, IMSI={IMSI}, PLMN={PLMN}, Info={Info}, Message={Message}",
                    detail.Time, detail.LayerType, detail.UEID, detail.IMSI, detail.PLMN, detail.Info, detail.Message);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "处理日志详情失败: Layer={Layer}, UEID={UEID}", detail.LayerType, detail.UEID);
            }
        }
    }

    // Notify the observer with the converted data.
    try
    {
        // Entries with a null, empty or whitespace-only Info are not forwarded.
        var filteredLogDetails = logDetails
            .Where(detail => !string.IsNullOrWhiteSpace(detail.Info))
            .ToList();

        if (filteredLogDetails.Count > 0)
        {
            _protocolLogObserver.OnProtocolLogsReceived(filteredLogDetails);
        }
        else
        {
            _logger.LogDebug("过滤后没有有效的日志记录,跳过观察者通知");
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "通知协议日志观察者失败");
    }
}
#region 公共接口 - 委托给消息处理器
/// <summary>
/// Forwards a received WebSocket message to the internal log message handler.
/// </summary>
/// <param name="message">The message to forward.</param>
public void HandleMessage(Newtonsoft.Json.Linq.JObject message)
    => _messageHandler.HandleMessage(message);
/// <summary>
/// Issues a log retrieval request through the internal log message handler.
/// </summary>
/// <param name="parameters">Optional request parameters; passed through unchanged.</param>
public void GetLogs(Dictionary<string, object>? parameters = null)
    => _messageHandler.GetLogs(parameters);
/// <summary>
/// Asks the internal log message handler to reset the logs.
/// </summary>
public void ResetLogs()
    => _messageHandler.ResetLogs();
/// <summary>
/// Passes a log configuration to the internal log message handler.
/// </summary>
/// <param name="logsConfig">The log configuration to apply.</param>
/// <param name="save">Whether the configuration should be saved; passed through unchanged.</param>
public void SetLogsConfig(ProtocolClientLogsConfig logsConfig, bool save = false)
    => _messageHandler.SetLogsConfig(logsConfig, save);
#endregion
/// <summary>
/// Stops accepting logs: detaches from the message handler's log event and marks the queue
/// as complete for adding.
/// </summary>
/// <remarks>
/// This method does not wait for the consumer task; the consumer loop ends on its own once
/// the queue is completed and empty, or when the cancellation token is signalled.
/// </remarks>
public void Stop()
{
    // Detach first so that no new log is routed to a queue that is about to be closed;
    // BlockingCollection.Add throws once CompleteAdding has been called.
    _messageHandler.OnLogReceived -= HandleLogReceived;
    _logQueue.CompleteAdding();
}
}
}