Browse Source

refactor: 优化SSE推送性能

- 使用List<T>和lock替换ConcurrentQueue<T>来存储消息,以支持高效的范围读取。
- 移除了有性能隐患的IEnumerable消息属性,替换为专用的GetNew...Messages方法。
- 控制器调用新方法,避免了在循环中通过LINQ Skip遍历集合,显著提升了服务器性能。
master
root 1 month ago
parent
commit
08706ba700
  1. 81
      LTEMvcApp/Controllers/WebSocketController.cs
  2. 67
      LTEMvcApp/Services/LTEClientWebSocket.cs
  3. 120
      LTEMvcApp/Views/Home/ClientMessages.cshtml

81
LTEMvcApp/Controllers/WebSocketController.cs

@ -3,6 +3,7 @@ using LTEMvcApp.Models;
using LTEMvcApp.Services;
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
namespace LTEMvcApp.Controllers
{
@ -244,41 +245,73 @@ namespace LTEMvcApp.Controllers
}
/// <summary>
/// 获取客户端消息队列
/// 使用 Server-Sent Events (SSE) 实时推送客户端消息
/// </summary>
/// <param name="clientName">客户端名称</param>
/// <param name="sentStartIndex">发送消息起始索引</param>
/// <param name="receivedStartIndex">接收消息起始索引</param>
/// <returns>发送和接收的消息队列</returns>
[HttpGet("clients/{clientName}/messages")]
public ActionResult<object> GetClientMessages(string clientName, [FromQuery] int sentStartIndex = 0, [FromQuery] int receivedStartIndex = 0)
[HttpGet("clients/{clientName}/messages/stream")]
public async Task StreamClientMessages(string clientName)
{
Response.ContentType = "text/event-stream";
Response.Headers.Add("Cache-Control", "no-cache");
Response.Headers.Add("Connection", "keep-alive");
var client = _webSocketManager.GetClientInstance(clientName);
if (client == null)
{
// 即使客户端未连接,也可能是在配置页面中查看,所以返回空而不是404
return Ok(new
{
SentMessages = new List<string>(),
ReceivedMessages = new List<string>(),
SentCount = 0,
ReceivedCount = 0
});
// 发送一个错误事件然后关闭
await SendSseEvent("error", new { message = "客户端未连接或不存在" });
return;
}
var sentMessages = client.SentMessages.ToList();
var receivedMessages = client.ReceivedMessages.ToList();
// 发送一个连接成功事件
await SendSseEvent("open", new { message = "成功连接到服务器事件流" });
var newSent = sentMessages.Skip(sentStartIndex).ToList();
var newReceived = receivedMessages.Skip(receivedStartIndex).ToList();
int sentIndex = 0;
int receivedIndex = 0;
var cancellationToken = HttpContext.RequestAborted;
return Ok(new
while (!cancellationToken.IsCancellationRequested)
{
NewSentMessages = newSent,
NewReceivedMessages = newReceived,
TotalSentCount = sentMessages.Count,
TotalReceivedCount = receivedMessages.Count
});
bool hasNewMessages = false;
// 检查并高效地发送新的"已发送"消息
if (client.SentMessagesCount > sentIndex)
{
var newMessages = client.GetNewSentMessages(sentIndex);
if (newMessages.Any())
{
await SendSseEvent("update", new { type = "sent", messages = newMessages, totalCount = client.SentMessagesCount });
sentIndex = client.SentMessagesCount;
hasNewMessages = true;
}
}
// 检查并高效地发送新的"已接收"消息
if (client.ReceivedMessagesCount > receivedIndex)
{
var newMessages = client.GetNewReceivedMessages(receivedIndex);
if (newMessages.Any())
{
await SendSseEvent("update", new { type = "received", messages = newMessages, totalCount = client.ReceivedMessagesCount });
receivedIndex = client.ReceivedMessagesCount;
hasNewMessages = true;
}
}
if (hasNewMessages)
{
await Response.Body.FlushAsync(cancellationToken);
}
await Task.Delay(250, cancellationToken); // 每250毫秒检查一次新消息
}
}
private async Task SendSseEvent(string eventName, object data)
{
var json = Newtonsoft.Json.JsonConvert.SerializeObject(data);
await Response.WriteAsync($"event: {eventName}\n");
await Response.WriteAsync($"data: {json}\n\n");
}
/// <summary>

67
LTEMvcApp/Services/LTEClientWebSocket.cs

@ -36,8 +36,10 @@ namespace LTEMvcApp.Services
private bool _disposed;
private LogParserService logParser = new LogParserService();
private readonly ILogger<LTEClientWebSocket> _logger;
private readonly ConcurrentQueue<string> _sentMessages = new ConcurrentQueue<string>();
private readonly ConcurrentQueue<string> _receivedMessages = new ConcurrentQueue<string>();
private readonly List<string> _sentMessages = new List<string>();
private readonly object _sentLock = new object();
private readonly List<string> _receivedMessages = new List<string>();
private readonly object _receivedLock = new object();
#endregion
#region 事件
@ -116,8 +118,40 @@ namespace LTEMvcApp.Services
/// </summary>
public bool IsReadonly => _config.Readonly;
public IEnumerable<string> SentMessages => _sentMessages;
public IEnumerable<string> ReceivedMessages => _receivedMessages;
public int SentMessagesCount
{
get { lock (_sentLock) { return _sentMessages.Count; } }
}
public int ReceivedMessagesCount
{
get { lock (_receivedLock) { return _receivedMessages.Count; } }
}
public List<string> GetNewSentMessages(int startIndex)
{
lock (_sentLock)
{
var count = _sentMessages.Count - startIndex;
if (count > 0)
{
return _sentMessages.GetRange(startIndex, count);
}
return new List<string>();
}
}
public List<string> GetNewReceivedMessages(int startIndex)
{
lock (_receivedLock)
{
var count = _receivedMessages.Count - startIndex;
if (count > 0)
{
return _receivedMessages.GetRange(startIndex, count);
}
return new List<string>();
}
}
#endregion
@ -364,7 +398,10 @@ namespace LTEMvcApp.Services
_messageFifo.Enqueue(message);
// 记录发送的消息
_sentMessages.Enqueue(message.ToString(Formatting.Indented));
lock (_sentLock)
{
_sentMessages.Add(message.ToString(Formatting.Indented));
}
if (_messageFifo.Count < 100) // 批处理大小
{
@ -499,7 +536,10 @@ namespace LTEMvcApp.Services
StopTimers();
// 记录接收的消息
_receivedMessages.Enqueue(JToken.Parse(e.Message).ToString(Formatting.Indented));
lock (_receivedLock)
{
_receivedMessages.Add(JToken.Parse(e.Message).ToString(Formatting.Indented));
}
try
{
@ -659,7 +699,10 @@ namespace LTEMvcApp.Services
{
_logger.LogDebug($"[{_config.Name}] 收到消息: {e.Message}");
// 记录接收的消息
_receivedMessages.Enqueue(JToken.Parse(e.Message).ToString(Formatting.Indented));
lock (_receivedLock)
{
_receivedMessages.Add(JToken.Parse(e.Message).ToString(Formatting.Indented));
}
try
{
var data = e.Message;
@ -853,14 +896,20 @@ namespace LTEMvcApp.Services
var json = JsonConvert.SerializeObject(messages[0]);
_webSocket.Send(json);
// 记录发送的消息
_sentMessages.Enqueue(JToken.Parse(json).ToString(Formatting.Indented));
lock (_sentLock)
{
_sentMessages.Add(JToken.Parse(json).ToString(Formatting.Indented));
}
}
else if (messages.Count > 1)
{
var json = JsonConvert.SerializeObject(messages);
_webSocket.Send(json);
// 记录发送的消息
_sentMessages.Enqueue(JToken.Parse(json).ToString(Formatting.Indented));
lock (_sentLock)
{
_sentMessages.Add(JToken.Parse(json).ToString(Formatting.Indented));
}
}
_messageDeferTimer?.Dispose();

120
LTEMvcApp/Views/Home/ClientMessages.cshtml

@ -10,10 +10,8 @@
<div class="card-header">
<h3 class="card-title">客户端消息队列 - @clientName</h3>
<div class="card-tools">
<button type="button" class="btn btn-primary btn-sm" onclick="refreshMessages()">
<i class="fas fa-sync-alt"></i> 刷新
</button>
<a href="@Url.Action("TestClientConfig", "Home")" class="btn btn-info btn-sm">
<span id="connection-status" class="badge badge-secondary">正在连接...</span>
<a href="@Url.Action("TestClientConfig", "Home")" class="btn btn-info btn-sm ml-2">
<i class="fas fa-cog"></i> 配置
</a>
</div>
@ -31,7 +29,7 @@
<div class="card-body" style="max-height: 600px; overflow-y: auto;">
<div id="sentMessages">
<div class="text-muted text-center">
<i class="fas fa-info-circle"></i> 暂无发送消息
<i class="fas fa-spinner fa-spin"></i> 正在建立与服务器的连接...
</div>
</div>
</div>
@ -48,7 +46,7 @@
<div class="card-body" style="max-height: 600px; overflow-y: auto;">
<div id="receivedMessages">
<div class="text-muted text-center">
<i class="fas fa-info-circle"></i> 暂无接收消息
<i class="fas fa-spinner fa-spin"></i> 正在建立与服务器的连接...
</div>
</div>
</div>
@ -65,54 +63,43 @@
<script>
const clientName = '@clientName';
const MAX_MESSAGES = 500; // 每个列表最多显示500条消息
let sentIndex = 0;
let receivedIndex = 0;
let updateInterval;
$(document).ready(function() {
// 立即加载一次,然后每秒请求增量更新
loadInitialMessages();
updateInterval = setInterval(loadIncrementalMessages, 1000); // 1秒更新一次
initializeEventSource();
});
// 首次加载
function loadInitialMessages() {
// 清空现有内容
$('#sentMessages').empty();
$('#receivedMessages').empty();
sentIndex = 0;
receivedIndex = 0;
// 加载所有消息
loadMessages(true);
}
// 增量加载
function loadIncrementalMessages() {
loadMessages(false);
}
function initializeEventSource() {
$('#sentMessages').html('<div class="text-muted text-center"><i class="fas fa-spinner fa-spin"></i> 正在建立与服务器的连接...</div>');
$('#receivedMessages').html('<div class="text-muted text-center"><i class="fas fa-spinner fa-spin"></i> 正在建立与服务器的连接...</div>');
const source = new EventSource(`/api/websocket/clients/${encodeURIComponent(clientName)}/messages/stream`);
const statusBadge = $('#connection-status');
source.addEventListener('open', function(e) {
console.log("SSE connection opened.");
statusBadge.removeClass('badge-secondary badge-danger').addClass('badge-success').text('已连接');
// 清空等待消息
$('#sentMessages').empty();
$('#receivedMessages').empty();
});
function loadMessages(isInitial) {
$.ajax({
url: `/api/websocket/clients/${encodeURIComponent(clientName)}/messages?sentStartIndex=${sentIndex}&receivedStartIndex=${receivedIndex}`,
type: 'GET',
success: function(data) {
updateMessageList('sent', data.newSentMessages, data.totalSentCount, isInitial);
updateMessageList('received', data.newReceivedMessages, data.totalReceivedCount, isInitial);
// 更新下一次请求的起始索引
sentIndex = data.totalSentCount;
receivedIndex = data.totalReceivedCount;
},
error: handleAjaxError
source.addEventListener('update', function(e) {
const data = JSON.parse(e.data);
updateMessageList(data.type, data.messages, data.totalCount);
});
source.addEventListener('error', function(e) {
statusBadge.removeClass('badge-success').addClass('badge-danger').text('连接断开');
if (e.target.readyState === EventSource.CLOSED) {
console.error("SSE connection closed.");
} else {
console.error("SSE error:", e);
}
// EventSource 会自动尝试重连
});
}
function updateMessageList(type, newMessages, totalCount, isInitial) {
if (isInitial && newMessages.length === 0) {
$(`#${type}Messages`).html('<div class="text-muted text-center"><i class="fas fa-info-circle"></i> 暂无消息</div>');
}
function updateMessageList(type, newMessages, totalCount) {
if (!newMessages || newMessages.length === 0) {
$(`#${type}Count`).text(totalCount);
return;
@ -120,13 +107,14 @@
const container = $(`#${type}Messages`);
const fragment = $(document.createDocumentFragment());
// 如果是首次加载,先移除"暂无消息"的提示
if (isInitial) {
// 移除 "暂无消息" 或 "正在连接" 的提示
if (container.children().length === 1 && !container.children().first().hasClass('card')) {
container.empty();
}
let currentIndex = (type === 'sent' ? sentIndex : receivedIndex);
let currentIndex = totalCount - newMessages.length;
newMessages.forEach(function(message) {
const card = createMessageCard(message, currentIndex, type);
fragment.append(card);
@ -134,13 +122,13 @@
});
container.append(fragment);
// 数量限制
// 限制DOM节点数量
const messageCards = container.children('.card');
if (messageCards.length > MAX_MESSAGES) {
messageCards.slice(0, messageCards.length - MAX_MESSAGES).remove();
}
$(`#${type}Count`).text(totalCount);
// 高亮新添加的代码
@ -150,12 +138,6 @@
}
});
}
function handleAjaxError(xhr) {
if (xhr.status !== 404) { // 忽略404,因为它可能是客户端未连接的正常状态
console.error("加载消息失败:", xhr.responseText);
}
}
function createMessageCard(message, index, type) {
const timestamp = new Date().toLocaleTimeString();
@ -178,7 +160,7 @@
</div>
</div>
`;
return $(cardHtml); // 返回jQuery对象
return $(cardHtml);
}
function toggleMessage(button) {
@ -194,7 +176,7 @@
return jsonString;
}
}
function escapeHtml(text) {
return text
.replace(/&/g, "&amp;")
@ -203,20 +185,6 @@
.replace(/"/g, "&quot;")
.replace(/'/g, "&#039;");
}
function refreshMessages() {
// 停止自动更新,手动刷新,然后重新开始
clearInterval(updateInterval);
loadInitialMessages();
updateInterval = setInterval(loadIncrementalMessages, 1000);
}
// 页面卸载时清除定时器
$(window).on('beforeunload', function() {
if (updateInterval) {
clearInterval(updateInterval);
}
});
</script>
<!-- 添加代码高亮支持 -->

Loading…
Cancel
Save