You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
369 lines
15 KiB
369 lines
15 KiB
2 days ago
|
using CoreAgent.Domain.Interfaces.CustomWSClient;
|
||
|
using CoreAgent.Domain.Interfaces.Network;
|
||
|
using CoreAgent.Domain.Models.Protocol;
|
||
|
using Microsoft.Extensions.Logging;
|
||
|
using Newtonsoft.Json.Linq;
|
||
|
using System;
|
||
|
using System.Collections.Concurrent;
|
||
|
using System.Collections.Generic;
|
||
|
using System.Linq;
|
||
|
using System.Text;
|
||
|
using System.Threading;
|
||
|
using System.Threading.Tasks;
|
||
|
|
||
|
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers
|
||
|
{
|
||
|
/// <summary>
/// WebSocket message handler for the RAN protocol.
/// Handles radio access network (RAN) WebSocket messages, including fetching
/// the configuration and fetching logs.
/// </summary>
|
||
|
public class RanLogMessageHandler : ICustomMessageHandler, IDisposable
|
||
|
{
|
||
|
// Collaborators supplied through the constructor.
private readonly ILogger _logger;
private readonly ICellularNetworkContext _networkContext;
private readonly Action<string> _messageCallback;

// Counter incremented before every outgoing request; its value is sent as message_id.
private int _messageId;

// String form of the request id whose reply config_get / log_get handling expects next.
private string _currentMessageId = string.Empty;

// Incoming messages wait here until the background task picks them up.
private readonly BlockingCollection<(string MessageData, IObserverCustomWebSocketClient Observer)> _messageQueue;

// Cancelled in Dispose to stop the consuming loop.
private readonly CancellationTokenSource _cancellationTokenSource;

// Background task running ProcessMessageQueue.
private readonly Task _processTask;

// Set once Dispose(bool) has completed.
private bool _disposed;
|
||
|
|
||
|
/// <summary>
/// Creates the handler, stores its collaborators and starts the background
/// task that drains the message queue.
/// </summary>
/// <param name="logger">Logger used for diagnostics.</param>
/// <param name="context">Cellular network context; its RAN log settings are read when log requests are built.</param>
/// <param name="action">Callback invoked with the JSON text of every <c>log_get</c> message.</param>
/// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
public RanLogMessageHandler(ILogger logger, ICellularNetworkContext context, Action<string> action)
{
    // Arguments are validated in the order logger, action, context.
    if (logger is null)
    {
        throw new ArgumentNullException(nameof(logger));
    }

    if (action is null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    if (context is null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    _logger = logger;
    _messageCallback = action;
    _networkContext = context;

    _messageQueue = new BlockingCollection<(string MessageData, IObserverCustomWebSocketClient Observer)>();
    _cancellationTokenSource = new CancellationTokenSource();

    // Start consuming immediately; the task is awaited (with a timeout) in Dispose.
    _processTask = Task.Run(() => ProcessMessageQueue());

    _logger.LogInformation("RAN协议消息处理器初始化完成,消息队列已启动");
}
|
||
|
|
||
|
/// <summary>
/// Accepts a received WebSocket message and places it on the processing queue.
/// The message is handled later by the background queue consumer.
/// </summary>
/// <param name="messageData">Raw text of the received message.</param>
/// <param name="observer">WebSocket client observer used to send replies.</param>
public void HandleMessage(string messageData, IObserverCustomWebSocketClient observer)
{
    try
    {
        _logger.LogDebug("将消息加入处理队列: {MessageData}", messageData);

        var queueItem = (messageData, observer);
        _messageQueue.Add(queueItem);
    }
    catch (Exception enqueueError)
    {
        // Enqueue failures are logged and never propagated to the caller.
        _logger.LogError(enqueueError, "将消息加入队列时发生错误: {MessageData}", messageData);
    }
}
|
||
|
|
||
|
/// <summary>
/// Consumer loop for the message queue.
/// Takes messages one at a time and processes them until the queue is
/// completed or the cancellation token is signalled.
/// </summary>
private async Task ProcessMessageQueue()
{
    try
    {
        _logger.LogInformation("开始处理消息队列");

        var pendingMessages = _messageQueue.GetConsumingEnumerable(_cancellationTokenSource.Token);
        foreach (var queued in pendingMessages)
        {
            try
            {
                await ProcessMessageAsync(queued.MessageData, queued.Observer);
            }
            catch (Exception messageError)
            {
                // A failure on one message must not stop the loop.
                _logger.LogError(messageError, "处理队列中的消息时发生错误: {MessageData}", queued.MessageData);
            }
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation("消息队列处理已取消");
    }
    catch (Exception queueError)
    {
        _logger.LogError(queueError, "消息队列处理过程中发生错误");
    }
}
|
||
|
|
||
|
/// <summary>
/// Processes a single message: parses it as a JSON object, reads its
/// <c>message</c> property and dispatches to the handler for that type.
/// </summary>
/// <remarks>
/// All exceptions (including JSON parse failures) are logged and swallowed so
/// that one bad message cannot break the caller. A message that has no
/// <c>message</c> property is reported with a warning and ignored.
/// </remarks>
/// <param name="messageData">Raw JSON text of the message.</param>
/// <param name="observer">WebSocket client observer used to send replies.</param>
private async Task ProcessMessageAsync(string messageData, IObserverCustomWebSocketClient observer)
{
    try
    {
        _logger.LogDebug("开始处理RAN协议消息: {MessageData}", messageData);

        var jsonData = JObject.Parse(messageData);

        // The JObject indexer yields null when the property is absent, so the
        // token must be null-checked before it is converted to a string.
        var messageType = jsonData["message"]?.ToString();
        if (messageType is null)
        {
            _logger.LogWarning("RAN协议消息缺少message字段,已忽略: {MessageData}", messageData);
            return;
        }

        _logger.LogInformation("收到RAN协议消息类型: {MessageType}", messageType);
        await ProcessMessageByTypeAsync(messageType, jsonData, observer);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "处理RAN协议消息时发生错误: {MessageData}", messageData);
    }
}
|
||
|
|
||
|
/// <summary>
/// Routes a parsed message to the handler that matches its type.
/// Unknown types are logged with a warning and otherwise ignored.
/// </summary>
/// <param name="messageType">Value of the message's <c>message</c> property.</param>
/// <param name="data">The parsed message.</param>
/// <param name="observer">WebSocket client observer used to send replies.</param>
private async Task ProcessMessageByTypeAsync(string messageType, JObject data, IObserverCustomWebSocketClient observer)
{
    _logger.LogDebug("开始处理RAN协议消息类型: {MessageType}", messageType);

    // Fallback used for any type without a dedicated handler.
    Task ReportUnknownType()
    {
        _logger.LogWarning("收到未知的RAN协议消息类型: {MessageType}", messageType);
        return Task.CompletedTask;
    }

    Task handling = messageType switch
    {
        "ready" => HandleReadyMessageAsync(observer),
        "config_get" => HandleConfigGetMessageAsync(data, observer),
        "config_set" => HandleConfigSetMessageAsync(observer),
        "log_get" => HandleLogGetMessageAsync(data, observer),
        "stats" => HandleStatsMessageAsync(observer),
        _ => ReportUnknownType(),
    };

    await handling;
}
|
||
|
|
||
|
/// <summary>
/// Handles a <c>ready</c> message by sending a <c>config_get</c> request and
/// recording its id as the one whose reply is expected next.
/// </summary>
/// <param name="observer">WebSocket client observer used to send the request.</param>
private async Task HandleReadyMessageAsync(IObserverCustomWebSocketClient observer)
{
    int requestId = ++_messageId;

    var request = new { message = "config_get", message_id = requestId };
    string requestJson = JObject.FromObject(request).ToString();

    _logger.LogInformation("发送ready响应: {Response}", requestJson);
    await Task.Run(() => observer.SendMessage(requestJson));

    // The expected id is recorded only after the send has completed.
    _currentMessageId = requestId.ToString();
}
|
||
|
|
||
|
/// <summary>
/// Handles a <c>config_get</c> message. When its id matches the expected one,
/// a JSON array holding a <c>stats</c> request and a base-layer
/// <c>config_set</c> request is sent; otherwise a warning is logged.
/// </summary>
/// <param name="data">The parsed <c>config_get</c> message; it is modified while the base-layer configuration is built.</param>
/// <param name="observer">WebSocket client observer used to send the requests.</param>
private async Task HandleConfigGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer)
{
    string receivedId = data["message_id"]!.ToString();

    if (_currentMessageId != receivedId)
    {
        _logger.LogWarning("config_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}",
            receivedId, _currentMessageId);
        return;
    }

    _logger.LogInformation("处理config_get请求");

    // Each of the two requests gets its own, freshly incremented id.
    _messageId++;
    var statsRequest = JObject.FromObject(
        new { message = "stats", message_id = _messageId, rf = false, samples = false });

    _messageId++;
    var baseLayerRequest = JObject.Parse(ConfigureBaseLayerLogs(data));

    var requests = new JArray { statsRequest, baseLayerRequest };
    string payload = requests.ToString();

    _logger.LogInformation("发送config_get响应: {Response}", payload);
    await Task.Run(() => observer.SendMessage(payload));
}
|
||
|
|
||
|
/// <summary>
/// Handles a <c>config_set</c> message by sending a layer-log request built
/// with headers included, then recording its id as the expected reply id.
/// </summary>
/// <param name="observer">WebSocket client observer used to send the request.</param>
private async Task HandleConfigSetMessageAsync(IObserverCustomWebSocketClient observer)
{
    _messageId += 1;

    string layerLogRequest = ConfigureLayerLogs(includeHeaders: true);
    await Task.Run(() => observer.SendMessage(layerLogRequest));

    _currentMessageId = _messageId.ToString();
    _logger.LogInformation("发送config_set响应: {Response}", layerLogRequest);
}
|
||
|
|
||
|
/// <summary>
/// Handles a <c>log_get</c> message. When its id matches the expected one, the
/// next layer-log request (without headers) is sent and becomes the expected
/// id; otherwise a warning is logged. In both cases the message is then passed
/// to the message callback.
/// </summary>
/// <param name="data">The parsed <c>log_get</c> message.</param>
/// <param name="observer">WebSocket client observer used to send the next request.</param>
private async Task HandleLogGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer)
{
    string receivedId = data["message_id"]!.ToString();
    bool isExpectedReply = string.Equals(_currentMessageId, receivedId);

    if (!isExpectedReply)
    {
        _logger.LogWarning("log_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}",
            receivedId, _currentMessageId);
    }
    else
    {
        _messageId += 1;

        string nextRequest = ConfigureLayerLogs(includeHeaders: false);
        await Task.Run(() => observer.SendMessage(nextRequest));

        _currentMessageId = _messageId.ToString();
        _logger.LogInformation("发送log_get响应: {Response}", nextRequest);
    }

    // The callback receives the message whether or not its id matched.
    await Task.Run(() => _messageCallback(data.ToString()));
}
|
||
|
|
||
|
/// <summary>
/// Handles a <c>stats</c> message by sending a new <c>stats</c> request with
/// <c>rf</c> and <c>samples</c> both set to <c>false</c>.
/// </summary>
/// <param name="observer">WebSocket client observer used to send the request.</param>
private async Task HandleStatsMessageAsync(IObserverCustomWebSocketClient observer)
{
    int requestId = ++_messageId;

    var statsRequest = new
    {
        message = "stats",
        message_id = requestId,
        rf = false,
        samples = false
    };
    string requestJson = JObject.FromObject(statsRequest).ToString();

    await Task.Run(() => observer.SendMessage(requestJson));
    _logger.LogInformation("发送stats响应: {Response}", requestJson);
}
|
||
|
|
||
|
/// <summary>
/// Builds the base-layer <c>config_set</c> request from the supplied object.
/// Removes the <c>rotate</c>, <c>path</c> and <c>count</c> entries, sets the
/// log switches and wraps the result together with the current message id.
/// </summary>
/// <remarks>
/// <paramref name="keyValues"/> is modified in place and embedded as the
/// <c>logs</c> property of the returned message.
/// </remarks>
/// <param name="keyValues">Object whose entries are adjusted and sent as <c>logs</c>.</param>
/// <param name="isCloseSystemInfo">Value written to the <c>bcch</c>, <c>cch</c>, <c>cell_meas</c> and <c>csi</c> entries.</param>
/// <returns>The serialized <c>config_set</c> request.</returns>
private string ConfigureBaseLayerLogs(JObject keyValues, bool isCloseSystemInfo = false)
{
    _logger.LogDebug("开始配置基础层日志: {KeyValues}", keyValues.ToString());

    // Remove each unwanted entry independently. Chaining the calls with &&
    // would short-circuit: a missing "rotate" would leave "path" and "count"
    // in the outgoing configuration.
    bool rotateRemoved = keyValues.Remove("rotate");
    bool pathRemoved = keyValues.Remove("path");
    bool countRemoved = keyValues.Remove("count");

    if (rotateRemoved && pathRemoved && countRemoved)
    {
        _logger.LogDebug("已移除rotate、path和count配置项");
    }

    // System-information related switches follow the caller's flag.
    keyValues["bcch"] = isCloseSystemInfo;
    keyValues["cch"] = isCloseSystemInfo;
    keyValues["cell_meas"] = isCloseSystemInfo;
    keyValues["csi"] = isCloseSystemInfo;

    // The remaining switches are always turned off.
    keyValues["dci_size"] = false;
    keyValues["mib"] = false;
    keyValues["rep"] = false;
    keyValues["signal"] = false;

    var configMessage = new
    {
        message = "config_set",
        logs = keyValues,
        message_id = _messageId,
    };

    string response = JObject.FromObject(configMessage).ToString();
    _logger.LogInformation("基础层日志配置完成: {Response}", response);
    return response;
}
|
||
|
|
||
|
/// <summary>
/// Builds a <c>log_get</c> request carrying the RAN layer configuration from
/// the network context, fixed log-count limits and the current message id.
/// </summary>
/// <param name="includeHeaders">Whether headers are requested; the timeout is 0 when <c>true</c> and 1 otherwise.</param>
/// <returns>The serialized request.</returns>
private string ConfigureLayerLogs(bool includeHeaders = false)
{
    _logger.LogDebug("开始配置层日志: IncludeHeaders={IncludeHeaders}", includeHeaders);

    int timeout;
    if (includeHeaders)
    {
        timeout = 0;
    }
    else
    {
        timeout = 1;
    }

    var request = new BaseNetworkLog<RanLayerLog>
    {
        Timeout = timeout,
        MinLogCount = 64,
        MaxLogCount = 2048,
        LayerConfig = _networkContext.NetworkLogs.RanLog,
        Message = "log_get",
        IncludeHeaders = includeHeaders,
        MessageId = _messageId,
    };

    string serialized = JObject.FromObject(request).ToString();
    _logger.LogInformation("层日志配置完成: {Response}", serialized);
    return serialized;
}
|
||
|
|
||
|
/// <summary>
/// Releases the resources held by this handler and suppresses finalization.
/// </summary>
public void Dispose()
{
    // Explicit disposal: managed resources are released as well.
    Dispose(disposing: true);

    // Cleanup is done, so the finalizer no longer needs to run.
    GC.SuppressFinalize(this);
}
|
||
|
|
||
|
/// <summary>
/// Performs the actual cleanup. On the first call with
/// <paramref name="disposing"/> set to <c>true</c> it cancels the consumer,
/// completes the queue, waits up to five seconds for the processing task and
/// then disposes the queue and the cancellation token source.
/// </summary>
/// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the finalizer, in which case no managed resource is touched.</param>
protected virtual void Dispose(bool disposing)
{
    if (_disposed)
    {
        return;
    }

    if (disposing)
    {
        _cancellationTokenSource.Cancel();
        _messageQueue.CompleteAdding();

        try
        {
            // Task.Wait(TimeSpan) returns false on timeout instead of throwing.
            // Report that case so a consumer that is still running while its
            // queue is disposed does not go unnoticed.
            bool finished = _processTask.Wait(TimeSpan.FromSeconds(5));
            if (!finished)
            {
                _logger.LogWarning("等待消息队列处理任务结束超时(5秒),将继续释放资源");
            }
        }
        catch (AggregateException ex)
        {
            _logger.LogError(ex, "等待消息队列处理完成时发生错误");
        }

        _messageQueue.Dispose();
        _cancellationTokenSource.Dispose();
    }

    _disposed = true;
}
|
||
|
|
||
|
/// <summary>
/// Finalizer; runs the cleanup path without touching managed resources.
/// </summary>
~RanLogMessageHandler() => Dispose(disposing: false);
|
||
|
}
|
||
|
}
|