using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using CoreAgent.ProtocolClient.Context;
using CoreAgent.ProtocolClient.Managers.WebSocketMgr;
using CoreAgent.ProtocolClient.Models;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;

namespace CoreAgent.ProtocolClient.ProtocolEngineCore
{
    /// <summary>
    /// Log message handler.
    /// Handles incoming WebSocket messages related to logs and drives the
    /// "log_get" request/response polling cycle.
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments appear to have been lost from this copy of the
    /// file (the Dictionary parameter and initializer, ToObject(), Value() and the Action
    /// event type). They are reproduced exactly as found; restore them from the original
    /// source before building.
    /// </remarks>
    public class LogMessageHandler
    {
        private const string LogGetMessageType = "log_get";

        // Default values sent with every "log_get" request. Units/semantics are defined by
        // the remote protocol and are not visible from this file.
        private const int DefaultLogGetTimeout = 1;
        private const int DefaultLogGetMin = 64;
        private const int DefaultLogGetMax = 2048;

        private readonly ProtocolClientConfig _config;
        private readonly ProtocolClientContext _context;
        private readonly ILogger _logger;
        private readonly WebSocketMessageManager _messageManager;

        /// <summary>
        /// Creates a handler bound to the given configuration, context, logger and message manager.
        /// </summary>
        /// <param name="config">Client configuration (name and log layer settings are read from it).</param>
        /// <param name="context">Client context (state, feature flags and log context).</param>
        /// <param name="logger">Logger used for debug output.</param>
        /// <param name="messageManager">Manager used to send WebSocket messages.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public LogMessageHandler(ProtocolClientConfig config, ProtocolClientContext context, ILogger logger, WebSocketMessageManager messageManager)
        {
            _config = config ?? throw new ArgumentNullException(nameof(config));
            _context = context ?? throw new ArgumentNullException(nameof(context));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _messageManager = messageManager ?? throw new ArgumentNullException(nameof(messageManager));
        }

        /// <summary>
        /// Handles a received WebSocket message. Only messages whose "message" field is
        /// "log_get" are processed; everything else is ignored.
        /// </summary>
        /// <param name="message">The received message.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
        public void HandleMessage(JObject message)
        {
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            var messageType = message["message"]?.ToString();
            if (messageType == LogGetMessageType)
            {
                // NOTE(review): ParseLogGetResponse is also registered as the callback of
                // SendLogGetMessage in GetLogs; confirm a response is not delivered through both paths.
                ParseLogGetResponse(message);
            }
        }

        /// <summary>
        /// Sends a "log_get" request.
        /// </summary>
        /// <param name="parameters">
        /// Optional extra fields copied onto the request; an entry overrides a default field
        /// with the same key. A <c>null</c> value is sent as JSON null.
        /// </param>
        public void GetLogs(Dictionary? parameters = null)
        {
            var layers = new JObject();
            if (_config.Logs?.Layers != null)
            {
                foreach (var layerKvp in _config.Logs.Layers)
                {
                    layers[layerKvp.Key] = layerKvp.Value.Filter;
                }
            }

            // "headers" carries the first-request flag; the flag is cleared once it has been sent.
            var isFirstRequest = _context.FeatureFlags.IsFirstGetLogs;
            var message = new JObject
            {
                ["timeout"] = DefaultLogGetTimeout,
                ["min"] = DefaultLogGetMin,
                ["max"] = DefaultLogGetMax,
                ["layers"] = layers,
                ["message"] = LogGetMessageType,
                ["headers"] = isFirstRequest
            };

            if (isFirstRequest)
            {
                _context.FeatureFlags.IsFirstGetLogs = false;
            }

            if (parameters != null)
            {
                foreach (var param in parameters)
                {
                    // JToken.FromObject throws on null, so map null values to an explicit JSON null.
                    message[param.Key] = param.Value is { } value
                        ? JToken.FromObject(value)
                        : JValue.CreateNull();
                }
            }

            var oldLogGetId = _messageManager.MessageIdManager.CurrentLogGetMessageId;
            var newLogGetId = _messageManager.SendLogGetMessage(message, ParseLogGetResponse);

            // This runs on every poll; skip the JSON serialization unless debug logging is enabled.
            if (_logger.IsEnabled(LogLevel.Debug))
            {
                _logger.LogDebug($"[{_config.Name}] LogGet ID change: {oldLogGetId} -> {newLogGetId}, Params: {JsonConvert.SerializeObject(parameters)}");
            }
        }

        /// <summary>
        /// Parses a "log_get" response: raises <see cref="OnLogReceived"/> with the deserialized
        /// payload and, when the response belongs to the current request, sends the next request.
        /// </summary>
        /// <param name="msg">The response message.</param>
        private void ParseLogGetResponse(JObject msg)
        {
            var currentLogGetId = _messageManager.MessageIdManager.CurrentLogGetMessageId;

            var json = msg.ToObject();
            if (json != null)
            {
                // Raise the log-received event.
                // NOTE(review): an exception thrown by a subscriber propagates from here,
                // before the next poll below is issued.
                OnLogReceived?.Invoke(json);
            }

            var responseId = msg["message_id"]?.Value();
            if (responseId == currentLogGetId)
            {
                string data = $"[{_config.Name}] Received last LogGet response for message_id={currentLogGetId}, fetching next batch.";
                _logger.LogDebug(data);
                GetLogs(); // Continue the log polling
            }
            else if (_logger.IsEnabled(LogLevel.Debug))
            {
                _logger.LogDebug($"[{_config.Name}] Received LogGet response for message_id={responseId}, but current LogGet ID is {currentLogGetId}. Ignoring.");
            }
        }

        /// <summary>
        /// Resets the logs. When the client is connected a "log_reset" message is sent and the
        /// local log context is reset in the response callback; otherwise the local log context
        /// is reset immediately.
        /// </summary>
        public void ResetLogs()
        {
            if (_context.State == Enums.ClientState.Connected)
            {
                _messageManager.SendMessage(
                    new JObject { ["message"] = "log_reset" },
                    response =>
                    {
                        _context.LogContext.ResetLogs();
                    });
            }
            else
            {
                _context.LogContext.ResetLogs();
            }
        }

        /// <summary>
        /// Sends a "config_set" message carrying the given log configuration. In the response
        /// callback the configuration is optionally stored and a "log_get" request with
        /// timeout 0 is issued.
        /// </summary>
        /// <param name="logsConfig">The log configuration to send.</param>
        /// <param name="save">When <c>true</c>, the configuration is stored in the client config once the response arrives.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logsConfig"/> is <c>null</c>.</exception>
        public void SetLogsConfig(ProtocolClientLogsConfig logsConfig, bool save = false)
        {
            if (logsConfig == null)
            {
                throw new ArgumentNullException(nameof(logsConfig));
            }

            var layersPayload = new JObject();

            // Layers may be null (GetLogs guards the same property); send an empty object then.
            if (logsConfig.Layers != null)
            {
                foreach (var layerKvp in logsConfig.Layers)
                {
                    layersPayload[layerKvp.Key] = JObject.FromObject(new
                    {
                        level = layerKvp.Value.Level,
                        maxSize = layerKvp.Value.MaxSize,
                        payload = layerKvp.Value.Payload
                    });
                }
            }

            var logsPayload = new JObject
            {
                ["layers"] = layersPayload
            };

            if (logsConfig.Signal.HasValue)
            {
                logsPayload["signal"] = logsConfig.Signal.Value;
            }

            if (logsConfig.Cch.HasValue)
            {
                logsPayload["cch"] = logsConfig.Cch.Value;
            }

            _messageManager.SendMessage(
                new JObject
                {
                    ["message"] = "config_set",
                    ["logs"] = logsPayload
                },
                response =>
                {
                    if (save)
                    {
                        _config.Logs = logsConfig;
                    }

                    GetLogs(new Dictionary { ["timeout"] = 0 });
                },
                true);
        }

        /// <summary>
        /// Raised when a "log_get" response has been deserialized successfully.
        /// </summary>
        public event Action? OnLogReceived;
    }
}