You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
223 lines
8.2 KiB
223 lines
8.2 KiB
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
using CoreAgent.ProtocolClient.BuildProtocolParser;
|
|
using CoreAgent.ProtocolClient.Context;
|
|
using CoreAgent.ProtocolClient.Managers.WebSocketMgr;
|
|
using CoreAgent.ProtocolClient.Models;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace CoreAgent.ProtocolClient.ProtocolEngineCore
|
|
{
|
|
/// <summary>
|
|
/// 协议日志处理器
|
|
/// 职责:
|
|
/// 1. 管理协议日志队列和消费任务
|
|
/// 2. 协调协议日志解析和数据转换
|
|
/// 3. 提供统一的协议日志处理接口
|
|
/// 4. 处理协议栈各层日志(RRC、NAS、SIP等)
|
|
/// </summary>
|
|
public class ProtocolLogProcessor
|
|
{
|
|
private readonly ProtocolClientConfig _config;
|
|
private readonly ProtocolClientContext _context;
|
|
private readonly ILogger _logger;
|
|
private readonly WebSocketMessageManager _messageManager;
|
|
private readonly BlockingCollection<SourceProtocolLog> _logQueue = new BlockingCollection<SourceProtocolLog>();
|
|
private readonly Task _logConsumerTask;
|
|
private readonly CancellationToken _cancellationToken;
|
|
private readonly LogDetailProcessor _logDetailProcessor;
|
|
private readonly ProtocolContextParser? _contextParser;
|
|
private readonly LogMessageHandler _messageHandler;
|
|
private readonly LogDataConverter _dataConverter;
|
|
private readonly IProtocolLogObserver _protocolLogObserver;
|
|
|
|
public ProtocolLogProcessor(ProtocolClientConfig config, ProtocolClientContext context, ILogger logger, WebSocketMessageManager messageManager, IProtocolLogObserver protocolLogObserver, CancellationToken cancellationToken = default)
|
|
{
|
|
_config = config;
|
|
_context = context;
|
|
_logger = logger;
|
|
_messageManager = messageManager;
|
|
_cancellationToken = cancellationToken;
|
|
_logDetailProcessor = new LogDetailProcessor(_context, logger);
|
|
_contextParser = new ProtocolContextParser(context);
|
|
_messageHandler = new LogMessageHandler(config, context, logger, messageManager);
|
|
_dataConverter = new LogDataConverter(context, logger);
|
|
_protocolLogObserver = protocolLogObserver ?? throw new ArgumentNullException(nameof(protocolLogObserver));
|
|
|
|
// 订阅日志接收事件
|
|
_messageHandler.OnLogReceived += HandleLogReceived;
|
|
|
|
// 启动日志消费任务
|
|
_logConsumerTask = Task.Run(() => ConsumeLogQueue(), _cancellationToken);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 处理从消息处理器接收到的日志
|
|
/// </summary>
|
|
/// <param name="log">接收到的日志</param>
|
|
private void HandleLogReceived(SourceProtocolLog log)
|
|
{
|
|
_logQueue.Add(log);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 持续消费日志队列,支持超时与取消。
|
|
/// </summary>
|
|
private void ConsumeLogQueue()
|
|
{
|
|
while (!_logQueue.IsCompleted && !_cancellationToken.IsCancellationRequested)
|
|
{
|
|
SourceProtocolLog? log;
|
|
if (_logQueue.TryTake(out log, TimeSpan.FromSeconds(1)))
|
|
{
|
|
ProcessLog(log);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 处理单个日志
|
|
/// </summary>
|
|
/// <param name="log">协议日志JSON</param>
|
|
private void ProcessLog(SourceProtocolLog log)
|
|
{
|
|
if (log.Logs == null)
|
|
return;
|
|
|
|
// 1. 统一转换日志明细
|
|
var details = _logDetailProcessor.ConvertLogsToDetails(log.Logs);
|
|
|
|
// 2. 处理 Headers
|
|
if (log.Headers != null && log.Headers.Length > 0)
|
|
{
|
|
_context.CellParameterManager.SetHeaders(log.Headers, _context.BasicInfo);
|
|
}
|
|
|
|
// 3. 日志明细转换与解析
|
|
var logList = _logDetailProcessor.ConvertDetailsToProtocolLogs(details);
|
|
var parsedLogList = ParseLogList(logList);
|
|
|
|
// 4. 转换为TransferProtocolLog并处理
|
|
var logDetails = _dataConverter.ConvertToProtocolLogDetails(parsedLogList);
|
|
ProcessLogDetails(logDetails);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 解析日志列表
|
|
/// </summary>
|
|
/// <param name="logList">日志列表</param>
|
|
/// <returns>解析后的日志列表</returns>
|
|
private List<BuildProtocolLog> ParseLogList(IEnumerable<BuildProtocolLog> logList)
|
|
{
|
|
if (_contextParser == null) return logList.ToList();
|
|
|
|
var logs = logList as List<BuildProtocolLog> ?? logList.ToList();
|
|
var parsedLogs = new List<BuildProtocolLog>();
|
|
|
|
for (int i = 0; i < logs.Count; i++)
|
|
{
|
|
try
|
|
{
|
|
var log = logs[i];
|
|
log.Message = log.Message + "";
|
|
var parser = _contextParser.GetParserByKey(log.Layer);
|
|
parser.GeneralParse(ref log);
|
|
parsedLogs.Add(log);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, $"日志解析异常,Layer: {logs[i].Layer}, Message: {logs[i].Message}");
|
|
// 解析失败时,添加原始日志
|
|
parsedLogs.Add(logs[i]);
|
|
}
|
|
}
|
|
|
|
return parsedLogs;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 处理转换后的日志详情
|
|
/// </summary>
|
|
/// <param name="logDetails">日志详情列表</param>
|
|
private void ProcessLogDetails(List<TransferProtocolLog> logDetails)
|
|
{
|
|
foreach (var detail in logDetails)
|
|
{
|
|
try
|
|
{
|
|
Console.WriteLine($"处理日志详情:Time={detail.Time} Layer={detail.LayerType}, UEID={detail.UEID}, IMSI={detail.IMSI},PLMN={detail.PLMN},INFO={detail.Info},Messsage={detail.Message}");
|
|
// 这里可以添加具体的业务处理逻辑
|
|
// 例如:保存到数据库、发送到其他服务等
|
|
_logger.LogDebug($"处理日志详情: Layer={detail.LayerType}, UEID={detail.UEID}, IMSI={detail.IMSI}");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, $"处理日志详情失败: Layer={detail.LayerType}, UEID={detail.UEID}");
|
|
}
|
|
}
|
|
|
|
// 通知协议日志观察者处理转换后的数据
|
|
try
|
|
{
|
|
_protocolLogObserver.OnProtocolLogsReceived(logDetails);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "通知协议日志观察者失败");
|
|
}
|
|
}
|
|
|
|
#region 公共接口 - 委托给消息处理器
|
|
|
|
/// <summary>
|
|
/// 处理接收到的WebSocket消息
|
|
/// </summary>
|
|
/// <param name="message">接收到的消息</param>
|
|
public void HandleMessage(Newtonsoft.Json.Linq.JObject message)
|
|
{
|
|
_messageHandler.HandleMessage(message);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 发起日志获取请求
|
|
/// </summary>
|
|
/// <param name="parameters">请求参数</param>
|
|
public void GetLogs(Dictionary<string, object>? parameters = null)
|
|
{
|
|
_messageHandler.GetLogs(parameters);
|
|
}
|
|
|
|
/// <summary>
|
|
/// 重置日志
|
|
/// </summary>
|
|
public void ResetLogs()
|
|
{
|
|
_messageHandler.ResetLogs();
|
|
}
|
|
|
|
/// <summary>
|
|
/// 设置日志配置
|
|
/// </summary>
|
|
/// <param name="logsConfig">日志配置</param>
|
|
/// <param name="save">是否保存配置</param>
|
|
public void SetLogsConfig(ProtocolClientLogsConfig logsConfig, bool save = false)
|
|
{
|
|
_messageHandler.SetLogsConfig(logsConfig, save);
|
|
}
|
|
|
|
#endregion
|
|
|
|
/// <summary>
|
|
/// 停止日志消费任务,安全关闭队列。
|
|
/// </summary>
|
|
public void Stop()
|
|
{
|
|
_logQueue.CompleteAdding();
|
|
_messageHandler.OnLogReceived -= HandleLogReceived;
|
|
}
|
|
}
|
|
}
|
|
|