using CoreAgent.Domain.Interfaces.CustomWSClient;
using CoreAgent.Domain.Interfaces.Network;
using CoreAgent.Domain.Models.Protocol;
using CoreAgent.Infrastructure.Contexts;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace CoreAgent.Infrastructure.Services.ProtocolLogHandlers
{
/// <summary>
/// IMS protocol message handler.
/// Handles IMS-related WebSocket messages, including user updates, SMS, invites and similar features.
/// Incoming messages are queued by <see cref="HandleMessage"/> and processed one at a time on a background task.
/// </summary>
public class IMSLogMessageHandler : ICustomMessageHandler, IDisposable
{
// Logger used for every diagnostic message emitted by this handler.
private readonly ILogger _logger;
// Running counter; incremented before outgoing messages are built and used as their message_id.
private int _messageId = 0;
// String snapshot of _messageId, refreshed at the start of each dispatch and after a log_get reply;
// compared against the message_id of incoming config_get messages.
private string _currentMessageId = string.Empty;
// Callback invoked for each log_get message that carries at least one log entry.
// NOTE(review): declared as non-generic Action here but invoked with a string argument in
// HandleLogGetMessageAsync — confirm the intended delegate type.
private readonly Action _messageCallback;
// Network context; its NetworkLogs.ImsLog value is placed in outgoing log_get requests.
private readonly ICellularNetworkContext _context;
// Queue of (raw message text, originating observer) pairs awaiting processing.
private readonly BlockingCollection<(string MessageData, IObserverCustomWebSocketClient Observer)> _messageQueue;
// Cancelled during Dispose to stop the consuming loop.
private readonly CancellationTokenSource _cancellationTokenSource;
// Background task started in the constructor that runs ProcessMessageQueue.
private readonly Task _processTask;
// Set once Dispose(bool) has run, so teardown happens only once.
private bool _disposed;
/// <summary>
/// Creates the handler and immediately starts the background task that drains the message queue.
/// </summary>
/// <param name="logger">Logger used for all diagnostics.</param>
/// <param name="context">Network context whose <c>NetworkLogs.ImsLog</c> is used when building log requests.</param>
/// <param name="action">Callback invoked for each <c>log_get</c> message that contains log entries.</param>
/// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
public IMSLogMessageHandler(ILogger logger, ICellularNetworkContext context, Action action)
{
    // Checked in the order logger, action, context so the reported parameter name is stable.
    if (logger is null)
    {
        throw new ArgumentNullException(nameof(logger));
    }

    if (action is null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    if (context is null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    _logger = logger;
    _messageCallback = action;
    _context = context;

    _messageQueue = new BlockingCollection<(string MessageData, IObserverCustomWebSocketClient Observer)>();
    _cancellationTokenSource = new CancellationTokenSource();

    // The consumer starts right away and runs until the token is cancelled or adding completes.
    _processTask = Task.Run(() => ProcessMessageQueue());

    _logger.LogInformation("IMS协议消息处理器初始化完成,消息队列已启动");
}
/// <summary>
/// Places an incoming message on the processing queue; the background consumer handles it later.
/// Any failure to enqueue is logged and not rethrown.
/// </summary>
/// <param name="messageData">Raw message text as received.</param>
/// <param name="observer">Client that replies for this message are sent through.</param>
public void HandleMessage(string messageData, IObserverCustomWebSocketClient observer)
{
    try
    {
        _logger.LogDebug("将消息加入处理队列: {MessageData}", messageData);

        var workItem = (messageData, observer);
        _messageQueue.Add(workItem);
    }
    catch (Exception enqueueError)
    {
        _logger.LogError(enqueueError, "将消息加入队列时发生错误: {MessageData}", messageData);
    }
}
/// <summary>
/// Consumer loop: takes queued messages one at a time and processes each to completion
/// before taking the next. Ends when the cancellation token fires or the queue is completed.
/// </summary>
private async Task ProcessMessageQueue()
{
    try
    {
        _logger.LogInformation("开始处理IMS消息队列");

        CancellationToken stopToken = _cancellationTokenSource.Token;
        foreach (var queued in _messageQueue.GetConsumingEnumerable(stopToken))
        {
            try
            {
                await ProcessMessageAsync(queued.MessageData, queued.Observer);
            }
            catch (Exception itemError)
            {
                // One bad message must not stop the loop.
                _logger.LogError(itemError, "处理队列中的消息时发生错误: {MessageData}", queued.MessageData);
            }
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation("IMS消息队列处理已取消");
    }
    catch (Exception loopError)
    {
        _logger.LogError(loopError, "IMS消息队列处理过程中发生错误");
    }
}
/// <summary>
/// Parses one raw message as JSON, reads its <c>message</c> field and dispatches on that type.
/// All exceptions (including JSON parse failures) are logged here and not rethrown.
/// </summary>
/// <param name="messageData">Raw message text.</param>
/// <param name="observer">Client used to send any reply.</param>
private async Task ProcessMessageAsync(string messageData, IObserverCustomWebSocketClient observer)
{
    try
    {
        _logger.LogDebug("开始处理IMS协议消息: {MessageData}", messageData);
        var data = JObject.Parse(messageData);

        // A payload without a "message" field cannot be dispatched; report it explicitly
        // instead of failing with a NullReferenceException.
        var messageType = data["message"]?.ToString();
        if (messageType is null)
        {
            _logger.LogWarning("IMS协议消息缺少message字段: {MessageData}", messageData);
            return;
        }

        // Log the extracted type (previously the whole payload was logged under this placeholder).
        _logger.LogInformation("收到IMS协议消息类型: {MessageType}", messageType);
        await HandleMessageByTypeAsync(messageType, data, observer);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "处理IMS协议消息时发生错误: {MessageData}", messageData);
    }
}
/// <summary>
/// Snapshots the current message id, then routes the message to the handler for its type.
/// Unrecognised types are logged and answered with a log request.
/// </summary>
/// <param name="messageType">Value of the message's <c>message</c> field.</param>
/// <param name="data">Parsed message.</param>
/// <param name="observer">Client used to send any reply.</param>
private async Task HandleMessageByTypeAsync(string messageType, JObject data, IObserverCustomWebSocketClient observer)
{
    // Taken before any handler runs; HandleConfigGetMessageAsync compares against this value.
    _currentMessageId = _messageId.ToString();

    Task pending = messageType switch
    {
        "ready" => HandleReadyMessageAsync(observer),
        "config_get" => HandleConfigGetMessageAsync(data, observer),
        "config_set" => HandleConfigSetMessageAsync(observer),
        "log_get" => HandleLogGetMessageAsync(data, observer),
        "stats" => HandleStatsMessageAsync(observer),
        _ => RequestLogsForUnknownTypeAsync(),
    };
    await pending;

    async Task RequestLogsForUnknownTypeAsync()
    {
        _logger.LogWarning("收到未知的IMS协议消息类型: {MessageType}", messageType);
        await Task.Run(() => observer.SendMessage(LayerLogslevelSetting(false)));
    }
}
/// <summary>
/// Answers a <c>ready</c> message by sending a <c>config_get</c> request.
/// </summary>
/// <param name="observer">Client used to send the request.</param>
private async Task HandleReadyMessageAsync(IObserverCustomWebSocketClient observer)
{
    string configGetRequest = CreateMessage("config_get");

    _logger.LogInformation("发送ready响应: {Response}", configGetRequest);

    await Task.Run(() => observer.SendMessage(configGetRequest));
}
/// <summary>
/// Handles a <c>config_get</c> message. When its <c>message_id</c> equals the id snapshot taken
/// at dispatch, sends one array containing three register requests, a stats request and a
/// <c>config_set</c> built from the received <c>logs</c> object. Otherwise logs a warning.
/// </summary>
/// <param name="data">Parsed message; <c>message_id</c> and <c>logs</c> are read from it.</param>
/// <param name="observer">Client used to send the reply.</param>
private async Task HandleConfigGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer)
{
    string receivedId = data["message_id"]!.ToString();
    if (_currentMessageId != receivedId)
    {
        _logger.LogWarning("config_get消息ID不匹配: 收到={ReceivedId}, 期望={ExpectedId}",
            receivedId, _currentMessageId);
        return;
    }

    _logger.LogInformation("处理config_get请求");

    // Build order matters: every helper below advances _messageId.
    var batch = new JArray();
    batch.Add(CreateRegisterMessage("users_update"));
    batch.Add(CreateRegisterMessage("sms"));
    batch.Add(CreateRegisterMessage("invite"));
    // NOTE(review): CreateStatsMessage returns a string, so this entry is added as a string
    // rather than an object like its siblings — confirm this is intended.
    batch.Add(CreateStatsMessage());
    batch.Add(SettingBaseLayerLogslevel(JObject.Parse(data["logs"].ToString())));

    string payload = batch.ToString();
    _logger.LogInformation("发送config_get响应: {Response}", payload);
    await Task.Run(() => observer.SendMessage(payload));
}
/// <summary>
/// Answers a <c>config_set</c> message by sending a log request built with headers enabled.
/// </summary>
/// <param name="observer">Client used to send the request.</param>
private async Task HandleConfigSetMessageAsync(IObserverCustomWebSocketClient observer)
{
    _messageId++;

    string logRequest = LayerLogslevelSetting(true);
    _logger.LogInformation("发送config_set响应: {Response}", logRequest);

    await Task.Run(() => observer.SendMessage(logRequest));
}
/// <summary>
/// Handles a <c>log_get</c> message. When its <c>logs</c> array is non-empty, sends the next
/// log request, refreshes the current message id and passes the message text to the callback.
/// Does nothing when the array is empty.
/// </summary>
/// <param name="data">Parsed message; its <c>logs</c> value is read as an array.</param>
/// <param name="observer">Client used to send the next log request.</param>
private async Task HandleLogGetMessageAsync(JObject data, IObserverCustomWebSocketClient observer)
{
    JArray receivedLogs = JArray.FromObject(data["logs"]);
    if (receivedLogs.Count <= 0)
    {
        return;
    }

    _messageId++;
    string nextLogRequest = LayerLogslevelSetting(false);
    _logger.LogInformation("发送log_get响应: {Response}", nextLogRequest);
    await Task.Run(() => observer.SendMessage(nextLogRequest));

    _currentMessageId = _messageId.ToString();

    // The callback receives the whole message serialised back to text.
    await Task.Run(() => _messageCallback(data.ToString()));
}
/// <summary>
/// Answers a <c>stats</c> message by sending a new stats request.
/// The message id is advanced here and again while the request is built.
/// </summary>
/// <param name="observer">Client used to send the request.</param>
private async Task HandleStatsMessageAsync(IObserverCustomWebSocketClient observer)
{
    _messageId++;

    string statsRequest = CreateStatsMessage();
    _logger.LogInformation("发送stats响应: {Response}", statsRequest);

    await Task.Run(() => observer.SendMessage(statsRequest));
}
/// <summary>
/// Advances the message id and builds the JSON text of a message with
/// <c>message</c> and <c>message_id</c> fields.
/// </summary>
/// <param name="message">Value for the <c>message</c> field.</param>
/// <returns>The serialised message.</returns>
private string CreateMessage(string message)
{
    int id = ++_messageId;
    JObject envelope = JObject.FromObject(new { message, message_id = id });
    return envelope.ToString();
}
/// <summary>
/// Advances the message id and builds a <c>register</c> message for the given name.
/// </summary>
/// <param name="register">Value for the <c>register</c> field.</param>
/// <returns>The message as a JSON object.</returns>
private JObject CreateRegisterMessage(string register)
{
    int id = ++_messageId;
    var request = new { message = "register", message_id = id, register };
    return JObject.FromObject(request);
}
/// <summary>
/// Builds the JSON text of a <c>stats</c> message.
/// The message id is advanced once here and once more inside <c>CreateMessage</c>.
/// </summary>
/// <returns>The serialised message.</returns>
private string CreateStatsMessage()
{
    _messageId += 1;
    string statsMessage = CreateMessage("stats");
    return statsMessage;
}
/// <summary>
/// Advances the message id and builds a <c>config_set</c> message from the given logs object.
/// The object is modified in place: the <c>rotate</c>, <c>path</c> and <c>count</c> keys are
/// removed and <c>bcch</c> is set.
/// </summary>
/// <param name="keyValues">Logs object to send back; mutated by this call.</param>
/// <param name="isCloseSystemInfo">Value written to the <c>bcch</c> key.</param>
/// <returns>The message as a JSON object.</returns>
private JObject SettingBaseLayerLogslevel(JObject keyValues, bool isCloseSystemInfo = false)
{
    _messageId++;

    foreach (string strippedKey in new[] { "rotate", "path", "count" })
    {
        keyValues.Remove(strippedKey);
    }

    keyValues["bcch"] = isCloseSystemInfo;

    var request = new
    {
        message = "config_set",
        logs = keyValues,
        message_id = _messageId
    };
    return JObject.FromObject(request);
}
/// <summary>
/// Advances the message id and builds the JSON text of a <c>log_get</c> request
/// using the IMS log configuration from the network context.
/// </summary>
/// <param name="isHead">When <c>true</c>, headers are requested and the timeout is 0; otherwise the timeout is 1.</param>
/// <returns>The serialised request.</returns>
private string LayerLogslevelSetting(bool isHead = false)
{
    _messageId++;

    var logRequest = new BaseNetworkLog
    {
        Timeout = isHead ? 0 : 1,
        MinLogCount = 64,
        MaxLogCount = 2048,
        LayerConfig = _context.NetworkLogs.ImsLog,
        Message = "log_get",
        IncludeHeaders = isHead,
        MessageId = _messageId
    };

    JObject serialised = JObject.FromObject(logRequest);
    return serialised.ToString();
}
/// <summary>
/// Stops the background consumer and releases the queue and cancellation source,
/// then tells the GC that finalization is no longer needed.
/// </summary>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Performs the actual teardown. On an explicit dispose it cancels the consumer, completes the
/// queue, waits for the background task to finish and releases the owned disposables.
/// Runs at most once.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the finalizer,
/// in which case no managed objects are touched.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (_disposed)
    {
        return;
    }

    // Set first so a failure below cannot cause teardown to run again on already-disposed objects.
    _disposed = true;

    if (!disposing)
    {
        return;
    }

    try
    {
        _cancellationTokenSource.Cancel();
        _messageQueue.CompleteAdding();
        _processTask.Wait();
    }
    catch (AggregateException ex)
    {
        // Cancel() and Wait() report failures as AggregateException; Dispose should not throw,
        // so record the failure and continue releasing resources.
        _logger.LogError(ex, "释放IMS协议消息处理器时发生错误");
    }
    finally
    {
        // Always release the owned disposables, even if stopping the consumer failed.
        _cancellationTokenSource.Dispose();
        _messageQueue.Dispose();
    }
}
/// <summary>
/// Finalizer; runs the non-disposing path of <c>Dispose(bool)</c>.
/// </summary>
~IMSLogMessageHandler() => Dispose(disposing: false);
}
}