diff --git a/LTEMvcApp/Controllers/WebSocketController.cs b/LTEMvcApp/Controllers/WebSocketController.cs index 5f24034..1a4986a 100644 --- a/LTEMvcApp/Controllers/WebSocketController.cs +++ b/LTEMvcApp/Controllers/WebSocketController.cs @@ -3,6 +3,7 @@ using LTEMvcApp.Models; using LTEMvcApp.Services; using Newtonsoft.Json.Linq; using Microsoft.Extensions.Logging; +using System.Threading.Tasks; namespace LTEMvcApp.Controllers { @@ -244,41 +245,73 @@ namespace LTEMvcApp.Controllers } /// - /// 获取客户端消息队列 + /// 使用 Server-Sent Events (SSE) 实时推送客户端消息 /// /// 客户端名称 - /// 发送消息起始索引 - /// 接收消息起始索引 - /// 发送和接收的消息队列 - [HttpGet("clients/{clientName}/messages")] - public ActionResult GetClientMessages(string clientName, [FromQuery] int sentStartIndex = 0, [FromQuery] int receivedStartIndex = 0) + [HttpGet("clients/{clientName}/messages/stream")] + public async Task StreamClientMessages(string clientName) { + Response.ContentType = "text/event-stream"; + Response.Headers.Add("Cache-Control", "no-cache"); + Response.Headers.Add("Connection", "keep-alive"); + var client = _webSocketManager.GetClientInstance(clientName); if (client == null) { - // 即使客户端未连接,也可能是在配置页面中查看,所以返回空而不是404 - return Ok(new - { - SentMessages = new List(), - ReceivedMessages = new List(), - SentCount = 0, - ReceivedCount = 0 - }); + // 发送一个错误事件然后关闭 + await SendSseEvent("error", new { message = "客户端未连接或不存在" }); + return; } - var sentMessages = client.SentMessages.ToList(); - var receivedMessages = client.ReceivedMessages.ToList(); + // 发送一个连接成功事件 + await SendSseEvent("open", new { message = "成功连接到服务器事件流" }); - var newSent = sentMessages.Skip(sentStartIndex).ToList(); - var newReceived = receivedMessages.Skip(receivedStartIndex).ToList(); + int sentIndex = 0; + int receivedIndex = 0; + var cancellationToken = HttpContext.RequestAborted; - return Ok(new + while (!cancellationToken.IsCancellationRequested) { - NewSentMessages = newSent, - NewReceivedMessages = newReceived, - TotalSentCount = sentMessages.Count, - TotalReceivedCount = receivedMessages.Count - }); + bool hasNewMessages = false; + + // 检查并高效地发送新的"已发送"消息 + if (client.SentMessagesCount > sentIndex) + { + var newMessages = client.GetNewSentMessages(sentIndex); + if (newMessages.Any()) + { + await SendSseEvent("update", new { type = "sent", messages = newMessages, totalCount = client.SentMessagesCount }); + sentIndex = client.SentMessagesCount; + hasNewMessages = true; + } + } + + // 检查并高效地发送新的"已接收"消息 + if (client.ReceivedMessagesCount > receivedIndex) + { + var newMessages = client.GetNewReceivedMessages(receivedIndex); + if (newMessages.Any()) + { + await SendSseEvent("update", new { type = "received", messages = newMessages, totalCount = client.ReceivedMessagesCount }); + receivedIndex = client.ReceivedMessagesCount; + hasNewMessages = true; + } + } + + if (hasNewMessages) + { + await Response.Body.FlushAsync(cancellationToken); + } + + await Task.Delay(250, cancellationToken); // 每250毫秒检查一次新消息 + } + } + + private async Task SendSseEvent(string eventName, object data) + { + var json = Newtonsoft.Json.JsonConvert.SerializeObject(data); + await Response.WriteAsync($"event: {eventName}\n"); + await Response.WriteAsync($"data: {json}\n\n"); } /// diff --git a/LTEMvcApp/Services/LTEClientWebSocket.cs b/LTEMvcApp/Services/LTEClientWebSocket.cs index 1a39a28..7cfb991 100644 --- a/LTEMvcApp/Services/LTEClientWebSocket.cs +++ b/LTEMvcApp/Services/LTEClientWebSocket.cs @@ -36,8 +36,10 @@ namespace LTEMvcApp.Services private bool _disposed; private LogParserService logParser = new LogParserService(); private readonly ILogger _logger; - private readonly ConcurrentQueue _sentMessages = new ConcurrentQueue(); - private readonly ConcurrentQueue _receivedMessages = new ConcurrentQueue(); + private readonly List _sentMessages = new List(); + private readonly object _sentLock = new object(); + private readonly List _receivedMessages = new List(); + private readonly object _receivedLock = new object(); #endregion #region 事件 @@ -116,8 +118,40 @@ namespace LTEMvcApp.Services /// public bool IsReadonly => _config.Readonly; - public IEnumerable SentMessages => _sentMessages; - public IEnumerable ReceivedMessages => _receivedMessages; + public int SentMessagesCount + { + get { lock (_sentLock) { return _sentMessages.Count; } } + } + public int ReceivedMessagesCount + { + get { lock (_receivedLock) { return _receivedMessages.Count; } } + } + + public List GetNewSentMessages(int startIndex) + { + lock (_sentLock) + { + var count = _sentMessages.Count - startIndex; + if (count > 0) + { + return _sentMessages.GetRange(startIndex, count); + } + return new List(); + } + } + + public List GetNewReceivedMessages(int startIndex) + { + lock (_receivedLock) + { + var count = _receivedMessages.Count - startIndex; + if (count > 0) + { + return _receivedMessages.GetRange(startIndex, count); + } + return new List(); + } + } #endregion @@ -364,7 +398,10 @@ namespace LTEMvcApp.Services _messageFifo.Enqueue(message); // 记录发送的消息 - _sentMessages.Enqueue(message.ToString(Formatting.Indented)); + lock (_sentLock) + { + _sentMessages.Add(message.ToString(Formatting.Indented)); + } if (_messageFifo.Count < 100) // 批处理大小 { @@ -499,7 +536,10 @@ namespace LTEMvcApp.Services StopTimers(); // 记录接收的消息 - _receivedMessages.Enqueue(JToken.Parse(e.Message).ToString(Formatting.Indented)); + lock (_receivedLock) + { + _receivedMessages.Add(JToken.Parse(e.Message).ToString(Formatting.Indented)); + } try { @@ -659,7 +699,10 @@ namespace LTEMvcApp.Services { _logger.LogDebug($"[{_config.Name}] 收到消息: {e.Message}"); // 记录接收的消息 - _receivedMessages.Enqueue(JToken.Parse(e.Message).ToString(Formatting.Indented)); + lock (_receivedLock) + { + _receivedMessages.Add(JToken.Parse(e.Message).ToString(Formatting.Indented)); + } try { var data = e.Message; @@ -853,14 +896,20 @@ namespace LTEMvcApp.Services var json = JsonConvert.SerializeObject(messages[0]); _webSocket.Send(json); // 记录发送的消息 - _sentMessages.Enqueue(JToken.Parse(json).ToString(Formatting.Indented)); + lock (_sentLock) + { + _sentMessages.Add(JToken.Parse(json).ToString(Formatting.Indented)); + } } else if (messages.Count > 1) { var json = JsonConvert.SerializeObject(messages); _webSocket.Send(json); // 记录发送的消息 - _sentMessages.Enqueue(JToken.Parse(json).ToString(Formatting.Indented)); + lock (_sentLock) + { + _sentMessages.Add(JToken.Parse(json).ToString(Formatting.Indented)); + } } _messageDeferTimer?.Dispose(); diff --git a/LTEMvcApp/Views/Home/ClientMessages.cshtml b/LTEMvcApp/Views/Home/ClientMessages.cshtml index 6cbbcfb..49e1e14 100644 --- a/LTEMvcApp/Views/Home/ClientMessages.cshtml +++ b/LTEMvcApp/Views/Home/ClientMessages.cshtml @@ -10,10 +10,8 @@

客户端消息队列 - @clientName

@@ -31,7 +29,7 @@
- 暂无发送消息 + 正在建立与服务器的连接...
@@ -48,7 +46,7 @@
- 暂无接收消息 + 正在建立与服务器的连接...
@@ -65,54 +63,43 @@