Compare commits
1 Commits
master
...
feature/pr
Author | SHA1 | Date |
---|---|---|
|
89eda9ec4e | 1 day ago |
26 changed files with 2141 additions and 0 deletions
@ -0,0 +1,48 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Domain.Helpers |
|||
{ |
|||
public static class JsonHelper |
|||
{ |
|||
/// <summary>
|
|||
/// 对象序列化
|
|||
/// </summary>
|
|||
/// <param name="obj">对象</param>
|
|||
/// <param name="isUseTextJson">是否使用textjson</param>
|
|||
/// <returns>返回json字符串</returns>
|
|||
public static string ObjToJson(this object obj, bool isUseTextJson = false) |
|||
{ |
|||
if (isUseTextJson) |
|||
{ |
|||
return System.Text.Json.JsonSerializer.Serialize(obj); |
|||
} |
|||
else |
|||
{ |
|||
return Newtonsoft.Json.JsonConvert.SerializeObject(obj); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// json反序列化obj
|
|||
/// </summary>
|
|||
/// <typeparam name="T">反序列类型</typeparam>
|
|||
/// <param name="strJson">json</param>
|
|||
/// <param name="isUseTextJson">是否使用textjson</param>
|
|||
/// <returns>返回对象</returns>
|
|||
public static T JsonToObj<T>(this string strJson, bool isUseTextJson = false) |
|||
{ |
|||
if (isUseTextJson) |
|||
{ |
|||
return System.Text.Json.JsonSerializer.Deserialize<T>(strJson); |
|||
} |
|||
else |
|||
{ |
|||
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(strJson); |
|||
} |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,13 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Domain.Interfaces.CustomWSClient |
|||
{ |
|||
public interface ICustomMessageHandler |
|||
{ |
|||
void HandleMessage(string MsgData, IObserverCustomWebSocketClient observer); |
|||
} |
|||
} |
@ -0,0 +1,13 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Domain.Interfaces.CustomWSClient |
|||
{ |
|||
public interface IObserverCustomWebSocketClient |
|||
{ |
|||
public void SendMessage(string message); |
|||
} |
|||
} |
@ -0,0 +1,13 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Domain.Interfaces.ProtocolLogHandlers |
|||
{ |
|||
public interface IProtocolLogsProviderObserver |
|||
{ |
|||
void OnData(string msg); |
|||
} |
|||
} |
@ -0,0 +1,15 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Domain.Interfaces.ProtocolLogHandlers |
|||
{ |
|||
public interface IProtocollHandleLogs |
|||
{ |
|||
public void RunStart(); |
|||
|
|||
public void RunStop(); |
|||
} |
|||
} |
@ -0,0 +1,55 @@ |
|||
using Newtonsoft.Json; |
|||
using System; |
|||
using System.Text.Json.Serialization; |
|||
|
|||
namespace CoreAgent.Domain.Models.Protocol; |
|||
|
|||
/// <summary>
|
|||
/// 基础网络日志实体类
|
|||
/// 提供网络日志的通用属性和配置
|
|||
/// </summary>
|
|||
/// <typeparam name="T">日志层类型(如ImsLayerLog或RanLayerLog)</typeparam>
|
|||
public class BaseNetworkLog<T> |
|||
{ |
|||
/// <summary>
|
|||
/// 超时时间(毫秒)
|
|||
/// </summary>
|
|||
[JsonProperty("timeout")] |
|||
public int Timeout { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 最小日志数量
|
|||
/// </summary>
|
|||
[JsonProperty("min")] |
|||
public int MinLogCount { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 最大日志数量
|
|||
/// </summary>
|
|||
[JsonProperty("max")] |
|||
public int MaxLogCount { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 日志层配置
|
|||
/// </summary>
|
|||
[JsonProperty("layers")] |
|||
public T LayerConfig { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 日志消息
|
|||
/// </summary>
|
|||
[JsonProperty("message")] |
|||
public string Message { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 是否包含消息头
|
|||
/// </summary>
|
|||
[JsonProperty("headers")] |
|||
public bool IncludeHeaders { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 消息ID
|
|||
/// </summary>
|
|||
[JsonProperty("message_id")] |
|||
public int MessageId { get; set; } |
|||
} |
@ -0,0 +1,80 @@ |
|||
using System; |
|||
using System.Text.Json.Serialization; |
|||
|
|||
namespace CoreAgent.Domain.Models.Protocol; |
|||
|
|||
/// <summary>
|
|||
/// IMS层日志实体类
|
|||
/// 该实体用于记录IMS(IP多媒体子系统)相关的各层日志信息
|
|||
/// 遵循DDD(领域驱动设计)原则,作为领域模型的一部分
|
|||
/// </summary>
|
|||
public class ImsLayerLog |
|||
{ |
|||
/// <summary>
|
|||
/// CX协议层日志级别
|
|||
/// </summary>
|
|||
public string CX { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// IMS协议层日志级别
|
|||
/// </summary>
|
|||
public string IMS { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// IPSEC协议层日志级别
|
|||
/// </summary>
|
|||
public string IPSEC { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// MEDIA协议层日志级别
|
|||
/// </summary>
|
|||
public string MEDIA { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// MMS协议层日志级别
|
|||
/// </summary>
|
|||
public string MMS { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// RX协议层日志级别
|
|||
/// </summary>
|
|||
public string RX { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// SIP协议层日志级别
|
|||
/// </summary>
|
|||
public string SIP { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 初始化IMS层日志级别
|
|||
/// </summary>
|
|||
public void InitializeLogLevels() |
|||
{ |
|||
CX = LogLevel.Warn.ToString().ToLower(); |
|||
IMS = LogLevel.Warn.ToString().ToLower(); |
|||
IPSEC = LogLevel.Warn.ToString().ToLower(); |
|||
MEDIA = LogLevel.Warn.ToString().ToLower(); |
|||
MMS = LogLevel.Warn.ToString().ToLower(); |
|||
RX = LogLevel.Warn.ToString().ToLower(); |
|||
SIP = LogLevel.Debug.ToString().ToLower(); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 更新指定层的日志级别
|
|||
/// </summary>
|
|||
/// <param name="layerName">层名称</param>
|
|||
/// <param name="logLevel">日志级别</param>
|
|||
/// <returns>是否更新成功</returns>
|
|||
public bool UpdateLogLevel(string layerName, LogLevel logLevel) |
|||
{ |
|||
if (string.IsNullOrEmpty(layerName)) |
|||
return false; |
|||
|
|||
var property = GetType().GetProperty(layerName); |
|||
if (property == null) |
|||
return false; |
|||
|
|||
property.SetValue(this, logLevel.ToString().ToLower()); |
|||
return true; |
|||
} |
|||
} |
@ -0,0 +1,27 @@ |
|||
namespace CoreAgent.Domain.Models.Protocol; |
|||
|
|||
/// <summary>
|
|||
/// 日志级别枚举
|
|||
/// </summary>
|
|||
public enum LogLevel |
|||
{ |
|||
/// <summary>
|
|||
/// 调试级别
|
|||
/// </summary>
|
|||
Debug, |
|||
|
|||
/// <summary>
|
|||
/// 信息级别
|
|||
/// </summary>
|
|||
Info, |
|||
|
|||
/// <summary>
|
|||
/// 警告级别
|
|||
/// </summary>
|
|||
Warn, |
|||
|
|||
/// <summary>
|
|||
/// 错误级别
|
|||
/// </summary>
|
|||
Error |
|||
} |
@ -0,0 +1,54 @@ |
|||
using System; |
|||
using System.Text.Json.Serialization; |
|||
|
|||
namespace CoreAgent.Domain.Models.Protocol; |
|||
|
|||
/// <summary>
|
|||
/// 网络层日志集合
|
|||
/// 用于统一管理不同网络层的日志配置
|
|||
/// </summary>
|
|||
public class NetworkLayerLogs |
|||
{ |
|||
/// <summary>
|
|||
/// IMS层日志配置
|
|||
/// </summary>
|
|||
public ImsLayerLog ImsLog { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// RAN层日志配置
|
|||
/// </summary>
|
|||
public RanLayerLog RanLog { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 初始化所有网络层的日志级别
|
|||
/// </summary>
|
|||
/// <param name="isNonStandaloneMode">是否为非独立组网模式(NSA模式)</param>
|
|||
public void InitializeAllLogLevels(bool isNonStandaloneMode = false) |
|||
{ |
|||
ImsLog = new ImsLayerLog(); |
|||
RanLog = new RanLayerLog(); |
|||
|
|||
ImsLog.InitializeLogLevels(); |
|||
RanLog.InitializeLogLevels(isNonStandaloneMode); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 更新指定网络层和指定层的日志级别
|
|||
/// </summary>
|
|||
/// <param name="networkType">网络类型("IMS" 或 "RAN")</param>
|
|||
/// <param name="layerName">层名称</param>
|
|||
/// <param name="logLevel">日志级别</param>
|
|||
/// <returns>是否更新成功</returns>
|
|||
public bool UpdateLogLevel(string networkType, string layerName, LogLevel logLevel) |
|||
{ |
|||
if (string.IsNullOrEmpty(networkType) || string.IsNullOrEmpty(layerName)) |
|||
return false; |
|||
|
|||
return networkType.ToUpper() switch |
|||
{ |
|||
"IMS" => ImsLog?.UpdateLogLevel(layerName, logLevel) ?? false, |
|||
"RAN" => RanLog?.UpdateLogLevel(layerName, logLevel) ?? false, |
|||
_ => false |
|||
}; |
|||
} |
|||
} |
@ -0,0 +1,97 @@ |
|||
using Newtonsoft.Json; |
|||
using Newtonsoft.Json.Linq; |
|||
using System.Collections.Generic; |
|||
|
|||
namespace CoreAgent.Domain.Models.Protocol; |
|||
|
|||
/// <summary>
|
|||
/// 协议日志实体类
|
|||
/// </summary>
|
|||
public record class ProtocolLog |
|||
{ |
|||
/// <summary>
|
|||
/// 消息ID
|
|||
/// </summary>
|
|||
[JsonProperty("message_id")] |
|||
public int? MessageId { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 消息头信息
|
|||
/// </summary>
|
|||
[JsonProperty("headers")] |
|||
public string[]? Headers { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 消息内容
|
|||
/// </summary>
|
|||
[JsonProperty("message")] |
|||
public string Message { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 消息类型
|
|||
/// </summary>
|
|||
[JsonProperty("type")] |
|||
public string? Type { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 消息名称
|
|||
/// </summary>
|
|||
[JsonProperty("name")] |
|||
public string? Name { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 协议版本
|
|||
/// </summary>
|
|||
[JsonProperty("version")] |
|||
public string? Version { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 时间戳
|
|||
/// </summary>
|
|||
[JsonProperty("time")] |
|||
public double? Time { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// UTC时间戳
|
|||
/// </summary>
|
|||
[JsonProperty("utc")] |
|||
public double? Utc { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 日志明细
|
|||
/// </summary>
|
|||
[JsonProperty("logs")] |
|||
public JToken? Logs { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 初始化协议日志实体类的新实例
|
|||
/// </summary>
|
|||
/// <param name="message">消息内容</param>
|
|||
/// <param name="type">消息类型</param>
|
|||
/// <param name="version">协议版本</param>
|
|||
/// <param name="time">时间戳</param>
|
|||
/// <param name="utc">UTC时间戳</param>
|
|||
/// <param name="logs">日志明细</param>
|
|||
/// <param name="messageId">消息ID</param>
|
|||
/// <param name="headers">消息头信息</param>
|
|||
public ProtocolLog( |
|||
string message, |
|||
string? type, |
|||
string? version, |
|||
double? time, |
|||
double? utc, |
|||
JToken? logs, |
|||
int? messageId, |
|||
string[]? headers) |
|||
{ |
|||
Message = message; |
|||
Type = type; |
|||
Version = version; |
|||
Name = type; |
|||
Utc = utc; |
|||
Time = time; |
|||
Logs = logs; |
|||
MessageId = messageId ?? 0; |
|||
Headers = headers; |
|||
} |
|||
} |
@ -0,0 +1,76 @@ |
|||
using System.Collections.Generic; |
|||
using Newtonsoft.Json; |
|||
|
|||
namespace CoreAgent.Domain.Models.Protocol; |
|||
|
|||
/// <summary>
|
|||
/// 协议日志明细
|
|||
/// </summary>
|
|||
public class ProtocolLogDetail |
|||
{ |
|||
/// <summary>
|
|||
/// 源信息
|
|||
/// </summary>
|
|||
[JsonProperty("src")] |
|||
public string Src { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 索引
|
|||
/// </summary>
|
|||
[JsonProperty("idx")] |
|||
public int Idx { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 日志级别
|
|||
/// </summary>
|
|||
[JsonProperty("level")] |
|||
public int Level { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 方向
|
|||
/// </summary>
|
|||
[JsonProperty("dir")] |
|||
public string Dir { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 时间戳
|
|||
/// </summary>
|
|||
[JsonProperty("timestamp")] |
|||
public long Timestamp { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 小区信息
|
|||
/// </summary>
|
|||
[JsonProperty("cell")] |
|||
public int? Cell { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 数据列表
|
|||
/// </summary>
|
|||
[JsonProperty("data")] |
|||
public List<string> Data { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 层信息
|
|||
/// </summary>
|
|||
[JsonProperty("layer")] |
|||
public string Layer { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// UE标识
|
|||
/// </summary>
|
|||
[JsonProperty("ue_id")] |
|||
public int? UeId { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 帧信息
|
|||
/// </summary>
|
|||
[JsonProperty("frame")] |
|||
public int? Frame { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 时隙信息
|
|||
/// </summary>
|
|||
[JsonProperty("slot")] |
|||
public int? Slot { get; set; } |
|||
} |
@ -0,0 +1,136 @@ |
|||
using System; |
|||
using System.Text.Json.Serialization; |
|||
|
|||
namespace CoreAgent.Domain.Models.Protocol; |
|||
|
|||
/// <summary>
|
|||
/// RAN层日志实体类
|
|||
/// 该实体用于记录RAN(无线接入网)相关的各层日志信息
|
|||
/// 遵循DDD(领域驱动设计)原则,作为领域模型的一部分
|
|||
/// </summary>
|
|||
public class RanLayerLog |
|||
{ |
|||
/// <summary>
|
|||
/// GTP-U协议层日志级别
|
|||
/// </summary>
|
|||
public string GTPU { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// LPPa协议层日志级别
|
|||
/// </summary>
|
|||
public string LPPa { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// M2AP协议层日志级别
|
|||
/// </summary>
|
|||
public string M2AP { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// MAC协议层日志级别
|
|||
/// </summary>
|
|||
public string MAC { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// NAS协议层日志级别
|
|||
/// </summary>
|
|||
public string NAS { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// NGAP协议层日志级别
|
|||
/// </summary>
|
|||
public string NGAP { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// NRPPa协议层日志级别
|
|||
/// </summary>
|
|||
public string NRPPa { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// PDCP协议层日志级别
|
|||
/// </summary>
|
|||
public string PDCP { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// PHY协议层日志级别
|
|||
/// </summary>
|
|||
public string PHY { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// RLC协议层日志级别
|
|||
/// </summary>
|
|||
public string RLC { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// RRC协议层日志级别
|
|||
/// </summary>
|
|||
public string RRC { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// S1AP协议层日志级别
|
|||
/// </summary>
|
|||
public string S1AP { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// TRX协议层日志级别
|
|||
/// </summary>
|
|||
public string TRX { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// X2AP协议层日志级别
|
|||
/// </summary>
|
|||
public string X2AP { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// XnAP协议层日志级别
|
|||
/// </summary>
|
|||
public string XnAP { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// PROD协议层日志级别
|
|||
/// </summary>
|
|||
[JsonIgnore] |
|||
public string PROD { get; set; } |
|||
|
|||
/// <summary>
|
|||
/// 初始化RAN层日志级别
|
|||
/// </summary>
|
|||
/// <param name="isNonStandaloneMode">是否为非独立组网模式(NSA模式)</param>
|
|||
public void InitializeLogLevels(bool isNonStandaloneMode = false) |
|||
{ |
|||
GTPU = LogLevel.Warn.ToString().ToLower(); |
|||
LPPa = LogLevel.Warn.ToString().ToLower(); |
|||
M2AP = LogLevel.Warn.ToString().ToLower(); |
|||
MAC = LogLevel.Warn.ToString().ToLower(); |
|||
NAS = LogLevel.Warn.ToString().ToLower(); |
|||
NGAP = LogLevel.Warn.ToString().ToLower(); |
|||
NRPPa = LogLevel.Warn.ToString().ToLower(); |
|||
PDCP = LogLevel.Warn.ToString().ToLower(); |
|||
PHY = LogLevel.Warn.ToString().ToLower(); |
|||
RLC = LogLevel.Warn.ToString().ToLower(); |
|||
RRC = LogLevel.Warn.ToString().ToLower(); |
|||
S1AP = LogLevel.Warn.ToString().ToLower(); |
|||
TRX = LogLevel.Warn.ToString().ToLower(); |
|||
X2AP = LogLevel.Warn.ToString().ToLower(); |
|||
XnAP = LogLevel.Warn.ToString().ToLower(); |
|||
PROD = LogLevel.Warn.ToString().ToLower(); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 更新指定层的日志级别
|
|||
/// </summary>
|
|||
/// <param name="layerName">层名称</param>
|
|||
/// <param name="logLevel">日志级别</param>
|
|||
/// <returns>是否更新成功</returns>
|
|||
public bool UpdateLogLevel(string layerName, LogLevel logLevel) |
|||
{ |
|||
if (string.IsNullOrEmpty(layerName)) |
|||
return false; |
|||
|
|||
var property = GetType().GetProperty(layerName); |
|||
if (property == null) |
|||
return false; |
|||
|
|||
property.SetValue(this, logLevel.ToString().ToLower()); |
|||
return true; |
|||
} |
|||
} |
@ -0,0 +1,172 @@ |
|||
using CoreAgent.Domain.Helpers; |
|||
using CoreAgent.Domain.Interfaces.CustomWSClient; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
using WebSocket4Net; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.CustomWSClient |
|||
{ |
|||
public class CustomWebSocketClient : IDisposable |
|||
{ |
|||
private readonly ILogger logger; |
|||
private readonly string serverUrl; |
|||
private WebSocket? webSocket; |
|||
protected readonly ICustomMessageHandler messageHandler; |
|||
private bool _disposed; |
|||
|
|||
protected CustomWebSocketClient(ILogger logger, string serverUrl, ICustomMessageHandler messageHandler) |
|||
{ |
|||
this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|||
this.serverUrl = $"ws://{serverUrl}"; |
|||
this.messageHandler = messageHandler ?? throw new ArgumentNullException(nameof(messageHandler)); |
|||
} |
|||
|
|||
public void Start() |
|||
{ |
|||
try |
|||
{ |
|||
logger.LogInformation("正在启动WebSocket客户端,连接到服务器: {ServerUrl}", serverUrl); |
|||
webSocket = new WebSocket4Net.WebSocket(serverUrl); |
|||
ConfigureWebSocketHandlers(); |
|||
webSocket.EnableAutoSendPing = false; |
|||
webSocket.Open(); |
|||
logger.LogInformation("WebSocket客户端启动成功"); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "WebSocket客户端启动失败: {Message}", ex.Message); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
private void ConfigureWebSocketHandlers() |
|||
{ |
|||
try |
|||
{ |
|||
webSocket!.Opened += (s, e) => HandleOpen(e); |
|||
webSocket.Closed += (s, e) => HandleClosed(e); |
|||
webSocket.Error += (s, e) => HandleError(e); |
|||
webSocket.MessageReceived += (s, e) => HandleMessage(e); |
|||
webSocket.DataReceived += (s, e) => HandleDataReceivedMessage(e); |
|||
logger.LogDebug("WebSocket事件处理器配置完成"); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "配置WebSocket事件处理器时发生错误: {Message}", ex.Message); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
private void HandleOpen(EventArgs @event) |
|||
{ |
|||
try |
|||
{ |
|||
logger.LogInformation("WebSocket已连接到服务器: {ServerUrl}", serverUrl); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "处理WebSocket打开事件时发生错误: {Message}", ex.Message); |
|||
} |
|||
} |
|||
|
|||
private void HandleClosed(EventArgs @event) |
|||
{ |
|||
try |
|||
{ |
|||
logger.LogInformation("WebSocket连接已关闭,状态: {State}", webSocket?.State); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "处理WebSocket关闭事件时发生错误: {Message}", ex.Message); |
|||
} |
|||
} |
|||
|
|||
private void HandleError(SuperSocket.ClientEngine.ErrorEventArgs ex) |
|||
{ |
|||
try |
|||
{ |
|||
logger.LogError(ex.Exception, "WebSocket发生错误: {Message}", ex.Exception.Message); |
|||
} |
|||
catch (Exception e) |
|||
{ |
|||
logger.LogError(e, "处理WebSocket错误事件时发生异常: {Message}", e.Message); |
|||
} |
|||
} |
|||
|
|||
private void HandleMessage(MessageReceivedEventArgs e) |
|||
{ |
|||
try |
|||
{ |
|||
if (webSocket is { State: WebSocketState.Connecting or WebSocketState.Open }) |
|||
{ |
|||
logger.LogDebug("收到WebSocket消息: {Message}", e.Message); |
|||
messageHandler.HandleMessage(e.Message, new ObserverCustomWebSocketClient(webSocket, logger)); |
|||
} |
|||
else |
|||
{ |
|||
logger.LogWarning("收到消息时WebSocket状态异常: {State}, 消息内容: {Message}", webSocket!.State, e.Message); |
|||
} |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "处理WebSocket消息时发生错误: {Message}, 原始消息: {OriginalMessage}", ex.Message, e.Message); |
|||
} |
|||
} |
|||
|
|||
private void HandleDataReceivedMessage(DataReceivedEventArgs data) |
|||
{ |
|||
try |
|||
{ |
|||
string message = Encoding.UTF8.GetString(data.Data); |
|||
logger.LogDebug("收到WebSocket二进制数据: {Message}", message); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "处理WebSocket二进制数据时发生错误: {Message}, 数据: {Data}", ex.Message, data.ObjToJson()); |
|||
} |
|||
} |
|||
|
|||
public void Stop() |
|||
{ |
|||
try |
|||
{ |
|||
logger.LogInformation("正在停止WebSocket客户端"); |
|||
webSocket?.Close(); |
|||
webSocket?.Dispose(); |
|||
logger.LogInformation("WebSocket客户端已停止"); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "停止WebSocket客户端时发生错误: {Message}", ex.Message); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
Dispose(true); |
|||
GC.SuppressFinalize(this); |
|||
} |
|||
|
|||
protected virtual void Dispose(bool disposing) |
|||
{ |
|||
if (!_disposed) |
|||
{ |
|||
if (disposing) |
|||
{ |
|||
Stop(); |
|||
} |
|||
_disposed = true; |
|||
} |
|||
} |
|||
|
|||
~CustomWebSocketClient() |
|||
{ |
|||
Dispose(false); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,143 @@ |
|||
using CoreAgent.Domain.Interfaces.CustomWSClient; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Net.WebSockets; |
|||
using System.Text; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
using WebSocket4Net; |
|||
using WebSocket = WebSocket4Net.WebSocket; |
|||
using WebSocketState = WebSocket4Net.WebSocketState; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.CustomWSClient |
|||
{ |
|||
public class ObserverCustomWebSocketClient : IObserverCustomWebSocketClient, IDisposable |
|||
{ |
|||
private readonly WebSocket _client; |
|||
private readonly ILogger _logger; |
|||
private readonly BlockingCollection<string> _sendQueue; |
|||
private readonly CancellationTokenSource _cancellationTokenSource; |
|||
private readonly Task _sendTask; |
|||
private bool _disposed; |
|||
private const int SEND_INTERVAL_MS = 100; // 发送间隔,可以根据需要调整
|
|||
|
|||
public ObserverCustomWebSocketClient(WebSocket client, ILogger logger) |
|||
{ |
|||
_client = client ?? throw new ArgumentNullException(nameof(client)); |
|||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|||
|
|||
_sendQueue = new BlockingCollection<string>(); |
|||
_cancellationTokenSource = new CancellationTokenSource(); |
|||
_sendTask = Task.Run(ProcessSendQueue); |
|||
|
|||
_logger.LogInformation("WebSocket消息发送队列已启动"); |
|||
} |
|||
|
|||
public void SendMessage(string message) |
|||
{ |
|||
if (string.IsNullOrEmpty(message)) |
|||
{ |
|||
_logger.LogWarning("尝试发送空消息"); |
|||
return; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
_logger.LogDebug("将消息加入发送队列: {Message}", message); |
|||
_sendQueue.Add(message); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "将消息加入发送队列时发生错误: {Message}", message); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
private async Task ProcessSendQueue() |
|||
{ |
|||
try |
|||
{ |
|||
_logger.LogInformation("开始处理WebSocket消息发送队列"); |
|||
foreach (var message in _sendQueue.GetConsumingEnumerable(_cancellationTokenSource.Token)) |
|||
{ |
|||
try |
|||
{ |
|||
await SendMessageInternalAsync(message); |
|||
await Task.Delay(SEND_INTERVAL_MS); // 添加发送间隔
|
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "发送WebSocket消息时发生错误: {Message}", message); |
|||
} |
|||
} |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
_logger.LogInformation("WebSocket消息发送队列处理已取消"); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "WebSocket消息发送队列处理过程中发生错误"); |
|||
} |
|||
} |
|||
|
|||
private async Task SendMessageInternalAsync(string message) |
|||
{ |
|||
if (_client.State != WebSocketState.Open) |
|||
{ |
|||
_logger.LogWarning("WebSocket未处于打开状态,无法发送消息。当前状态: {State}, 消息内容: {Message}", |
|||
_client.State, message); |
|||
return; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
_logger.LogDebug("正在发送WebSocket消息: {Message}", message); |
|||
await Task.Run(() => _client.Send(message)); |
|||
_logger.LogDebug("WebSocket消息发送成功"); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "发送WebSocket消息时发生错误: {Message}", message); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
Dispose(true); |
|||
GC.SuppressFinalize(this); |
|||
} |
|||
|
|||
protected virtual void Dispose(bool disposing) |
|||
{ |
|||
if (!_disposed) |
|||
{ |
|||
if (disposing) |
|||
{ |
|||
_cancellationTokenSource.Cancel(); |
|||
_sendQueue.CompleteAdding(); |
|||
try |
|||
{ |
|||
_sendTask.Wait(TimeSpan.FromSeconds(5)); |
|||
} |
|||
catch (AggregateException ex) |
|||
{ |
|||
_logger.LogError(ex, "等待消息发送队列处理完成时发生错误"); |
|||
} |
|||
_sendQueue.Dispose(); |
|||
_cancellationTokenSource.Dispose(); |
|||
} |
|||
_disposed = true; |
|||
} |
|||
} |
|||
|
|||
~ObserverCustomWebSocketClient() |
|||
{ |
|||
Dispose(false); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,264 @@ |
|||
using CoreAgent.Domain.Interfaces.CustomWSClient; |
|||
using CoreAgent.Domain.Interfaces.Network; |
|||
using CoreAgent.Domain.Models.Protocol; |
|||
using CoreAgent.Infrastructure.Contexts; |
|||
using Microsoft.Extensions.Logging; |
|||
using Newtonsoft.Json.Linq; |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers |
|||
{ |
|||
/// <summary>
|
|||
/// IMS协议消息处理器
|
|||
/// 负责处理IMS相关的WebSocket消息,包括用户更新、短信、邀请等功能
|
|||
/// </summary>
|
|||
public class IMSLogMessageHandler : ICustomMessageHandler, IDisposable |
|||
{ |
|||
private readonly ILogger _logger; |
|||
private int _messageId = 0; |
|||
private string _currentMessageId = string.Empty; |
|||
private readonly Action<string> _messageCallback; |
|||
private readonly ICellularNetworkContext _context; |
|||
private readonly BlockingCollection<(string MessageData, IObserverCustomWebSocketClient Observer)> _messageQueue; |
|||
private readonly CancellationTokenSource _cancellationTokenSource; |
|||
private readonly Task _processTask; |
|||
private bool _disposed; |
|||
|
|||
public IMSLogMessageHandler(ILogger logger, ICellularNetworkContext context, Action<string> action) |
|||
{ |
|||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|||
_messageCallback = action ?? throw new ArgumentNullException(nameof(action)); |
|||
_context = context ?? throw new ArgumentNullException(nameof(context)); |
|||
|
|||
_messageQueue = new BlockingCollection<(string, IObserverCustomWebSocketClient)>(); |
|||
_cancellationTokenSource = new CancellationTokenSource(); |
|||
_processTask = Task.Run(ProcessMessageQueue); |
|||
|
|||
_logger.LogInformation("IMS协议消息处理器初始化完成,消息队列已启动"); |
|||
} |
|||
|
|||
public void HandleMessage(string messageData, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
try |
|||
{ |
|||
_logger.LogDebug("将消息加入处理队列: {MessageData}", messageData); |
|||
_messageQueue.Add((messageData, observer)); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "将消息加入队列时发生错误: {MessageData}", messageData); |
|||
} |
|||
} |
|||
|
|||
private async Task ProcessMessageQueue() |
|||
{ |
|||
try |
|||
{ |
|||
_logger.LogInformation("开始处理IMS消息队列"); |
|||
foreach (var (messageData, observer) in _messageQueue.GetConsumingEnumerable(_cancellationTokenSource.Token)) |
|||
{ |
|||
try |
|||
{ |
|||
await ProcessMessageAsync(messageData, observer); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "处理队列中的消息时发生错误: {MessageData}", messageData); |
|||
} |
|||
} |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
_logger.LogInformation("IMS消息队列处理已取消"); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "IMS消息队列处理过程中发生错误"); |
|||
} |
|||
} |
|||
|
|||
private async Task ProcessMessageAsync(string messageData, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
try |
|||
{ |
|||
_logger.LogDebug("开始处理IMS协议消息: {MessageData}", messageData); |
|||
var data = JObject.Parse(messageData); |
|||
string messageType = data["message"]!.ToString(); |
|||
_logger.LogInformation("收到IMS协议消息类型: {MessageType}", messageData); |
|||
await HandleMessageByTypeAsync(messageType, data, observer); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "处理IMS协议消息时发生错误: {MessageData}", messageData); |
|||
} |
|||
} |
|||
|
|||
private async Task HandleMessageByTypeAsync(string messageType, JObject data, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
_currentMessageId = _messageId.ToString(); |
|||
|
|||
switch (messageType) |
|||
{ |
|||
case "ready": |
|||
await HandleReadyMessageAsync(observer); |
|||
break; |
|||
case "config_get": |
|||
await HandleConfigGetMessageAsync(data, observer); |
|||
break; |
|||
case "config_set": |
|||
await HandleConfigSetMessageAsync(observer); |
|||
break; |
|||
case "log_get": |
|||
await HandleLogGetMessageAsync(data, observer); |
|||
break; |
|||
case "stats": |
|||
await HandleStatsMessageAsync(observer); |
|||
break; |
|||
default: |
|||
_logger.LogWarning("收到未知的IMS协议消息类型: {MessageType}", messageType); |
|||
await Task.Run(() => observer.SendMessage(LayerLogslevelSetting(false))); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
private async Task HandleReadyMessageAsync(IObserverCustomWebSocketClient observer) |
|||
{ |
|||
string readyResponse = CreateMessage("config_get"); |
|||
_logger.LogInformation("发送ready响应: {Response}", readyResponse); |
|||
await Task.Run(() => observer.SendMessage(readyResponse)); |
|||
} |
|||
|
|||
private async Task HandleConfigGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
if (_currentMessageId == data["message_id"]!.ToString()) |
|||
{ |
|||
_logger.LogInformation("处理config_get请求"); |
|||
var responseArray = new JArray |
|||
{ |
|||
CreateRegisterMessage("users_update"), |
|||
CreateRegisterMessage("sms"), |
|||
CreateRegisterMessage("invite"), |
|||
CreateStatsMessage(), |
|||
SettingBaseLayerLogslevel(JObject.Parse(data["logs"].ToString())) |
|||
}; |
|||
_logger.LogInformation("发送config_get响应: {Response}", responseArray.ToString()); |
|||
await Task.Run(() => observer.SendMessage(responseArray.ToString())); |
|||
} |
|||
else |
|||
{ |
|||
_logger.LogWarning("config_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}", |
|||
data["message_id"]!.ToString(), _currentMessageId); |
|||
} |
|||
} |
|||
|
|||
private async Task HandleConfigSetMessageAsync(IObserverCustomWebSocketClient observer) |
|||
{ |
|||
_messageId++; |
|||
string configResponse = LayerLogslevelSetting(true); |
|||
_logger.LogInformation("发送config_set响应: {Response}", configResponse); |
|||
await Task.Run(() => observer.SendMessage(configResponse)); |
|||
//_currentMessageId = _messageId.ToString();
|
|||
} |
|||
|
|||
private async Task HandleLogGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
if (JArray.FromObject(data["logs"]).Count > 0) |
|||
{ |
|||
_messageId++; |
|||
string logResponse = LayerLogslevelSetting(false); |
|||
_logger.LogInformation("发送log_get响应: {Response}", logResponse); |
|||
await Task.Run(() => observer.SendMessage(logResponse)); |
|||
_currentMessageId = _messageId.ToString(); |
|||
await Task.Run(() => _messageCallback.Invoke(data.ToString())); |
|||
} |
|||
} |
|||
|
|||
private async Task HandleStatsMessageAsync(IObserverCustomWebSocketClient observer) |
|||
{ |
|||
_messageId++; |
|||
string statsResponse = CreateStatsMessage(); |
|||
_logger.LogInformation("发送stats响应: {Response}", statsResponse); |
|||
await Task.Run(() => observer.SendMessage(statsResponse)); |
|||
} |
|||
|
|||
private string CreateMessage(string message) |
|||
{ |
|||
_messageId++; |
|||
return JObject.FromObject(new { message, message_id = _messageId }).ToString(); |
|||
} |
|||
|
|||
private JObject CreateRegisterMessage(string register) |
|||
{ |
|||
_messageId++; |
|||
return JObject.FromObject(new { message = "register", message_id = _messageId, register }); |
|||
} |
|||
|
|||
private string CreateStatsMessage() |
|||
{ |
|||
_messageId++; |
|||
return CreateMessage("stats"); |
|||
} |
|||
|
|||
private JObject SettingBaseLayerLogslevel(JObject keyValues, bool isCloseSystemInfo = false) |
|||
{ |
|||
_messageId++; |
|||
keyValues.Remove("rotate"); |
|||
keyValues.Remove("path"); |
|||
keyValues.Remove("count"); |
|||
keyValues["bcch"] = isCloseSystemInfo; |
|||
|
|||
return JObject.FromObject(new |
|||
{ |
|||
message = "config_set", |
|||
logs = keyValues, |
|||
message_id = _messageId |
|||
}); |
|||
} |
|||
|
|||
private string LayerLogslevelSetting(bool isHead = false) |
|||
{ |
|||
_messageId++; |
|||
BaseNetworkLog<ImsLayerLog> logtype = new BaseNetworkLog<ImsLayerLog> |
|||
{ |
|||
Timeout = isHead ? 0 : 1, |
|||
MinLogCount = 64, |
|||
MaxLogCount = 2048, |
|||
LayerConfig = _context.NetworkLogs.ImsLog, |
|||
Message = "log_get", |
|||
IncludeHeaders = isHead, |
|||
MessageId = _messageId |
|||
}; |
|||
return JObject.FromObject(logtype).ToString(); |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
Dispose(true); |
|||
GC.SuppressFinalize(this); |
|||
} |
|||
|
|||
protected virtual void Dispose(bool disposing) |
|||
{ |
|||
if (!_disposed) |
|||
{ |
|||
if (disposing) |
|||
{ |
|||
_cancellationTokenSource.Cancel(); |
|||
_messageQueue.CompleteAdding(); |
|||
_processTask.Wait(); |
|||
_cancellationTokenSource.Dispose(); |
|||
_messageQueue.Dispose(); |
|||
} |
|||
_disposed = true; |
|||
} |
|||
} |
|||
|
|||
~IMSLogMessageHandler() |
|||
{ |
|||
Dispose(false); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,147 @@ |
|||
using CoreAgent.Domain.Interfaces.Network; |
|||
using CoreAgent.Domain.Interfaces.ProtocolLogHandlers; |
|||
using CoreAgent.Domain.Models.Network; |
|||
using CoreAgent.Infrastructure.Contexts; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers |
|||
{ |
|||
/// <summary>
|
|||
/// 协议日志提供者观察者
|
|||
/// 负责接收、缓存和转发协议日志数据
|
|||
/// 使用生产者-消费者模式处理日志数据流
|
|||
/// </summary>
|
|||
public class ProtocolLogsProviderObserver : IProtocolLogsProviderObserver |
|||
{ |
|||
/// <summary>
|
|||
/// 用于存储待处理的协议日志消息的阻塞队列
|
|||
/// </summary>
|
|||
private readonly static BlockingCollection<string> BlockingQueue = new BlockingCollection<string>(); |
|||
|
|||
/// <summary>
|
|||
/// 日志记录器
|
|||
/// </summary>
|
|||
private readonly ILogger logger; |
|||
|
|||
/// <summary>
|
|||
/// 网络上下文,用于检查网络状态
|
|||
/// </summary>
|
|||
private readonly CellularNetworkContext networkContext; |
|||
|
|||
|
|||
/// <summary>
|
|||
/// 初始化协议日志提供者观察者
|
|||
/// </summary>
|
|||
/// <param name="networkContext">网络上下文</param>
|
|||
/// <param name="logger">日志记录器</param>
|
|||
public ProtocolLogsProviderObserver(CellularNetworkContext networkContext, ILogger logger) |
|||
{ |
|||
this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|||
this.networkContext = networkContext ?? throw new ArgumentNullException(nameof(networkContext)); |
|||
|
|||
logger.LogInformation("ProtocolLogsProviderObserver 初始化完成"); |
|||
_ = Task.Run(Consumer); |
|||
logger.LogInformation("消费者任务已启动"); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 接收并处理新的协议日志数据
|
|||
/// </summary>
|
|||
/// <param name="msg">协议日志消息</param>
|
|||
public void OnData(string msg) |
|||
{ |
|||
if (string.IsNullOrEmpty(msg)) |
|||
{ |
|||
logger.LogWarning("收到空的协议日志消息"); |
|||
return; |
|||
} |
|||
|
|||
try |
|||
{ |
|||
logger.LogDebug("收到新的协议日志消息: {Message}", msg); |
|||
Producer(msg); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "处理协议日志消息时发生错误: {Message}", msg); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 生产者方法:将消息添加到阻塞队列中
|
|||
/// </summary>
|
|||
/// <param name="message">要添加的消息</param>
|
|||
private void Producer(string message) |
|||
{ |
|||
try |
|||
{ |
|||
var networkState = networkContext.GetNetworkState(); |
|||
if (networkState.CurrentStatus == NetworkStatus.Connected) |
|||
{ |
|||
logger.LogDebug("网络已连接,将消息添加到队列: {Message}", message); |
|||
BlockingQueue.Add(message); |
|||
} |
|||
else |
|||
{ |
|||
logger.LogWarning("网络未连接,丢弃消息: {Message}", message); |
|||
} |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "添加消息到队列时发生错误: {Message}", message); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 消费者方法:从队列中获取并处理消息
|
|||
/// </summary>
|
|||
public async Task Consumer() |
|||
{ |
|||
logger.LogInformation("消费者任务开始运行"); |
|||
|
|||
try |
|||
{ |
|||
while (networkContext.GetNetworkState().CurrentStatus == NetworkStatus.Connected) |
|||
{ |
|||
try |
|||
{ |
|||
if (BlockingQueue.TryTake(out var message, TimeSpan.FromMilliseconds(1000))) |
|||
{ |
|||
logger.LogDebug("从队列中获取到消息: {Message}", message); |
|||
// TODO: 实现消息发送逻辑
|
|||
// await client.SendAsync(message, true);
|
|||
} |
|||
else |
|||
{ |
|||
await Task.Delay(1000); |
|||
} |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "处理队列消息时发生错误"); |
|||
await Task.Delay(1000); // 发生错误时等待一段时间再继续
|
|||
} |
|||
} |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
logger.LogWarning("消费者任务被取消"); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError(ex, "消费者任务发生未处理的错误"); |
|||
} |
|||
finally |
|||
{ |
|||
logger.LogInformation("消费者任务已结束"); |
|||
} |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,30 @@ |
|||
using CoreAgent.Domain.Interfaces.CustomWSClient; |
|||
using CoreAgent.Domain.Interfaces.ProtocolLogHandlers; |
|||
using CoreAgent.Infrastructure.Services.CustomWSClient; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers |
|||
{ |
|||
public class ProtocolMSHandleLogs : CustomWebSocketClient, IProtocollHandleLogs |
|||
{ |
|||
protected ProtocolMSHandleLogs(ILogger logger, string serverUrl, ICustomMessageHandler messageHandler) : base(logger, serverUrl, messageHandler) |
|||
{ |
|||
|
|||
} |
|||
|
|||
public void RunStart() |
|||
{ |
|||
|
|||
} |
|||
|
|||
public void RunStop() |
|||
{ |
|||
|
|||
} |
|||
} |
|||
} |
@ -0,0 +1,12 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers |
|||
{ |
|||
internal class ProtocolMSParesHandleLogs |
|||
{ |
|||
} |
|||
} |
@ -0,0 +1,41 @@ |
|||
using CoreAgent.Domain.Models.Protocol; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Text.RegularExpressions; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers |
|||
{ |
|||
public abstract class ProtocolParesHandleLogs |
|||
{ |
|||
#region Regex
|
|||
public readonly Regex _regExpPhy = new Regex(@"^([a-f0-9\-]+)\s+([a-f0-9\-]+)\s+([\d\.\-]+) (\w+): (.+)"); |
|||
public readonly Regex _regExpInfo1 = new Regex(@"^([\w\-]+): (.+)"); |
|||
public readonly Regex _regExpInfo2 = new Regex(@"^([\w]+) (.+)"); |
|||
public readonly Regex _regExpParams1 = new Regex(@"\s+"); |
|||
public readonly Regex _regExpParams2 = new Regex(@"=|:"); |
|||
public readonly Regex _regExpIP = new Regex(@"^(len=\d+)\s+(\S+)\s+(.*)"); |
|||
public readonly Regex _regExpIPsec = new Regex(@"^len=(\d+)\s+(.*)"); |
|||
public readonly Regex _regExpIKEV2 = new Regex(@"^(\S+)\s+(.*)"); |
|||
public readonly Regex _regExpSDULen = new Regex(@"SDU_len=(\d+)"); |
|||
public readonly Regex _regExpSIP = new Regex(@"^([:\.\[\]\da-f]+)\s+(\S+) (.+)"); |
|||
public readonly Regex _regExpMediaReq = new Regex(@"^(\S+) (.+)"); |
|||
public readonly Regex _regExpSignalRecord = new Regex(@"Link:\s([\w\d]+)@(\d+)"); |
|||
public readonly Regex _regExpCellID = new Regex(@"^([a-f0-9\-]+) (.+)"); |
|||
public readonly Regex _regExpRRC_UE_ID = new Regex(@"Changing UE_ID to 0x(\d+)"); |
|||
public readonly Regex _regExpRRC_TMSI = new Regex(@"(5G|m)-TMSI '([\dA-F]+)'H"); |
|||
public readonly Regex _regExpRRC_NEW_ID = new Regex(@"newUE-Identity (['\dA-FH]+)"); |
|||
public readonly Regex _regExpRRC_CRNTI = new Regex(@"c-RNTI '([\dA-F]+)'H"); |
|||
public readonly Regex _regExpNAS_TMSI = new Regex(@"m-TMSI = 0x([\da-f]+)"); |
|||
public readonly Regex _regExpNAS_5GTMSI = new Regex(@"5G-TMSI = 0x([\da-f]+)"); |
|||
public readonly Regex _regExpRRC_BC = new Regex(@"(EUTRA|MRDC|NR|NRDC) band combinations"); |
|||
public readonly Regex _regExpPDCCH = new Regex(@"^\s*(.+)=(\d+)$"); |
|||
public readonly Regex _regExpS1NGAP = new Regex(@"^([\da-f\-]+)\s+([\da-f\-]+) (([^\s]+) .+)"); |
|||
public readonly Regex _regExpECPRI = new Regex(@"len=(\d+)"); |
|||
public readonly Regex _regExpHexDump = new Regex(@"^[\da-f]+:(\s+[\da-f]{2}){1,16}\s+.{1,16}$"); |
|||
#endregion
|
|||
public abstract Task GetTryParesLogDataHandle(ProtocolLog model); |
|||
} |
|||
} |
@ -0,0 +1,30 @@ |
|||
using CoreAgent.Domain.Interfaces.CustomWSClient; |
|||
using CoreAgent.Domain.Interfaces.ProtocolLogHandlers; |
|||
using CoreAgent.Infrastructure.Services.CustomWSClient; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers |
|||
{ |
|||
public class ProtocolRanHandleLogs : CustomWebSocketClient, IProtocollHandleLogs |
|||
{ |
|||
protected ProtocolRanHandleLogs(ILogger logger, string serverUrl, ICustomMessageHandler messageHandler) : base(logger, serverUrl, messageHandler) |
|||
{ |
|||
|
|||
} |
|||
|
|||
public void RunStart() |
|||
{ |
|||
|
|||
} |
|||
|
|||
public void RunStop() |
|||
{ |
|||
|
|||
} |
|||
} |
|||
} |
@ -0,0 +1,284 @@ |
|||
using CoreAgent.Domain.Helpers; |
|||
using CoreAgent.Domain.Interfaces.ProtocolLogHandlers; |
|||
using CoreAgent.Domain.Models.Protocol; |
|||
using Microsoft.Extensions.Logging; |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Text.RegularExpressions; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers |
|||
{ |
|||
|
|||
|
|||
|
|||
public class ProtocolLogsParesRanHandle : ProtocolParesHandleLogs |
|||
{ |
|||
private readonly ILogger logger; |
|||
private readonly IProtocolLogsProviderObserver observer; |
|||
public ProtocolLogsParesRanHandle(IProtocolLogsProviderObserver observer, ILogger logger) |
|||
{ |
|||
this.observer = observer; |
|||
this.logger = logger; |
|||
} |
|||
|
|||
public override async Task GetTryParesLogDataHandle(ProtocolLog model) |
|||
{ |
|||
try |
|||
{ |
|||
if (model is { Logs: null }) |
|||
{ |
|||
logger.LogError($"logs is null =====>GetTryParesLogDataHandle Data {model?.ObjToJson()}"); |
|||
return; |
|||
} |
|||
var ParseJsonData = model.Logs.ToString().JsonToObj<ProtocolLogDetail[]>(); |
|||
//var parseResultData = RanLogParseHandle(ParseJsonData, (model.MessageId ?? 0));
|
|||
//if (parseResultData.Any())
|
|||
//{
|
|||
// var MsgData = new { data = parseResultData, MessageType = CSCIMessageType.ProtocolLogsRAN }.ObjToJson();
|
|||
// logger.LogInformation($"OnData:{MsgData}");
|
|||
// observer.OnData(MsgData);
|
|||
//}
|
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
logger.LogError($"GetTryParesLogDataHandle Data {model?.ObjToJson()}"); |
|||
logger.LogError(ex.ToString()); |
|||
} |
|||
} |
|||
|
|||
|
|||
//#region 分析
|
|||
//private List<CellularNetworkProtocolLogsEntity> RanLogParseHandle(ProtocolLogDetail[] data, int MessageID)
|
|||
//{
|
|||
// List<CellularNetworkProtocolLogsEntity> cellulars = new List<CellularNetworkProtocolLogsEntity>();
|
|||
// try
|
|||
// {
|
|||
// foreach (var msg in data)
|
|||
// {
|
|||
// ProtocolLogslayerType LayerType;
|
|||
// Enum.TryParse(msg.layer, out LayerType);
|
|||
// string info = string.Empty;
|
|||
// string TMIS = string.Empty;
|
|||
// string MessageInfo = msg.data[0];
|
|||
// if (LayerType == ProtocolLogslayerType.RRC)
|
|||
// {
|
|||
// var reg = _regExpRRC_UE_ID.Match(MessageInfo);
|
|||
// if (reg is { Success: true })
|
|||
// {
|
|||
// int ue_id = Convert.ToInt32(reg.Groups[1].ToString(), 16);
|
|||
|
|||
// }
|
|||
// reg = _regExpInfo1.Match(MessageInfo);
|
|||
// if (reg.Success)
|
|||
// {
|
|||
// info = reg.Groups[1].Value;
|
|||
// MessageInfo = reg.Groups[2].Value;
|
|||
// GetlogListRRC(msg, MessageInfo);
|
|||
|
|||
// }
|
|||
// reg = _regExpRRC_BC.Match(MessageInfo);
|
|||
// if (reg is { Success: true })
|
|||
// {
|
|||
// string result = string.Join("\n", msg.data);
|
|||
|
|||
// }
|
|||
// }
|
|||
// else if (LayerType == ProtocolLogslayerType.NAS)
|
|||
// {
|
|||
// var reg = _regExpInfo1.Match(MessageInfo);
|
|||
// if (reg is { Success: true })
|
|||
// {
|
|||
// info = reg.Groups[1].Value;
|
|||
// MessageInfo = reg.Groups[2].Value;
|
|||
// TMIS = GetlogListNAS(msg, MessageInfo);
|
|||
// if (info.Equals("BCCH_NR"))
|
|||
// {
|
|||
|
|||
// }
|
|||
|
|||
// }
|
|||
// }
|
|||
// else if (LayerType == ProtocolLogslayerType.PHY)
|
|||
// {
|
|||
// var reg = _regExpPhy.Match(MessageInfo);
|
|||
// if (reg is { Success: true })
|
|||
// {
|
|||
// int cell = Convert.ToInt32(reg.Groups[1].Value, 16);
|
|||
// int rnti = Convert.ToInt32(reg.Groups[2].Value, 16);
|
|||
// var channel = reg.Groups[4].Value;
|
|||
// string[] frames = reg.Groups[3].Value.Split('.');
|
|||
// int frame = Convert.ToInt32(frames[0]) - 0;
|
|||
// int slot = Convert.ToInt32(frames[1]) - 0;
|
|||
// }
|
|||
|
|||
// }
|
|||
// else if (LayerType == ProtocolLogslayerType.MAC)
|
|||
// {
|
|||
// var reg = _regExpCellID.Match(MessageInfo);
|
|||
// if (reg is { Success: true })
|
|||
// {
|
|||
// int cell = Convert.ToInt32(reg.Groups[1].Value, 16);
|
|||
// MessageInfo = reg.Groups[2].Value;
|
|||
// }
|
|||
// }
|
|||
// CellularNetworkProtocolLogsEntity entity = new CellularNetworkProtocolLogsEntity
|
|||
// {
|
|||
// MessageID = MessageID,
|
|||
// CellID = msg.cell,
|
|||
// Timestamp = msg.timestamp,
|
|||
// UEID = msg.ue_id,
|
|||
// Time = msg.timestamp.UnixTimeStamp13ToBeijingTime().TimeOfDay,
|
|||
// ProtocolLayer = LayerType,
|
|||
// Info = info,
|
|||
// TMSI = TMIS,
|
|||
// Message = MessageInfo,
|
|||
// ProtocolType = msg.src,
|
|||
// Index = msg.idx,
|
|||
// Direction = msg.dir switch
|
|||
// {
|
|||
// "UL" => DirectionLogsType.UL,
|
|||
// "DL" => DirectionLogsType.DL,
|
|||
// _ => DirectionLogsType._
|
|||
// },
|
|||
// MessageData = msg.data.ToArray(),
|
|||
// };
|
|||
// cellulars.Add(entity);
|
|||
// }
|
|||
// return cellulars;
|
|||
// }
|
|||
// catch (Exception ex)
|
|||
// {
|
|||
// logger.Error("ran logs analysis combination" + ex.Message);
|
|||
// return cellulars;
|
|||
// }
|
|||
//}
|
|||
|
|||
//private void GetlogListRRC(ProtocolNetWorkCoreLogsDetailEntity log, string logType)
|
|||
//{
|
|||
// try
|
|||
// {
|
|||
// switch (logType.ToLower())
|
|||
// {
|
|||
// case "sib1":
|
|||
// break;
|
|||
// case "sib":
|
|||
// break;
|
|||
// case "rrc connection request":
|
|||
// var lte = GetFindData(log.data, _regExpRRC_TMSI);
|
|||
// if (lte is { Success: true })
|
|||
// {
|
|||
// int tmsi = GetTMSI(log, lte.Groups[2].Value.ToString());
|
|||
// }
|
|||
// break;
|
|||
// case "rrc connection reconfiguration":
|
|||
// var connection = GetFindData(log.data, _regExpRRC_TMSI);
|
|||
// if (connection is { Success: true })
|
|||
// {
|
|||
// int rnti = GetRNTI(log, connection.Groups[1].Value);
|
|||
// }
|
|||
// break;
|
|||
// case "rrc connection reestablishment request":
|
|||
// var request = GetFindData(log.data, _regExpRRC_TMSI);
|
|||
// if (request is { Success: true })
|
|||
// {
|
|||
// int rnti = GetRNTI(log, request.Groups[1].Value);
|
|||
// }
|
|||
// break;
|
|||
// case "rrc setup request":
|
|||
// var RRCrequest = GetFindData(log.data, _regExpRRC_TMSI);
|
|||
// break;
|
|||
// case "ue capability information":
|
|||
|
|||
// break;
|
|||
// default:
|
|||
// break;
|
|||
// }
|
|||
// }
|
|||
// catch (Exception ex)
|
|||
// {
|
|||
// logger.Error("GetlogListRRC" + ex.Message);
|
|||
// }
|
|||
//}
|
|||
|
|||
//private string GetlogListNAS(ProtocolNetWorkCoreLogsDetailEntity log, string logType)
|
|||
//{
|
|||
// switch (logType.ToLower())
|
|||
// {
|
|||
// case "attach accept":
|
|||
// var lte = GetFindData(log.data, _regExpNAS_TMSI);
|
|||
// if (lte is not null && !string.IsNullOrWhiteSpace(lte.Groups[1]?.ToString()))
|
|||
// {
|
|||
|
|||
// int tmsi = GetTMSI(log, lte.Groups[1].Value);
|
|||
// return lte.Groups[1].Value;
|
|||
|
|||
// }
|
|||
// break;
|
|||
// case "attach request":
|
|||
// var data = log.data;
|
|||
// break;
|
|||
// case "registration accept":
|
|||
// var Nr = GetFindData(log.data, _regExpNAS_5GTMSI);
|
|||
// if (Nr is not null && !string.IsNullOrWhiteSpace(Nr?.Groups[1]?.ToString()))
|
|||
// {
|
|||
// int tmsi = GetTMSI(log, Nr.Groups[1].Value);
|
|||
// return Nr.Groups[1].Value;
|
|||
// }
|
|||
// break;
|
|||
// case "registration request":
|
|||
// var requestData = log.data;
|
|||
// break;
|
|||
|
|||
// }
|
|||
// return string.Empty;
|
|||
|
|||
//}
|
|||
|
|||
//private Match GetFindData(List<string> data, Regex regExp)
|
|||
//{
|
|||
// if (data.Any())
|
|||
// {
|
|||
// for (var i = 0; i < data.Count; i++)
|
|||
// {
|
|||
// Match m = regExp.Match(data[i]);
|
|||
// if (m.Success)
|
|||
// return m;
|
|||
// }
|
|||
// }
|
|||
// return default;
|
|||
//}
|
|||
|
|||
//private int GetTMSI(ProtocolNetWorkCoreLogsDetailEntity log, string tmsi)
|
|||
//{
|
|||
// return Convert.ToInt32(tmsi, 16);
|
|||
//}
|
|||
|
|||
//private int GetRNTI(ProtocolNetWorkCoreLogsDetailEntity log, string rnti)
|
|||
//{
|
|||
// // 定义正则表达式模式
|
|||
// string pattern = @"'([\dA-F]+)'H";
|
|||
|
|||
// // 创建正则表达式对象
|
|||
// Regex regex = new Regex(pattern);
|
|||
|
|||
// // 进行匹配
|
|||
// Match match = regex.Match(rnti);
|
|||
|
|||
// // 检查是否有匹配
|
|||
// if (match.Success)
|
|||
// return Convert.ToInt32(match.Groups[1].Value, 16);
|
|||
// else return 0;
|
|||
//}
|
|||
|
|||
//private bool GetCheckLayerTypeInfo(string layer)
|
|||
//{
|
|||
// string[] ArraryLayer = new string[] { "PDSCH", "PDCCH", "EPDCCH", "PUSCH", "PUCCH", "NPDSCH", "NPUSCH", "NPBCH", "BCCH-NR", "HINTS", "SRS", "CSI", "SIM-Even", "IP-SIM" };
|
|||
// return ArraryLayer.Any(s => s.Equals(layer));
|
|||
//}
|
|||
//#endregion
|
|||
} |
|||
|
|||
} |
@ -0,0 +1,368 @@ |
|||
using CoreAgent.Domain.Interfaces.CustomWSClient; |
|||
using CoreAgent.Domain.Interfaces.Network; |
|||
using CoreAgent.Domain.Models.Protocol; |
|||
using Microsoft.Extensions.Logging; |
|||
using Newtonsoft.Json.Linq; |
|||
using System; |
|||
using System.Collections.Concurrent; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Text; |
|||
using System.Threading; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers |
|||
{ |
|||
/// <summary>
|
|||
/// RAN协议WebSocket消息处理器
|
|||
/// 负责处理无线接入网(RAN)相关的WebSocket消息,包括配置获取、日志获取等功能
|
|||
/// </summary>
|
|||
public class RanLogMessageHandler : ICustomMessageHandler, IDisposable |
|||
{ |
|||
private readonly ILogger _logger; |
|||
private int _messageId = 0; |
|||
private string _currentMessageId = string.Empty; |
|||
private readonly Action<string> _messageCallback; |
|||
private readonly ICellularNetworkContext _networkContext; |
|||
private readonly BlockingCollection<(string MessageData, IObserverCustomWebSocketClient Observer)> _messageQueue; |
|||
private readonly CancellationTokenSource _cancellationTokenSource; |
|||
private readonly Task _processTask; |
|||
private bool _disposed; |
|||
|
|||
/// <summary>
|
|||
/// 初始化RAN协议消息处理器
|
|||
/// </summary>
|
|||
/// <param name="logger">日志记录器</param>
|
|||
/// <param name="context">蜂窝网络上下文</param>
|
|||
/// <param name="action">消息回调处理函数</param>
|
|||
public RanLogMessageHandler(ILogger logger, ICellularNetworkContext context, Action<string> action) |
|||
{ |
|||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); |
|||
_messageCallback = action ?? throw new ArgumentNullException(nameof(action)); |
|||
_networkContext = context ?? throw new ArgumentNullException(nameof(context)); |
|||
|
|||
_messageQueue = new BlockingCollection<(string, IObserverCustomWebSocketClient)>(); |
|||
_cancellationTokenSource = new CancellationTokenSource(); |
|||
_processTask = Task.Run(ProcessMessageQueue); |
|||
|
|||
_logger.LogInformation("RAN协议消息处理器初始化完成,消息队列已启动"); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 处理接收到的WebSocket消息
|
|||
/// 将消息加入处理队列,由队列处理器异步处理
|
|||
/// </summary>
|
|||
/// <param name="messageData">接收到的消息数据</param>
|
|||
/// <param name="observer">WebSocket客户端观察者</param>
|
|||
public void HandleMessage(string messageData, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
try |
|||
{ |
|||
_logger.LogDebug("将消息加入处理队列: {MessageData}", messageData); |
|||
_messageQueue.Add((messageData, observer)); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "将消息加入队列时发生错误: {MessageData}", messageData); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 处理消息队列中的消息
|
|||
/// 持续从队列中获取消息并处理,直到队列关闭或取消
|
|||
/// </summary>
|
|||
private async Task ProcessMessageQueue() |
|||
{ |
|||
try |
|||
{ |
|||
_logger.LogInformation("开始处理消息队列"); |
|||
foreach (var (messageData, observer) in _messageQueue.GetConsumingEnumerable(_cancellationTokenSource.Token)) |
|||
{ |
|||
try |
|||
{ |
|||
await ProcessMessageAsync(messageData, observer); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "处理队列中的消息时发生错误: {MessageData}", messageData); |
|||
} |
|||
} |
|||
} |
|||
catch (OperationCanceledException) |
|||
{ |
|||
_logger.LogInformation("消息队列处理已取消"); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "消息队列处理过程中发生错误"); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 处理单条消息
|
|||
/// 解析消息类型并分发到对应的处理方法
|
|||
/// </summary>
|
|||
/// <param name="messageData">消息数据</param>
|
|||
/// <param name="observer">WebSocket客户端观察者</param>
|
|||
private async Task ProcessMessageAsync(string messageData, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
try |
|||
{ |
|||
_logger.LogDebug("开始处理RAN协议消息: {MessageData}", messageData); |
|||
var jsonData = JObject.Parse(messageData); |
|||
string messageType = jsonData["message"]!.ToString(); |
|||
_logger.LogInformation("收到RAN协议消息类型: {MessageType}", messageType); |
|||
await ProcessMessageByTypeAsync(messageType, jsonData, observer); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_logger.LogError(ex, "处理RAN协议消息时发生错误: {MessageData}", messageData); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 根据消息类型分发到对应的处理方法
|
|||
/// </summary>
|
|||
/// <param name="messageType">消息类型</param>
|
|||
/// <param name="data">消息数据</param>
|
|||
/// <param name="observer">WebSocket客户端观察者</param>
|
|||
private async Task ProcessMessageByTypeAsync(string messageType, JObject data, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
_logger.LogDebug("开始处理RAN协议消息类型: {MessageType}", messageType); |
|||
switch (messageType) |
|||
{ |
|||
case "ready": |
|||
await HandleReadyMessageAsync(observer); |
|||
break; |
|||
case "config_get": |
|||
await HandleConfigGetMessageAsync(data, observer); |
|||
break; |
|||
case "config_set": |
|||
await HandleConfigSetMessageAsync(observer); |
|||
break; |
|||
case "log_get": |
|||
await HandleLogGetMessageAsync(data, observer); |
|||
break; |
|||
case "stats": |
|||
await HandleStatsMessageAsync(observer); |
|||
break; |
|||
default: |
|||
_logger.LogWarning("收到未知的RAN协议消息类型: {MessageType}", messageType); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 处理ready消息
|
|||
/// 发送config_get请求,准备获取配置信息
|
|||
/// </summary>
|
|||
/// <param name="observer">WebSocket客户端观察者</param>
|
|||
private async Task HandleReadyMessageAsync(IObserverCustomWebSocketClient observer) |
|||
{ |
|||
_messageId++; |
|||
string readyResponse = JObject.FromObject(new { message = "config_get", message_id = _messageId }).ToString(); |
|||
_logger.LogInformation("发送ready响应: {Response}", readyResponse); |
|||
await Task.Run(() => observer.SendMessage(readyResponse)); |
|||
_currentMessageId = _messageId.ToString(); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 处理config_get消息
|
|||
/// 发送统计信息和基础层日志配置
|
|||
/// </summary>
|
|||
/// <param name="data">消息数据</param>
|
|||
/// <param name="observer">WebSocket客户端观察者</param>
|
|||
private async Task HandleConfigGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
if (_currentMessageId == data["message_id"]!.ToString()) |
|||
{ |
|||
_logger.LogInformation("处理config_get请求"); |
|||
var responseArray = new JArray(); |
|||
|
|||
_messageId++; |
|||
var statsConfig = new { message = "stats", message_id = _messageId, rf = false, samples = false }; |
|||
responseArray.Add(JObject.FromObject(statsConfig)); |
|||
|
|||
_messageId++; |
|||
string baseLayerConfig = ConfigureBaseLayerLogs(data); |
|||
responseArray.Add(JObject.Parse(baseLayerConfig)); |
|||
|
|||
_logger.LogInformation("发送config_get响应: {Response}", responseArray.ToString()); |
|||
await Task.Run(() => observer.SendMessage(responseArray.ToString())); |
|||
} |
|||
else |
|||
{ |
|||
_logger.LogWarning("config_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}", |
|||
data["message_id"]!.ToString(), _currentMessageId); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 处理config_set消息
|
|||
/// 发送层日志配置
|
|||
/// </summary>
|
|||
/// <param name="observer">WebSocket客户端观察者</param>
|
|||
private async Task HandleConfigSetMessageAsync(IObserverCustomWebSocketClient observer) |
|||
{ |
|||
_messageId++; |
|||
string configResponse = ConfigureLayerLogs(true); |
|||
await Task.Run(() => observer.SendMessage(configResponse)); |
|||
_currentMessageId = _messageId.ToString(); |
|||
_logger.LogInformation("发送config_set响应: {Response}", configResponse); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 处理log_get消息
|
|||
/// 发送日志配置并触发回调
|
|||
/// </summary>
|
|||
/// <param name="data">消息数据</param>
|
|||
/// <param name="observer">WebSocket客户端观察者</param>
|
|||
private async Task HandleLogGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer) |
|||
{ |
|||
if (_currentMessageId == data["message_id"]!.ToString()) |
|||
{ |
|||
_messageId++; |
|||
string logResponse = ConfigureLayerLogs(false); |
|||
await Task.Run(() => observer.SendMessage(logResponse)); |
|||
_currentMessageId = _messageId.ToString(); |
|||
_logger.LogInformation("发送log_get响应: {Response}", logResponse); |
|||
} |
|||
else |
|||
{ |
|||
_logger.LogWarning("log_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}", |
|||
data["message_id"]!.ToString(), _currentMessageId); |
|||
} |
|||
await Task.Run(() => _messageCallback.Invoke(data.ToString())); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 处理stats消息
|
|||
/// 发送统计信息请求
|
|||
/// </summary>
|
|||
/// <param name="observer">WebSocket客户端观察者</param>
|
|||
private async Task HandleStatsMessageAsync(IObserverCustomWebSocketClient observer) |
|||
{ |
|||
_messageId++; |
|||
string statsResponse = JObject.FromObject(new |
|||
{ |
|||
message = "stats", |
|||
message_id = _messageId, |
|||
rf = false, |
|||
samples = false |
|||
}).ToString(); |
|||
await Task.Run(() => observer.SendMessage(statsResponse)); |
|||
_logger.LogInformation("发送stats响应: {Response}", statsResponse); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 配置基础层日志
|
|||
/// 设置各种日志级别的开关状态
|
|||
/// </summary>
|
|||
/// <param name="keyValues">配置键值对</param>
|
|||
/// <param name="isCloseSystemInfo">是否关闭系统信息</param>
|
|||
/// <returns>配置后的JSON字符串</returns>
|
|||
private string ConfigureBaseLayerLogs(JObject keyValues, bool isCloseSystemInfo = false) |
|||
{ |
|||
_logger.LogDebug("开始配置基础层日志: {KeyValues}", keyValues.ToString()); |
|||
|
|||
// 移除不需要的配置项
|
|||
if (keyValues.Remove("rotate") && keyValues.Remove("path") && keyValues.Remove("count")) |
|||
{ |
|||
_logger.LogDebug("已移除rotate、path和count配置项"); |
|||
} |
|||
|
|||
// 设置系统信息相关配置
|
|||
keyValues["bcch"] = isCloseSystemInfo; |
|||
keyValues["cch"] = isCloseSystemInfo; |
|||
keyValues["cell_meas"] = isCloseSystemInfo; |
|||
keyValues["csi"] = isCloseSystemInfo; |
|||
|
|||
// 设置其他配置项
|
|||
keyValues["dci_size"] = false; |
|||
keyValues["mib"] = false; |
|||
keyValues["rep"] = false; |
|||
keyValues["signal"] = false; |
|||
|
|||
var configMessage = new |
|||
{ |
|||
message = "config_set", |
|||
logs = keyValues, |
|||
message_id = _messageId, |
|||
}; |
|||
|
|||
string response = JObject.FromObject(configMessage).ToString(); |
|||
_logger.LogInformation("基础层日志配置完成: {Response}", response); |
|||
return response; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 配置层日志
|
|||
/// 设置日志超时、计数等参数
|
|||
/// </summary>
|
|||
/// <param name="includeHeaders">是否包含头部信息</param>
|
|||
/// <returns>配置后的JSON字符串</returns>
|
|||
private string ConfigureLayerLogs(bool includeHeaders = false) |
|||
{ |
|||
_logger.LogDebug("开始配置层日志: IncludeHeaders={IncludeHeaders}", includeHeaders); |
|||
|
|||
var logConfig = new BaseNetworkLog<RanLayerLog> |
|||
{ |
|||
Timeout = includeHeaders ? 0 : 1, |
|||
MinLogCount = 64, |
|||
MaxLogCount = 2048, |
|||
LayerConfig = _networkContext.NetworkLogs.RanLog, |
|||
Message = "log_get", |
|||
IncludeHeaders = includeHeaders, |
|||
MessageId = _messageId, |
|||
}; |
|||
|
|||
string response = JObject.FromObject(logConfig).ToString(); |
|||
_logger.LogInformation("层日志配置完成: {Response}", response); |
|||
return response; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 释放资源
|
|||
/// </summary>
|
|||
public void Dispose() |
|||
{ |
|||
Dispose(true); |
|||
GC.SuppressFinalize(this); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 释放资源的具体实现
|
|||
/// </summary>
|
|||
/// <param name="disposing">是否正在释放托管资源</param>
|
|||
protected virtual void Dispose(bool disposing) |
|||
{ |
|||
if (!_disposed) |
|||
{ |
|||
if (disposing) |
|||
{ |
|||
_cancellationTokenSource.Cancel(); |
|||
_messageQueue.CompleteAdding(); |
|||
try |
|||
{ |
|||
_processTask.Wait(TimeSpan.FromSeconds(5)); |
|||
} |
|||
catch (AggregateException ex) |
|||
{ |
|||
_logger.LogError(ex, "等待消息队列处理完成时发生错误"); |
|||
} |
|||
_messageQueue.Dispose(); |
|||
_cancellationTokenSource.Dispose(); |
|||
} |
|||
_disposed = true; |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// 析构函数
|
|||
/// </summary>
|
|||
~RanLogMessageHandler() |
|||
{ |
|||
Dispose(false); |
|||
} |
|||
} |
|||
} |
Loading…
Reference in new issue