Browse Source

fix: 解决内存无限增长和SSE性能问题

- 使用有数量上限的ConcurrentQueue存储消息,防止服务器内存耗尽。
- 修复了因API不匹配导致的编译错误。
- 最终方案确保了SSE推送在内存安全和高性能之间达到平衡。
master
root 2 months ago
parent
commit
dfa617e98a
  1. 5
      LTEMvcApp/Controllers/WebSocketController.cs
  2. 71
      LTEMvcApp/Services/LTEClientWebSocket.cs

5
LTEMvcApp/Controllers/WebSocketController.cs

@ -4,6 +4,7 @@ using LTEMvcApp.Services;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Linq;
namespace LTEMvcApp.Controllers namespace LTEMvcApp.Controllers
{ {
@ -277,7 +278,7 @@ namespace LTEMvcApp.Controllers
// 检查并高效地发送新的"已发送"消息 // 检查并高效地发送新的"已发送"消息
if (client.SentMessagesCount > sentIndex) if (client.SentMessagesCount > sentIndex)
{ {
var newMessages = client.GetNewSentMessages(sentIndex); var newMessages = client.SentMessages.Skip(sentIndex).ToList();
if (newMessages.Any()) if (newMessages.Any())
{ {
await SendSseEvent("update", new { type = "sent", messages = newMessages, totalCount = client.SentMessagesCount }); await SendSseEvent("update", new { type = "sent", messages = newMessages, totalCount = client.SentMessagesCount });
@ -289,7 +290,7 @@ namespace LTEMvcApp.Controllers
// 检查并高效地发送新的"已接收"消息 // 检查并高效地发送新的"已接收"消息
if (client.ReceivedMessagesCount > receivedIndex) if (client.ReceivedMessagesCount > receivedIndex)
{ {
var newMessages = client.GetNewReceivedMessages(receivedIndex); var newMessages = client.ReceivedMessages.Skip(receivedIndex).ToList();
if (newMessages.Any()) if (newMessages.Any())
{ {
await SendSseEvent("update", new { type = "received", messages = newMessages, totalCount = client.ReceivedMessagesCount }); await SendSseEvent("update", new { type = "received", messages = newMessages, totalCount = client.ReceivedMessagesCount });

71
LTEMvcApp/Services/LTEClientWebSocket.cs

@ -36,10 +36,9 @@ namespace LTEMvcApp.Services
private bool _disposed; private bool _disposed;
private LogParserService logParser = new LogParserService(); private LogParserService logParser = new LogParserService();
private readonly ILogger<LTEClientWebSocket> _logger; private readonly ILogger<LTEClientWebSocket> _logger;
private readonly List<string> _sentMessages = new List<string>(); private const int ServerMessageCacheLimit = 1000;
private readonly object _sentLock = new object(); private readonly ConcurrentQueue<string> _sentMessages = new ConcurrentQueue<string>();
private readonly List<string> _receivedMessages = new List<string>(); private readonly ConcurrentQueue<string> _receivedMessages = new ConcurrentQueue<string>();
private readonly object _receivedLock = new object();
#endregion #endregion
#region 事件 #region 事件
@ -118,40 +117,11 @@ namespace LTEMvcApp.Services
/// </summary> /// </summary>
public bool IsReadonly => _config.Readonly; public bool IsReadonly => _config.Readonly;
public int SentMessagesCount public IEnumerable<string> SentMessages => _sentMessages;
{ public IEnumerable<string> ReceivedMessages => _receivedMessages;
get { lock (_sentLock) { return _sentMessages.Count; } }
}
public int ReceivedMessagesCount
{
get { lock (_receivedLock) { return _receivedMessages.Count; } }
}
public List<string> GetNewSentMessages(int startIndex) public int SentMessagesCount => _sentMessages.Count;
{ public int ReceivedMessagesCount => _receivedMessages.Count;
lock (_sentLock)
{
var count = _sentMessages.Count - startIndex;
if (count > 0)
{
return _sentMessages.GetRange(startIndex, count);
}
return new List<string>();
}
}
public List<string> GetNewReceivedMessages(int startIndex)
{
lock (_receivedLock)
{
var count = _receivedMessages.Count - startIndex;
if (count > 0)
{
return _receivedMessages.GetRange(startIndex, count);
}
return new List<string>();
}
}
#endregion #endregion
@ -398,9 +368,10 @@ namespace LTEMvcApp.Services
_messageFifo.Enqueue(message); _messageFifo.Enqueue(message);
// 记录发送的消息 // 记录发送的消息
lock (_sentLock) _sentMessages.Enqueue(message.ToString(Formatting.Indented));
while (_sentMessages.Count > ServerMessageCacheLimit)
{ {
_sentMessages.Add(message.ToString(Formatting.Indented)); _sentMessages.TryDequeue(out _);
} }
if (_messageFifo.Count < 100) // 批处理大小 if (_messageFifo.Count < 100) // 批处理大小
@ -536,9 +507,10 @@ namespace LTEMvcApp.Services
StopTimers(); StopTimers();
// 记录接收的消息 // 记录接收的消息
lock (_receivedLock) _receivedMessages.Enqueue(JToken.Parse(e.Message).ToString(Formatting.Indented));
while (_receivedMessages.Count > ServerMessageCacheLimit)
{ {
_receivedMessages.Add(JToken.Parse(e.Message).ToString(Formatting.Indented)); _receivedMessages.TryDequeue(out _);
} }
try try
@ -699,9 +671,10 @@ namespace LTEMvcApp.Services
{ {
_logger.LogDebug($"[{_config.Name}] 收到消息: {e.Message}"); _logger.LogDebug($"[{_config.Name}] 收到消息: {e.Message}");
// 记录接收的消息 // 记录接收的消息
lock (_receivedLock) _receivedMessages.Enqueue(JToken.Parse(e.Message).ToString(Formatting.Indented));
while (_receivedMessages.Count > ServerMessageCacheLimit)
{ {
_receivedMessages.Add(JToken.Parse(e.Message).ToString(Formatting.Indented)); _receivedMessages.TryDequeue(out _);
} }
try try
{ {
@ -896,9 +869,11 @@ namespace LTEMvcApp.Services
var json = JsonConvert.SerializeObject(messages[0]); var json = JsonConvert.SerializeObject(messages[0]);
_webSocket.Send(json); _webSocket.Send(json);
// 记录发送的消息 // 记录发送的消息
lock (_sentLock) var formattedJson = JToken.Parse(json).ToString(Formatting.Indented);
_sentMessages.Enqueue(formattedJson);
while (_sentMessages.Count > ServerMessageCacheLimit)
{ {
_sentMessages.Add(JToken.Parse(json).ToString(Formatting.Indented)); _sentMessages.TryDequeue(out _);
} }
} }
else if (messages.Count > 1) else if (messages.Count > 1)
@ -906,9 +881,11 @@ namespace LTEMvcApp.Services
var json = JsonConvert.SerializeObject(messages); var json = JsonConvert.SerializeObject(messages);
_webSocket.Send(json); _webSocket.Send(json);
// 记录发送的消息 // 记录发送的消息
lock (_sentLock) var formattedJson = JToken.Parse(json).ToString(Formatting.Indented);
_sentMessages.Enqueue(formattedJson);
while (_sentMessages.Count > ServerMessageCacheLimit)
{ {
_sentMessages.Add(JToken.Parse(json).ToString(Formatting.Indented)); _sentMessages.TryDequeue(out _);
} }
} }

Loading…
Cancel
Save