You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

175 lines
5.9 KiB

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using CoreAgent.ProtocolClient.Context;
using CoreAgent.ProtocolClient.Managers.WebSocketMgr;
using CoreAgent.ProtocolClient.Models;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
namespace CoreAgent.ProtocolClient.ProtocolEngineCore
{
/// <summary>
/// Log message handler.
/// Dispatches incoming WebSocket messages of type <c>log_get</c>, issues
/// <c>log_get</c> requests, and sends log reset / log configuration messages.
/// </summary>
public class LogMessageHandler
{
    /// <summary>Value sent in the <c>timeout</c> field of a <c>log_get</c> request unless overridden by caller parameters.</summary>
    private const int DefaultLogGetTimeout = 1;

    /// <summary>Value sent in the <c>min</c> field of a <c>log_get</c> request unless overridden by caller parameters.</summary>
    private const int DefaultLogGetMin = 64;

    /// <summary>Value sent in the <c>max</c> field of a <c>log_get</c> request unless overridden by caller parameters.</summary>
    private const int DefaultLogGetMax = 2048;

    private readonly ProtocolClientConfig _config;
    private readonly ProtocolClientContext _context;
    private readonly ILogger _logger;
    private readonly WebSocketMessageManager _messageManager;

    /// <summary>
    /// Creates a handler bound to the given client configuration, context,
    /// logger and WebSocket message manager.
    /// </summary>
    /// <param name="config">Client configuration (name and log layer settings are read from it).</param>
    /// <param name="context">Client context (state, feature flags and log context are read from it).</param>
    /// <param name="logger">Logger used for debug output.</param>
    /// <param name="messageManager">Manager used to send messages and track the current log_get message id.</param>
    public LogMessageHandler(ProtocolClientConfig config, ProtocolClientContext context, ILogger logger, WebSocketMessageManager messageManager)
    {
        _config = config;
        _context = context;
        _logger = logger;
        _messageManager = messageManager;
    }

    /// <summary>
    /// Handles a received WebSocket message. Only messages whose
    /// <c>message</c> field equals <c>log_get</c> are processed; everything
    /// else is ignored.
    /// </summary>
    /// <param name="message">The received message.</param>
    public void HandleMessage(JObject message)
    {
        var messageType = message["message"]?.ToString();
        if (messageType == "log_get")
        {
            ParseLogGetResponse(message);
        }
    }

    /// <summary>
    /// Builds and sends a <c>log_get</c> request.
    /// </summary>
    /// <param name="parameters">
    /// Optional extra fields. Each entry is written onto the request after the
    /// defaults, so an entry with the same key replaces the default value.
    /// Null values are sent as JSON null.
    /// </param>
    public void GetLogs(Dictionary<string, object>? parameters = null)
    {
        // One entry per configured layer, carrying that layer's filter.
        var layers = new JObject();
        if (_config.Logs?.Layers != null)
        {
            foreach (var layerKvp in _config.Logs.Layers)
            {
                layers[layerKvp.Key] = layerKvp.Value.Filter;
            }
        }

        var message = new JObject
        {
            ["timeout"] = DefaultLogGetTimeout,
            ["min"] = DefaultLogGetMin,
            ["max"] = DefaultLogGetMax,
            ["layers"] = layers,
            ["message"] = "log_get",
            ["headers"] = _context.FeatureFlags.IsFirstGetLogs
        };

        // The flag is sent as "headers" once and then cleared, so only the
        // first request after it was set carries headers = true.
        if (_context.FeatureFlags.IsFirstGetLogs)
        {
            _context.FeatureFlags.IsFirstGetLogs = false;
        }

        if (parameters != null)
        {
            foreach (var param in parameters)
            {
                // JToken.FromObject rejects null, so map null values to JSON null explicitly.
                message[param.Key] = param.Value is null
                    ? JValue.CreateNull()
                    : JToken.FromObject(param.Value);
            }
        }

        var oldLogGetId = _messageManager.MessageIdManager.CurrentLogGetMessageId;
        var newLogGetId = _messageManager.SendLogGetMessage(message, ParseLogGetResponse);

        // Serializing the parameters is only worth doing when the line will actually be written.
        if (_logger.IsEnabled(LogLevel.Debug))
        {
            _logger.LogDebug($"[{_config.Name}] LogGet ID change: {oldLogGetId} -> {newLogGetId}, Params: {JsonConvert.SerializeObject(parameters)}");
        }
    }

    /// <summary>
    /// Processes a <c>log_get</c> response: raises <see cref="OnLogReceived"/>
    /// with the deserialized payload, then requests the next batch if the
    /// response belongs to the current log_get request.
    /// </summary>
    /// <param name="msg">The response message.</param>
    private void ParseLogGetResponse(JObject msg)
    {
        // Capture the id before raising the event; it is compared against the response below.
        var currentLogGetId = _messageManager.MessageIdManager.CurrentLogGetMessageId;

        var json = msg.ToObject<SourceProtocolLog>();
        if (json != null)
        {
            // Raise the log-received event.
            OnLogReceived?.Invoke(json);
        }

        // Read as a nullable int so a missing or JSON-null message_id yields
        // null instead of throwing.
        int? responseId = msg["message_id"]?.Value<int?>();

        if (responseId == currentLogGetId)
        {
            string data = $"[{_config.Name}] Received last LogGet response for message_id={currentLogGetId}, fetching next batch.";
            _logger.LogDebug(data);
            GetLogs(); // Continue the log polling
        }
        else
        {
            _logger.LogDebug($"[{_config.Name}] Received LogGet response for message_id={responseId}, but current LogGet ID is {currentLogGetId}. Ignoring.");
        }
    }

    /// <summary>
    /// Resets the logs. When the client is connected, a <c>log_reset</c>
    /// message is sent and the local log context is reset from the response
    /// callback; otherwise the local log context is reset immediately.
    /// </summary>
    public void ResetLogs()
    {
        if (_context.State == Enums.ClientState.Connected)
        {
            _messageManager.SendMessage(new JObject { ["message"] = "log_reset" }, response =>
            {
                _context.LogContext.ResetLogs();
            });
        }
        else
        {
            _context.LogContext.ResetLogs();
        }
    }

    /// <summary>
    /// Sends a <c>config_set</c> message carrying the given log configuration,
    /// then, from the response callback, optionally stores the configuration
    /// and issues a <c>log_get</c> request with <c>timeout</c> = 0.
    /// </summary>
    /// <param name="logsConfig">The log configuration to send.</param>
    /// <param name="save">When true, the configuration is stored on the client config once the response arrives.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logsConfig"/> is null.</exception>
    public void SetLogsConfig(ProtocolClientLogsConfig logsConfig, bool save = false)
    {
        if (logsConfig is null)
        {
            throw new ArgumentNullException(nameof(logsConfig));
        }

        var layersPayload = new JObject();

        // Same null tolerance as GetLogs: a config without layers sends an empty "layers" object.
        if (logsConfig.Layers != null)
        {
            foreach (var layerKvp in logsConfig.Layers)
            {
                // NOTE(review): the size limit is emitted under the key "maxSize";
                // confirm the remote side expects this spelling.
                layersPayload[layerKvp.Key] = JObject.FromObject(new
                {
                    level = layerKvp.Value.Level,
                    maxSize = layerKvp.Value.MaxSize,
                    payload = layerKvp.Value.Payload
                });
            }
        }

        var logsPayload = new JObject { ["layers"] = layersPayload };

        // Optional fields are only included when a value is set.
        if (logsConfig.Signal.HasValue)
        {
            logsPayload["signal"] = logsConfig.Signal.Value;
        }

        if (logsConfig.Cch.HasValue)
        {
            logsPayload["cch"] = logsConfig.Cch.Value;
        }

        // NOTE(review): the meaning of the trailing 'true' argument is defined by
        // WebSocketMessageManager.SendMessage, which is not visible here.
        _messageManager.SendMessage(new JObject { ["message"] = "config_set", ["logs"] = logsPayload }, response =>
        {
            if (save)
            {
                _config.Logs = logsConfig;
            }

            GetLogs(new Dictionary<string, object> { ["timeout"] = 0 });
        }, true);
    }

    /// <summary>
    /// Raised with the deserialized payload of each <c>log_get</c> response.
    /// </summary>
    public event Action<SourceProtocolLog>? OnLogReceived;
}
}