using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using LTEMvcApp.Models;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using WebSocket4Net;

namespace LTEMvcApp.Services
{
    /// <summary>
    /// WebSocket transport for one LTE client. Counterpart of <c>lte.client.server</c> in the JavaScript version.
    /// </summary>
    public class LTEClientWebSocket : IDisposable
    {
        #region Private fields

        /// <summary>Matches the "Link: name@offset" lines that PHY log entries carry for signal records.</summary>
        private static readonly Regex SignalLinkRegex = new Regex(@"Link:\s([\w\d]+)@(\d+)", RegexOptions.Compiled);

        /// <summary>Parameter names recognised by <see cref="IsLogParameter"/>.</summary>
        private static readonly string[] LogParameterNames =
        {
            "signal", "cch", "bcch", "mib", "rep", "csi", "dci_size", "cell_meas"
        };

        /// <summary>Maximum number of entries kept in each of the sent/received diagnostic caches.</summary>
        private const int ServerMessageCacheLimit = 1000;

        /// <summary>Queue length at which <see cref="SendMessage"/> flushes immediately instead of deferring.</summary>
        private const int MessageBatchLimit = 100;

        /// <summary>Delay after the socket opens before the connection is treated as ready without a handshake.</summary>
        private const int ReadyTimeoutMs = 2500;

        /// <summary>Address used when the configuration does not provide one.</summary>
        private const string DefaultAddress = "192.168.13.12:9001";

        private WebSocket? _webSocket;
        private readonly LTEClient _client;
        private readonly ClientConfig _config;
        private readonly ConcurrentDictionary<int, MessageHandler> _messageHandlers;
        private readonly ConcurrentDictionary<string, MessageHandler> _messageHandlersByName;
        private readonly ConcurrentQueue<JObject> _messageFifo;
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly ILogger _logger;
        private readonly ConcurrentQueue<string> _sentMessages = new ConcurrentQueue<string>();
        private readonly ConcurrentQueue<string> _receivedMessages = new ConcurrentQueue<string>();

        // Serialises the outgoing queue and its defer timer: SendMessage and the
        // timer-driven SendMessageNow both touch them from different threads.
        private readonly object _sendGate = new object();

        private Timer? _reconnectTimer;
        private Timer? _statsTimer;
        private Timer? _messageDeferTimer;
        private Timer? _readyTimer;
        private int _messageId;
        private int _logGetId;
        private bool _disposed;

        // Statistics polling interval in milliseconds (the JavaScript version's _statsPolldelay).
        private int _statsPollDelay = 1000;
        private bool _isFirstStatsUpdate = true;

        // 1 once OnSocketReady has run for the current connection, otherwise 0.
        // Kept as an int so the check-and-set can be done atomically.
        private int _socketReady;

        #endregion

        #region Events

        /// <summary>Raised when the WebSocket connection has opened.</summary>
        public event EventHandler? ConnectionOpened;

        /// <summary>Raised when the WebSocket reports that the connection has closed.</summary>
        public event EventHandler? ConnectionClosed;

        /// <summary>Raised with a description when connecting, parsing or message handling fails.</summary>
        public event EventHandler<string>? ConnectionError;

        /// <summary>Raised for every message received after the connection is ready.</summary>
        public event EventHandler<JObject>? MessageReceived;

        /// <summary>Raised with the parsed entries of a <c>log_get</c> response.</summary>
        public event EventHandler<List<LTELog>>? LogsReceived;

        /// <summary>Raised with a <c>stats</c> response.</summary>
        public event EventHandler<JObject>? StatsReceived;

        /// <summary>Raised when the client state changes.</summary>
        public event EventHandler<ClientState>? StateChanged;

        /// <summary>Raised when the first logs arrive for a client that previously had none.</summary>
        public event EventHandler? ClientGridRefreshed;

        #endregion

        #region Properties

        /// <summary>The client model fed by this connection.</summary>
        public LTEClient Client => _client;

        /// <summary>The configuration this connection was created with.</summary>
        public ClientConfig Config => _config;

        /// <summary>True while the underlying socket is in the open state.</summary>
        public bool IsConnected => _webSocket?.State == WebSocketState.Open;

        /// <summary>Current client state.</summary>
        public ClientState State => _client.State;

        /// <summary>True when log polling is paused in the configuration.</summary>
        public bool IsPaused => _config.Pause;

        /// <summary>True when the configuration marks this client read-only.</summary>
        public bool IsReadonly => _config.Readonly;

        /// <summary>Most recent outgoing messages, formatted as indented JSON.</summary>
        public IEnumerable<string> SentMessages => _sentMessages;

        /// <summary>Most recent incoming messages, formatted as indented JSON.</summary>
        public IEnumerable<string> ReceivedMessages => _receivedMessages;

        /// <summary>Number of cached outgoing messages.</summary>
        public int SentMessagesCount => _sentMessages.Count;

        /// <summary>Number of cached incoming messages.</summary>
        public int ReceivedMessagesCount => _receivedMessages.Count;

        #endregion

        #region Constructor

        /// <summary>
        /// Creates the client model and the message bookkeeping. No connection is opened until <see cref="Start"/>.
        /// </summary>
        /// <param name="config">Client configuration.</param>
        /// <param name="logger">Logger used for all diagnostics of this instance.</param>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> or <paramref name="logger"/> is null.</exception>
        public LTEClientWebSocket(ClientConfig config, ILogger logger)
        {
            _config = config ?? throw new ArgumentNullException(nameof(config));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _client = new LTEClient(config, new LogsManager());
            _messageHandlers = new ConcurrentDictionary<int, MessageHandler>();
            _messageHandlersByName = new ConcurrentDictionary<string, MessageHandler>();
            _messageFifo = new ConcurrentQueue<JObject>();
            _cancellationTokenSource = new CancellationTokenSource();
            _messageId = 0;

            _logger.LogInformation($"创建WebSocket客户端: {config.Name}");
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Opens a new WebSocket connection to the configured address.
        /// Any socket left over from a previous attempt is detached and disposed first.
        /// Failures are reported through <see cref="ConnectionError"/> and the state becomes <c>Error</c>.
        /// </summary>
        public void Start()
        {
            if (_disposed)
            {
                return;
            }

            var address = _config.Address ?? DefaultAddress;
            try
            {
                _logger.LogInformation($"[{_config.Name}] 尝试连接: {address}");
                SetState(ClientState.Connecting);

                // A reconnect must not leave the previous socket alive with our handlers attached.
                ReleaseSocket();
                Interlocked.Exchange(ref _socketReady, 0);

                var url = (_config.Ssl ? "wss://" : "ws://") + address;
                var socket = new WebSocket(url)
                {
                    EnableAutoSendPing = false
                };

                socket.Opened += OnSocketOpened;
                socket.Closed += OnSocketClosed;
                socket.MessageReceived += OnSocketMessage0;
                socket.Error += OnSocketError;

                _webSocket = socket;
                socket.Open();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"[{_config.Name}] 连接异常: {ex.Message}");
                ConnectionError?.Invoke(this, $"无法连接到 {address}: {ex.Message}");
                SetState(ClientState.Error);
            }
        }

        /// <summary>
        /// Stops the connection: sets the state to <c>Stop</c>, cancels all timers and releases the socket.
        /// The socket is detached before it is disposed, so no reconnect is scheduled and
        /// <see cref="ConnectionClosed"/> is not raised by this call.
        /// </summary>
        public void Stop()
        {
            SetState(ClientState.Stop);
            StopTimers();
            ReleaseSocket();
            Interlocked.Exchange(ref _socketReady, 0);
        }

        /// <summary>
        /// Clears the client's logs. While connected, a <c>log_reset</c> is sent first and the
        /// local reset happens when the response arrives; otherwise the reset is immediate.
        /// </summary>
        public void ResetLogs()
        {
            if (State == ClientState.Connected)
            {
                var id = SendMessage(new JObject { ["message"] = "log_reset" }, _ => _client.ResetLogs());
                if (id >= 0)
                {
                    return;
                }

                // The request could not be queued (socket not open), so no response will trigger the reset.
            }

            _client.ResetLogs();
        }

        /// <summary>
        /// Toggles the pause flag. When resuming while connected, log polling is restarted.
        /// </summary>
        public void PlayPause()
        {
            _config.Pause = !_config.Pause;

            if (!_config.Pause && State == ClientState.Connected)
            {
                LogGet();
            }
        }

        /// <summary>
        /// Sends a <c>config_set</c> with the given log configuration and then requests logs with a zero timeout.
        /// </summary>
        /// <param name="logsConfig">Log configuration to send.</param>
        /// <param name="save">When true, the configuration is stored in <see cref="Config"/> once the server has answered.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logsConfig"/> is null.</exception>
        public void SetLogsConfig(ClientLogsConfig logsConfig, bool save = false)
        {
            if (logsConfig == null)
            {
                throw new ArgumentNullException(nameof(logsConfig));
            }

            var layersPayload = new JObject();
            if (logsConfig.Layers != null)
            {
                foreach (var layerKvp in logsConfig.Layers)
                {
                    layersPayload[layerKvp.Key] = JObject.FromObject(new
                    {
                        level = layerKvp.Value.Level,
                        maxSize = layerKvp.Value.MaxSize,
                        payload = layerKvp.Value.Payload,
                    });
                }
            }

            var logsPayload = new JObject { ["layers"] = layersPayload };
            if (logsConfig.Signal.HasValue)
            {
                logsPayload["signal"] = logsConfig.Signal.Value;
            }

            if (logsConfig.Cch.HasValue)
            {
                logsPayload["cch"] = logsConfig.Cch.Value;
            }

            var request = new JObject
            {
                ["message"] = "config_set",
                ["logs"] = logsPayload
            };

            // errorHandler is true: the callback also runs when the server answers with an error.
            SendMessage(request, _ =>
            {
                if (save)
                {
                    _config.Logs = logsConfig;
                }

                LogGet(new Dictionary<string, object> { ["timeout"] = 0 });
            }, true);
        }

        /// <summary>
        /// Registers a handler for the named server messages and sends the matching <c>register</c> request.
        /// </summary>
        /// <param name="names">Message names to register.</param>
        /// <param name="handler">Handler invoked for those messages.</param>
        public void SetMessageHandler(string[] names, MessageHandler handler)
        {
            // Store the handlers before registering so a message that arrives immediately is not missed.
            foreach (var name in names)
            {
                _messageHandlersByName[name] = handler;
            }

            SendMessage(new JObject
            {
                ["message"] = "register",
                ["register"] = string.Join(",", names)
            });
        }

        /// <summary>
        /// Sends an <c>unregister</c> request for the named messages and removes their handlers.
        /// </summary>
        /// <param name="names">Message names to unregister.</param>
        public void UnsetMessageHandler(string[] names)
        {
            SendMessage(new JObject
            {
                ["message"] = "register",
                ["unregister"] = string.Join(",", names)
            });

            foreach (var name in names)
            {
                _messageHandlersByName.TryRemove(name, out _);
            }
        }

        /// <summary>
        /// Queues a message for sending. The message is given a fresh <c>message_id</c>; messages queued
        /// within a millisecond of each other are sent together, and the queue is flushed immediately
        /// once it holds 100 messages.
        /// </summary>
        /// <param name="message">Message to send; its <c>message_id</c> field is overwritten.</param>
        /// <param name="callback">Invoked with the response that carries the same <c>message_id</c>.</param>
        /// <param name="errorHandler">When true, <paramref name="callback"/> also receives error responses;
        /// when false, error responses are reported through <see cref="ConnectionError"/>.</param>
        /// <returns>The assigned message id, or -1 when the message is null or the socket is not open.</returns>
        public int SendMessage(JObject message, Action<JObject>? callback = null, bool errorHandler = false)
        {
            if (message == null || _webSocket?.State != WebSocketState.Open)
            {
                return -1;
            }

            // Callers, socket callbacks and timer callbacks all send, so the id must be allocated atomically.
            var id = Interlocked.Increment(ref _messageId);
            message["message_id"] = id;

            if (callback != null)
            {
                _messageHandlers[id] = new MessageHandler
                {
                    Callback = callback,
                    ErrorHandler = errorHandler
                };
            }

            lock (_sendGate)
            {
                _messageDeferTimer?.Dispose();
                _messageDeferTimer = null;

                _messageFifo.Enqueue(message);

                if (_messageFifo.Count < MessageBatchLimit)
                {
                    _messageDeferTimer = new Timer(Guarded(SendMessageNow), null, 1, Timeout.Infinite);
                }
                else
                {
                    SendMessageNow();
                }
            }

            return id;
        }

        /// <summary>
        /// Requests logs from the server unless polling is paused. The response is handled by
        /// <c>LogGetParse</c>, which issues the next request.
        /// </summary>
        /// <param name="parameters">Optional fields that are added to, or override, the default request fields.</param>
        public void LogGet(Dictionary<string, object>? parameters = null)
        {
            if (_config.Pause)
            {
                return;
            }

            var layers = new JObject();
            if (_config.Logs?.Layers != null)
            {
                foreach (var layerKvp in _config.Logs.Layers)
                {
                    layers[layerKvp.Key] = layerKvp.Value.Filter;
                }
            }

            var message = new JObject
            {
                ["timeout"] = 1,
                ["min"] = 64,
                ["max"] = 2048,
                ["layers"] = layers,
                ["message"] = "log_get",
                // Headers are only requested while the client holds no logs yet.
                ["headers"] = _client.LogCount == 0
            };

            if (parameters != null)
            {
                foreach (var param in parameters)
                {
                    message[param.Key] = JToken.FromObject(param.Value);
                }
            }

            _logGetId = SendMessage(message, LogGetParse);
        }

        /// <summary>Sends a statistics request immediately.</summary>
        public void TriggerStatsUpdate()
        {
            UpdateStats();
        }

        /// <summary>Resets the statistics to their zero value.</summary>
        public void ResetStatistics()
        {
            ResetStats();
        }

        /// <summary>Gets the statistics polling interval.</summary>
        /// <returns>Interval in milliseconds.</returns>
        public int GetStatsUpdateInterval()
        {
            return _statsPollDelay;
        }

        /// <summary>Sets the statistics polling interval used for the next scheduled poll.</summary>
        /// <param name="delay">Interval in milliseconds.</param>
        public void SetRefreshDelay(int delay)
        {
            _statsPollDelay = delay;
        }

        /// <summary>Gets the statistics polling interval.</summary>
        /// <returns>Interval in milliseconds.</returns>
        public int GetRefreshDelay()
        {
            return _statsPollDelay;
        }

        #endregion

        #region Socket event handlers

        /// <summary>
        /// Socket opened: arms the ready timer so the connection is initialised even if the server
        /// never sends a handshake message, then raises <see cref="ConnectionOpened"/>.
        /// </summary>
        private void OnSocketOpened(object? sender, EventArgs e)
        {
            _logger.LogInformation($"[{_config.Name}] WebSocket连接已打开");
            StopTimers();
            _readyTimer = new Timer(Guarded(OnSocketReady), null, ReadyTimeoutMs, Timeout.Infinite);
            ConnectionOpened?.Invoke(this, EventArgs.Empty);
        }

        /// <summary>
        /// Socket closed: raises <see cref="ConnectionClosed"/>, stops timers and, when the client is
        /// enabled and not disposed, schedules a reconnect after the configured delay.
        /// </summary>
        private void OnSocketClosed(object? sender, EventArgs e)
        {
            _logger.LogWarning($"[{_config.Name}] WebSocket连接已关闭");
            ConnectionClosed?.Invoke(this, EventArgs.Empty);

            StopTimers();
            CloseComponents();
            Interlocked.Exchange(ref _socketReady, 0);

            // TODO: stop the monitor window when the connection drops while connected.

            if (_disposed || !_config.Enabled)
            {
                return;
            }

            _logger.LogInformation($"[{_config.Name}] 启动重连定时器");
            if (State != ClientState.Stop)
            {
                SetState(ClientState.Error);
            }

            _reconnectTimer = new Timer(Guarded(Start), null, _config.ReconnectDelay, Timeout.Infinite);
        }

        /// <summary>
        /// Socket error: logs it, moves to the <c>Error</c> state and raises <see cref="ConnectionError"/>.
        /// </summary>
        private void OnSocketError(object? sender, SuperSocket.ClientEngine.ErrorEventArgs e)
        {
            _logger.LogError(e.Exception, $"[{_config.Name}] WebSocket错误: {e.Exception.Message}");
            SetState(ClientState.Error);
            ConnectionError?.Invoke(this, e.Exception.Message);
        }

        /// <summary>
        /// Handles messages received before the connection is ready (the JavaScript <c>_onSocketMessage0</c>):
        /// the authentication exchange and the <c>ready</c> notification.
        /// </summary>
        private void OnSocketMessage0(object? sender, MessageReceivedEventArgs e)
        {
            _logger.LogDebug($"[{_config.Name}] 收到初始消息: {e.Message}");
            StopTimers();

            try
            {
                // Parsed inside the try block so malformed JSON is reported instead of escaping to the socket thread.
                var token = JToken.Parse(e.Message);
                RecordMessage(_receivedMessages, token.ToString(Formatting.Indented));

                if (token is not JObject msg)
                {
                    ConnectionError?.Invoke(this, "JSON解析错误: 期望JSON对象");
                    return;
                }

                switch (msg["message"]?.ToString())
                {
                    case "authenticate":
                        if (msg["ready"]?.Value<bool?>() == true)
                        {
                            OnSocketReady();
                        }
                        else if (msg["error"] == null && !string.IsNullOrEmpty(_config.Password))
                        {
                            Authenticate(_config.Password, msg["challenge"]?.ToString() ?? "");
                        }
                        else
                        {
                            PromptPassword(msg);
                        }

                        break;

                    case "ready":
                        OnSocketReady();
                        break;

                    default:
                        break;
                }
            }
            catch (Exception ex)
            {
                ConnectionError?.Invoke(this, $"JSON解析错误: {ex.Message}");
            }
        }

        /// <summary>
        /// Runs once per connection when the server is ready (the JavaScript <c>_onSocketReady</c>):
        /// switches to the normal message handler, drops anything still queued and requests the
        /// server configuration.
        /// </summary>
        private void OnSocketReady()
        {
            var socket = _webSocket;

            // The ready timer and a "ready" message may arrive on different threads; the exchange
            // guarantees that only one of them performs the initialisation.
            if (socket == null || Interlocked.Exchange(ref _socketReady, 1) == 1)
            {
                _logger.LogDebug($"[{_config.Name}] OnSocketReady被跳过: WebSocket={socket != null}, IsSocketReady={Volatile.Read(ref _socketReady) == 1}");
                return;
            }

            _logger.LogInformation($"[{_config.Name}] WebSocket准备就绪,开始初始化");

            socket.MessageReceived -= OnSocketMessage0;
            socket.MessageReceived += OnSocketMessage;

            lock (_sendGate)
            {
                _messageFifo.Clear();
            }

            // "First connection" means no log layers are configured locally yet.
            var firstCon = (_config.Logs?.Layers?.Count ?? 0) == 0;

            _logger.LogDebug($"[{_config.Name}] 发送config_get请求");
            SendMessage(new JObject { ["message"] = "config_get" }, config => ApplyServerConfig(config, firstCon));
        }

        /// <summary>
        /// Applies a <c>config_get</c> response: client identity, log configuration and cells,
        /// then moves to <c>Connected</c> and starts log and statistics polling.
        /// </summary>
        /// <param name="config">The server's response.</param>
        /// <param name="firstCon">True when no log layers were configured locally before the request.</param>
        private void ApplyServerConfig(JObject config, bool firstCon)
        {
            _logger.LogInformation($"[{_config.Name}] 配置已接收");
            _client.ResetLogs();

            _client.Version = config["version"]?.ToString();
            _client.Name = config["name"]?.ToString() ?? _config.Name;
            _client.Model = config["type"]?.ToString();

            // TODO: config["profiling"] == true should mark profiling as available.

            var ro = _config.Readonly;
            var serializer = Newtonsoft.Json.JsonSerializer.CreateDefault(new JsonSerializerSettings
            {
                NullValueHandling = NullValueHandling.Ignore
            });
            var serverLogsConfig = config["logs"]?.ToObject<ClientLogsConfig>(serializer);

            // The server's log configuration wins for read-only clients and on first connection.
            if ((ro || firstCon) && serverLogsConfig != null)
            {
                _config.Logs = serverLogsConfig;
            }

            // NOTE(review): the value type of the cells map is assumed to be object - confirm against LTEClient.AddCell.
            var cells = config["cells"]?.ToObject<Dictionary<string, object>>();
            if (cells != null)
            {
                foreach (var cell in cells)
                {
                    if (!int.TryParse(cell.Key, NumberStyles.Integer, CultureInfo.InvariantCulture, out var cellId))
                    {
                        _logger.LogWarning($"[{_config.Name}] 忽略无效的小区ID: {cell.Key}");
                        continue;
                    }

                    // NOTE(review): the whole map is passed, as before; confirm whether cell.Value was intended.
                    _client.AddCell(cellId, cells);
                }
            }

            SetState(ClientState.Connected);

            if (firstCon && !_config.SkipLogMenu)
            {
                // TODO: show the log configuration window.
                _logger.LogInformation($"[{_config.Name}] 请配置日志");
            }
            else if (ro)
            {
                LogGet(new Dictionary<string, object> { ["timeout"] = 0 });
            }
            else if (_config.Logs != null)
            {
                SetLogsConfig(_config.Logs);
            }
            else
            {
                _logger.LogWarning($"[{_config.Name}] 未找到日志配置,跳过config_set");
            }

            // Starts the statistics loop (the JavaScript version's _updateStats(true)).
            UpdateStats(true);
        }

        /// <summary>
        /// Handles messages once the connection is ready (the JavaScript <c>_onSocketMessage</c>).
        /// Dispatch order: per-id handler, per-name handler, then the built-in <c>log_get</c>/<c>stats</c> handling.
        /// </summary>
        private void OnSocketMessage(object? sender, MessageReceivedEventArgs e)
        {
            _logger.LogDebug($"[{_config.Name}] 收到消息: {e.Message}");

            try
            {
                // Parsed once, inside the try block, so malformed JSON is reported through ConnectionError.
                var token = JToken.Parse(e.Message);
                RecordMessage(_receivedMessages, token.ToString(Formatting.Indented));

                JObject msg;
                switch (token)
                {
                    case JArray frames:
                        // Array form: the first element is the message; the remaining elements are not processed here.
                        msg = frames.FirstOrDefault() as JObject ?? new JObject();
                        break;

                    case JObject obj:
                        msg = obj;
                        break;

                    default:
                        ConnectionError?.Invoke(this, "消息处理错误: 期望JSON对象或数组");
                        return;
                }

                MessageReceived?.Invoke(this, msg);

                var id = msg["message_id"]?.Value<int?>();
                if (id.HasValue && _messageHandlers.TryGetValue(id.Value, out var handler))
                {
                    // Notifications keep their handler; a normal response consumes it.
                    if (msg["notification"]?.Value<bool?>() != true)
                    {
                        _messageHandlers.TryRemove(id.Value, out _);
                    }

                    if (msg["error"] != null && !handler.ErrorHandler)
                    {
                        ConnectionError?.Invoke(this, msg["error"]?.ToString() ?? "未知错误");
                    }
                    else
                    {
                        handler.Callback?.Invoke(msg);
                    }

                    return;
                }

                var name = msg["message"]?.ToString();
                if (!string.IsNullOrEmpty(name) && _messageHandlersByName.TryGetValue(name, out var nameHandler))
                {
                    nameHandler.Callback?.Invoke(msg);
                    return;
                }

                switch (name)
                {
                    case "log_get":
                        LogGetParse(msg);
                        break;

                    case "stats":
                        StatsReceived?.Invoke(this, msg);
                        break;

                    default:
                        _logger.LogDebug($"[{_config.Name}] 未知消息: {name}");
                        break;
                }
            }
            catch (Exception ex)
            {
                ConnectionError?.Invoke(this, $"消息处理错误: {ex.Message}");
            }
        }

        #endregion

        #region Log parsing

        /// <summary>
        /// Processes a <c>log_get</c> response: applies headers, converts the log entries, hands them
        /// to the client and, when this is the response to the latest request, issues the next request.
        /// </summary>
        /// <param name="msg">The <c>log_get</c> response.</param>
        private void LogGetParse(JObject msg)
        {
            // The response can hold up to the requested maximum of log entries; only serialise it when it will be written.
            if (_logger.IsEnabled(LogLevel.Debug))
            {
                _logger.LogDebug($"[{_config.Name}] 解析日志消息: {msg}");
            }

            var countBefore = _client.LogCount;

            // NOTE(review): the headers element type is assumed to be string - confirm against LTEClient.SetHeaders.
            var headers = msg["headers"]?.ToObject<string[]>();
            if (headers != null)
            {
                _client.SetHeaders(headers);
            }

            _client.LogModelGuessInit();

            if (msg["logs"] is JArray logsArray)
            {
                _logger.LogDebug($"[{_config.Name}] 接收到日志: {logsArray.Count} 条");

                _client.LogModelGuess(
                    logsArray.ToObject<List<Dictionary<string, object>>>() ?? new List<Dictionary<string, object>>());

                var logList = new List<LTELog>(logsArray.Count);
                foreach (var logObj in logsArray.OfType<JObject>())
                {
                    try
                    {
                        var log = BuildLog(logObj);
                        if (log != null)
                        {
                            logList.Add(log);
                        }
                    }
                    catch (Exception ex)
                    {
                        // One bad entry must not abort the rest of the batch.
                        _logger.LogError(ex, $"[{_config.Name}] 解析日志项时出错: {ex.Message}");
                    }
                }

                if (logList.Count > 0)
                {
                    _client.ParseLogList(logList, true);

                    // Counterpart of lteLogs.updateLogs() in the JavaScript version.
                    _client.LogsManager.UpdateLogs();
                    LogsReceived?.Invoke(this, logList);
                }
            }

            if (countBefore == 0 && _client.LogCount > 0)
            {
                ClientGridRefreshed?.Invoke(this, EventArgs.Empty);
            }

            // Only the response to the most recent request continues the polling loop.
            if (msg["message_id"]?.Value<int?>() == _logGetId)
            {
                LogGet();
            }
        }

        /// <summary>
        /// Converts one JSON log entry into an <see cref="LTELog"/> and fills in message, direction,
        /// info id and, for PHY entries, the signal record.
        /// </summary>
        /// <param name="logObj">JSON representation of the entry.</param>
        /// <returns>The populated log, or null when deserialisation yields nothing.</returns>
        private LTELog? BuildLog(JObject logObj)
        {
            var log = JsonConvert.DeserializeObject<LTELog>(logObj.ToString());
            if (log == null)
            {
                return null;
            }

            // The first data line is the message itself (same as the JavaScript version).
            if (log.Data is List<string> dataList && dataList.Count > 0)
            {
                log.Message = dataList[0];
                dataList.RemoveAt(0);
            }

            log.Direction = _client.DirConvert(log);

            if (log.Info != null)
            {
                log.Info = _client.StringToId(log.Info.ToString());
            }

            if (log.Layer == "PHY" && log.Data is List<string> lines)
            {
                var signalRecord = ExtractSignalRecord(lines);
                if (signalRecord.Count > 0)
                {
                    log.SignalRecord = signalRecord;
                    _client.HasSignalRecord = true;
                }
            }

            log.Client = _client;
            return log;
        }

        /// <summary>
        /// Removes the "Link: name@offset" lines from <paramref name="lines"/> and returns them keyed by link name.
        /// Lines whose offset does not fit an unsigned 32-bit integer are left in place.
        /// </summary>
        /// <param name="lines">Data lines of a PHY log entry; matching lines are removed.</param>
        /// <returns>Map from link name to an object exposing an <c>offset</c> property.</returns>
        private static Dictionary<string, object> ExtractSignalRecord(List<string> lines)
        {
            var signalRecord = new Dictionary<string, object>();

            // Iterate backwards so RemoveAt does not shift the indices still to be visited.
            for (var j = lines.Count - 1; j >= 0; j--)
            {
                var line = lines[j];
                if (line == null)
                {
                    continue;
                }

                var match = SignalLinkRegex.Match(line);
                if (!match.Success)
                {
                    continue;
                }

                if (!uint.TryParse(match.Groups[2].Value, NumberStyles.None, CultureInfo.InvariantCulture, out var offset))
                {
                    continue;
                }

                signalRecord[match.Groups[1].Value] = new { offset };
                lines.RemoveAt(j);
            }

            return signalRecord;
        }

        #endregion

        #region Sending and housekeeping

        /// <summary>
        /// Sends everything currently queued: a single message as an object, several as one JSON array.
        /// Does nothing while the socket is not open; queued messages stay queued in that case.
        /// </summary>
        private void SendMessageNow()
        {
            lock (_sendGate)
            {
                var socket = _webSocket;
                if (socket?.State != WebSocketState.Open)
                {
                    return;
                }

                var batch = new List<JObject>();
                while (_messageFifo.TryDequeue(out var queued))
                {
                    batch.Add(queued);
                }

                if (batch.Count > 0)
                {
                    var payload = batch.Count == 1 ? (object)batch[0] : batch;
                    socket.Send(JsonConvert.SerializeObject(payload));
                    RecordMessage(_sentMessages, JsonConvert.SerializeObject(payload, Formatting.Indented));
                }

                _messageDeferTimer?.Dispose();
                _messageDeferTimer = null;
            }
        }

        /// <summary>Appends <paramref name="text"/> to a diagnostic cache and trims the cache to its size limit.</summary>
        private static void RecordMessage(ConcurrentQueue<string> cache, string text)
        {
            cache.Enqueue(text);
            while (cache.Count > ServerMessageCacheLimit)
            {
                cache.TryDequeue(out _);
            }
        }

        /// <summary>
        /// Wraps a timer callback so an exception is logged instead of being left unhandled on the timer thread.
        /// </summary>
        private TimerCallback Guarded(Action action)
        {
            return _ =>
            {
                try
                {
                    action();
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, $"[{_config.Name}] 定时器回调异常: {ex.Message}");
                }
            };
        }

        /// <summary>
        /// Detaches all handlers from the current socket, disposes it and forgets the per-id
        /// callbacks that were waiting for responses on it.
        /// </summary>
        private void ReleaseSocket()
        {
            var socket = _webSocket;
            _webSocket = null;

            // Responses for these ids can no longer arrive once the socket is gone.
            _messageHandlers.Clear();

            if (socket == null)
            {
                return;
            }

            // Unsubscribe first so disposing the socket cannot re-enter OnSocketClosed and schedule a reconnect.
            socket.Opened -= OnSocketOpened;
            socket.Closed -= OnSocketClosed;
            socket.Error -= OnSocketError;
            socket.MessageReceived -= OnSocketMessage0;
            socket.MessageReceived -= OnSocketMessage;

            try
            {
                socket.Dispose();
            }
            catch (Exception ex)
            {
                _logger.LogDebug(ex, $"[{_config.Name}] WebSocket错误: {ex.Message}");
            }
        }

        /// <summary>Cancels the reconnect, statistics, defer and ready timers and resets the statistics.</summary>
        private void StopTimers()
        {
            _reconnectTimer?.Dispose();
            _reconnectTimer = null;

            StopStatsTimer();
            ResetStats();

            lock (_sendGate)
            {
                _messageDeferTimer?.Dispose();
                _messageDeferTimer = null;
            }

            _readyTimer?.Dispose();
            _readyTimer = null;
        }

        /// <summary>Placeholder for closing dependent components; currently does nothing.</summary>
        private void CloseComponents()
        {
        }

        /// <summary>Stores the new state on the client and raises <see cref="StateChanged"/> when it differs.</summary>
        private void SetState(ClientState state)
        {
            if (_client.State == state)
            {
                return;
            }

            _client.State = state;
            StateChanged?.Invoke(this, state);
        }

        /// <summary>Returns the default log level name for a layer.</summary>
        private string GetDefaultFilter(string layer)
        {
            return layer switch
            {
                "NAS" or "RRC" => "debug",
                "EVENT" or "ALARM" or "MON" or "PROD" => "info",
                _ => "warn"
            };
        }

        /// <summary>Tells whether <paramref name="parameter"/> is one of the known log parameter names.</summary>
        private bool IsLogParameter(string parameter)
        {
            return LogParameterNames.Contains(parameter);
        }

        /// <summary>Sends an <c>authenticate</c> message carrying the password.</summary>
        /// <param name="password">Configured password.</param>
        /// <param name="challenge">Challenge received from the server.</param>
        private void Authenticate(string password, string challenge)
        {
            // NOTE(review): the challenge is not used and the password is sent as-is - confirm this
            // matches the server's authentication scheme.
            SendMessage(new JObject
            {
                ["message"] = "authenticate",
                ["password"] = password
            });
        }

        /// <summary>Placeholder for asking the user for a password; currently only logs the request.</summary>
        private void PromptPassword(JObject msg)
        {
            _logger.LogWarning($"[{_config.Name}] 需要认证,请输入密码");
        }

        #endregion

        #region Statistics polling

        /// <summary>
        /// Sends a <c>stats</c> request (the JavaScript <c>_updateStats</c>). The first response only
        /// resets the statistics; later responses are published. Every response schedules the next poll.
        /// </summary>
        /// <param name="first">True to treat the response as the first one of this connection.</param>
        private void UpdateStats(bool first = false)
        {
            if (_disposed || _webSocket?.State != WebSocketState.Open)
            {
                return;
            }

            var msg = new JObject { ["message"] = "stats" };
            PrepareStatsMessage(msg);

            SendMessage(msg, response =>
            {
                if (_disposed)
                {
                    return;
                }

                if (first || _isFirstStatsUpdate)
                {
                    ResetStats();
                    _isFirstStatsUpdate = false;
                }
                else
                {
                    StatsReceived?.Invoke(this, response);
                    AddStatsToPanel(response);
                }

                StartStatsTimer();
            });
        }

        /// <summary>Hook for adding fields to the statistics request (the JavaScript <c>tab.statsPrepare</c>); currently only logs.</summary>
        private void PrepareStatsMessage(JObject msg)
        {
            _logger.LogDebug($"[{_config.Name}] 准备统计消息: {msg}");
        }

        /// <summary>Publishes a zeroed CPU statistic (the JavaScript <c>_resetStats</c>).</summary>
        private void ResetStats()
        {
            var resetStats = new JObject
            {
                ["cpu"] = new JObject { ["global"] = 0 }
            };

            AddStatsToPanel(resetStats);
            _logger.LogDebug($"[{_config.Name}] 重置统计信息");
        }

        /// <summary>Hook for forwarding statistics to a panel (the JavaScript <c>lteStatsTab.add</c>); currently only logs.</summary>
        private void AddStatsToPanel(JObject stats)
        {
            _logger.LogDebug($"[{_config.Name}] 添加统计到面板: {stats}");
        }

        /// <summary>Schedules the next statistics request after the current polling interval.</summary>
        private void StartStatsTimer()
        {
            _statsTimer?.Dispose();
            _statsTimer = new Timer(Guarded(() => UpdateStats()), null, _statsPollDelay, Timeout.Infinite);
        }

        /// <summary>Cancels the pending statistics request, if any.</summary>
        private void StopStatsTimer()
        {
            _statsTimer?.Dispose();
            _statsTimer = null;
        }

        #endregion

        #region IDisposable

        /// <summary>
        /// Stops the connection and releases the socket, the timers and the cancellation source.
        /// Calling it again has no effect.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // Stop() already cancels the timers and releases the socket.
            Stop();

            _cancellationTokenSource.Cancel();
            _cancellationTokenSource.Dispose();

            _logger.LogInformation($"[{_config.Name}] 释放WebSocket客户端");
            GC.SuppressFinalize(this);
        }

        #endregion
    }

    /// <summary>
    /// A callback registered for a server message, either by message id or by message name.
    /// </summary>
    public class MessageHandler
    {
        /// <summary>Invoked with the received message.</summary>
        public Action<JObject>? Callback { get; set; }

        /// <summary>When true, <see cref="Callback"/> also receives messages that carry an <c>error</c> field.</summary>
        public bool ErrorHandler { get; set; }
    }
}