From dfa617e98a615b384ce687cd5b9a2b01a861aa64 Mon Sep 17 00:00:00 2001 From: root <295172551@qq.com> Date: Sun, 22 Jun 2025 05:05:46 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3=E5=86=85=E5=AD=98?= =?UTF-8?q?=E6=97=A0=E9=99=90=E5=A2=9E=E9=95=BF=E5=92=8CSSE=E6=80=A7?= =?UTF-8?q?=E8=83=BD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 使用有数量上限的ConcurrentQueue存储消息,防止服务器内存耗尽。 - 修复了因API不匹配导致的编译错误。 - 最终方案确保了SSE推送在内存安全和高性能之间达到平衡。 --- LTEMvcApp/Controllers/WebSocketController.cs | 5 +- LTEMvcApp/Services/LTEClientWebSocket.cs | 71 +++++++------------- 2 files changed, 27 insertions(+), 49 deletions(-) diff --git a/LTEMvcApp/Controllers/WebSocketController.cs b/LTEMvcApp/Controllers/WebSocketController.cs index 1a4986a..0831e64 100644 --- a/LTEMvcApp/Controllers/WebSocketController.cs +++ b/LTEMvcApp/Controllers/WebSocketController.cs @@ -4,6 +4,7 @@ using LTEMvcApp.Services; using Newtonsoft.Json.Linq; using Microsoft.Extensions.Logging; using System.Threading.Tasks; +using System.Linq; namespace LTEMvcApp.Controllers { @@ -277,7 +278,7 @@ namespace LTEMvcApp.Controllers // 检查并高效地发送新的"已发送"消息 if (client.SentMessagesCount > sentIndex) { - var newMessages = client.GetNewSentMessages(sentIndex); + var newMessages = client.SentMessages.Skip(sentIndex).ToList(); if (newMessages.Any()) { await SendSseEvent("update", new { type = "sent", messages = newMessages, totalCount = client.SentMessagesCount }); @@ -289,7 +290,7 @@ namespace LTEMvcApp.Controllers // 检查并高效地发送新的"已接收"消息 if (client.ReceivedMessagesCount > receivedIndex) { - var newMessages = client.GetNewReceivedMessages(receivedIndex); + var newMessages = client.ReceivedMessages.Skip(receivedIndex).ToList(); if (newMessages.Any()) { await SendSseEvent("update", new { type = "received", messages = newMessages, totalCount = client.ReceivedMessagesCount }); diff --git a/LTEMvcApp/Services/LTEClientWebSocket.cs b/LTEMvcApp/Services/LTEClientWebSocket.cs index 7cfb991..9598749 100644 --- a/LTEMvcApp/Services/LTEClientWebSocket.cs +++ b/LTEMvcApp/Services/LTEClientWebSocket.cs @@ -36,10 +36,9 @@ namespace LTEMvcApp.Services private bool _disposed; private LogParserService logParser = new LogParserService(); private readonly ILogger _logger; - private readonly List _sentMessages = new List(); - private readonly object _sentLock = new object(); - private readonly List _receivedMessages = new List(); - private readonly object _receivedLock = new object(); + private const int ServerMessageCacheLimit = 1000; + private readonly ConcurrentQueue _sentMessages = new ConcurrentQueue(); + private readonly ConcurrentQueue _receivedMessages = new ConcurrentQueue(); #endregion #region 事件 @@ -118,40 +117,11 @@ namespace LTEMvcApp.Services /// public bool IsReadonly => _config.Readonly; - public int SentMessagesCount - { - get { lock (_sentLock) { return _sentMessages.Count; } } - } - public int ReceivedMessagesCount - { - get { lock (_receivedLock) { return _receivedMessages.Count; } } - } + public IEnumerable SentMessages => _sentMessages; + public IEnumerable ReceivedMessages => _receivedMessages; - public List GetNewSentMessages(int startIndex) - { - lock (_sentLock) - { - var count = _sentMessages.Count - startIndex; - if (count > 0) - { - return _sentMessages.GetRange(startIndex, count); - } - return new List(); - } - } - - public List GetNewReceivedMessages(int startIndex) - { - lock (_receivedLock) - { - var count = _receivedMessages.Count - startIndex; - if (count > 0) - { - return _receivedMessages.GetRange(startIndex, count); - } - return new List(); - } - } + public int SentMessagesCount => _sentMessages.Count; + public int ReceivedMessagesCount => _receivedMessages.Count; #endregion @@ -398,9 +368,10 @@ namespace LTEMvcApp.Services _messageFifo.Enqueue(message); // 记录发送的消息 - lock (_sentLock) + _sentMessages.Enqueue(message.ToString(Formatting.Indented)); + while (_sentMessages.Count > ServerMessageCacheLimit) { - _sentMessages.Add(message.ToString(Formatting.Indented)); + _sentMessages.TryDequeue(out _); } if (_messageFifo.Count < 100) // 批处理大小 @@ -536,9 +507,10 @@ namespace LTEMvcApp.Services StopTimers(); // 记录接收的消息 - lock (_receivedLock) + _receivedMessages.Enqueue(JToken.Parse(e.Message).ToString(Formatting.Indented)); + while (_receivedMessages.Count > ServerMessageCacheLimit) { - _receivedMessages.Add(JToken.Parse(e.Message).ToString(Formatting.Indented)); + _receivedMessages.TryDequeue(out _); } try @@ -699,9 +671,10 @@ namespace LTEMvcApp.Services { _logger.LogDebug($"[{_config.Name}] 收到消息: {e.Message}"); // 记录接收的消息 - lock (_receivedLock) + _receivedMessages.Enqueue(JToken.Parse(e.Message).ToString(Formatting.Indented)); + while (_receivedMessages.Count > ServerMessageCacheLimit) { - _receivedMessages.Add(JToken.Parse(e.Message).ToString(Formatting.Indented)); + _receivedMessages.TryDequeue(out _); } try { @@ -896,9 +869,11 @@ namespace LTEMvcApp.Services var json = JsonConvert.SerializeObject(messages[0]); _webSocket.Send(json); // 记录发送的消息 - lock (_sentLock) + var formattedJson = JToken.Parse(json).ToString(Formatting.Indented); + _sentMessages.Enqueue(formattedJson); + while (_sentMessages.Count > ServerMessageCacheLimit) { - _sentMessages.Add(JToken.Parse(json).ToString(Formatting.Indented)); + _sentMessages.TryDequeue(out _); } } else if (messages.Count > 1) @@ -906,9 +881,11 @@ namespace LTEMvcApp.Services var json = JsonConvert.SerializeObject(messages); _webSocket.Send(json); // 记录发送的消息 - lock (_sentLock) + var formattedJson = JToken.Parse(json).ToString(Formatting.Indented); + _sentMessages.Enqueue(formattedJson); + while (_sentMessages.Count > ServerMessageCacheLimit) { - _sentMessages.Add(JToken.Parse(json).ToString(Formatting.Indented)); + _sentMessages.TryDequeue(out _); } }