|
|
@ -253,67 +253,169 @@ namespace LTEMvcApp.Controllers |
|
|
|
[HttpGet("clients/{clientName}/messages/stream")] |
|
|
|
public async Task StreamClientMessages(string clientName) |
|
|
|
{ |
|
|
|
Response.ContentType = "text/event-stream"; |
|
|
|
Response.Headers.Add("Cache-Control", "no-cache"); |
|
|
|
Response.Headers.Add("Connection", "keep-alive"); |
|
|
|
|
|
|
|
var client = _webSocketManager.GetClientInstance(clientName); |
|
|
|
if (client == null) |
|
|
|
try |
|
|
|
{ |
|
|
|
// 发送一个错误事件然后关闭
|
|
|
|
await SendSseEvent("error", new { message = "客户端未连接或不存在" }); |
|
|
|
return; |
|
|
|
} |
|
|
|
Response.ContentType = "text/event-stream"; |
|
|
|
Response.Headers.Append("Cache-Control", "no-cache"); |
|
|
|
Response.Headers.Append("Connection", "keep-alive"); |
|
|
|
Response.Headers.Append("Access-Control-Allow-Origin", "*"); |
|
|
|
|
|
|
|
// 发送一个连接成功事件
|
|
|
|
await SendSseEvent("open", new { message = "成功连接到服务器事件流" }); |
|
|
|
var client = _webSocketManager.GetClientInstance(clientName); |
|
|
|
if (client == null) |
|
|
|
{ |
|
|
|
// 发送一个错误事件然后关闭
|
|
|
|
await SendSseEvent("error", new { message = "客户端未连接或不存在", clientName }); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
int sentIndex = 0; |
|
|
|
int receivedIndex = 0; |
|
|
|
var cancellationToken = HttpContext.RequestAborted; |
|
|
|
// 发送一个连接成功事件
|
|
|
|
await SendSseEvent("open", new { |
|
|
|
message = "成功连接到服务器事件流", |
|
|
|
clientName, |
|
|
|
timestamp = DateTime.UtcNow |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(HttpContext.RequestAborted); |
|
|
|
|
|
|
|
while (!cancellationToken.IsCancellationRequested) |
|
|
|
{ |
|
|
|
bool hasNewMessages = false; |
|
|
|
int lastSentCount = 0; |
|
|
|
int lastReceivedCount = 0; |
|
|
|
var cancellationToken = HttpContext.RequestAborted; |
|
|
|
|
|
|
|
// 检查并高效地发送新的"已发送"消息
|
|
|
|
if (client.SentMessagesCount > sentIndex) |
|
|
|
while (!cancellationToken.IsCancellationRequested) |
|
|
|
{ |
|
|
|
var newMessages = client.SentMessages.Skip(sentIndex).ToList(); |
|
|
|
if (newMessages.Any()) |
|
|
|
try |
|
|
|
{ |
|
|
|
await SendSseEvent("update", new { type = "sent", messages = newMessages, totalCount = client.SentMessagesCount }); |
|
|
|
sentIndex = client.SentMessagesCount; |
|
|
|
hasNewMessages = true; |
|
|
|
bool hasNewMessages = false; |
|
|
|
|
|
|
|
// 检查并高效地发送新的"已发送"消息
|
|
|
|
var currentSentCount = client.SentMessagesCount; |
|
|
|
if (currentSentCount > lastSentCount) |
|
|
|
{ |
|
|
|
var sentMessages = client.SentMessages?.ToList() ?? new List<string>(); |
|
|
|
if (sentMessages.Count > lastSentCount) |
|
|
|
{ |
|
|
|
var newMessages = sentMessages.Skip(lastSentCount).ToList(); |
|
|
|
if (newMessages.Any()) |
|
|
|
{ |
|
|
|
await SendSseEvent("update", new { |
|
|
|
type = "sent", |
|
|
|
messages = newMessages, |
|
|
|
totalCount = currentSentCount, |
|
|
|
newCount = newMessages.Count |
|
|
|
}); |
|
|
|
lastSentCount = currentSentCount; |
|
|
|
hasNewMessages = true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 检查并高效地发送新的"已接收"消息
|
|
|
|
var currentReceivedCount = client.ReceivedMessagesCount; |
|
|
|
if (currentReceivedCount > lastReceivedCount) |
|
|
|
{ |
|
|
|
var receivedMessages = client.ReceivedMessages?.ToList() ?? new List<string>(); |
|
|
|
if (receivedMessages.Count > lastReceivedCount) |
|
|
|
{ |
|
|
|
var newMessages = receivedMessages.Skip(lastReceivedCount).ToList(); |
|
|
|
if (newMessages.Any()) |
|
|
|
{ |
|
|
|
await SendSseEvent("update", new { |
|
|
|
type = "received", |
|
|
|
messages = newMessages, |
|
|
|
totalCount = currentReceivedCount, |
|
|
|
newCount = newMessages.Count |
|
|
|
}); |
|
|
|
lastReceivedCount = currentReceivedCount; |
|
|
|
hasNewMessages = true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (hasNewMessages) |
|
|
|
{ |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
} |
|
|
|
|
|
|
|
await Task.Delay(250, cancellationToken); // 每250毫秒检查一次新消息
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 检查并高效地发送新的"已接收"消息
|
|
|
|
if (client.ReceivedMessagesCount > receivedIndex) |
|
|
|
{ |
|
|
|
var newMessages = client.ReceivedMessages.Skip(receivedIndex).ToList(); |
|
|
|
if (newMessages.Any()) |
|
|
|
catch (OperationCanceledException) |
|
|
|
{ |
|
|
|
// 正常的取消操作,退出循环
|
|
|
|
break; |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
await SendSseEvent("update", new { type = "received", messages = newMessages, totalCount = client.ReceivedMessagesCount }); |
|
|
|
receivedIndex = client.ReceivedMessagesCount; |
|
|
|
hasNewMessages = true; |
|
|
|
_logger.LogError(ex, "StreamClientMessages 循环中发生错误,客户端: {ClientName}", clientName); |
|
|
|
await SendSseEvent("error", new { |
|
|
|
message = "处理消息流时发生错误", |
|
|
|
error = ex.Message, |
|
|
|
clientName, |
|
|
|
timestamp = DateTime.UtcNow |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
|
|
|
|
// 等待一段时间后继续,避免频繁错误
|
|
|
|
await Task.Delay(1000, cancellationToken); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (hasNewMessages) |
|
|
|
// 发送断开连接事件
|
|
|
|
await SendSseEvent("disconnected", new { |
|
|
|
message = "客户端消息流连接已断开", |
|
|
|
clientName, |
|
|
|
timestamp = DateTime.UtcNow |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
} |
|
|
|
catch (OperationCanceledException) |
|
|
|
{ |
|
|
|
_logger.LogInformation("StreamClientMessages 连接被客户端取消,客户端: {ClientName}", clientName); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "StreamClientMessages 方法执行时发生未处理的异常,客户端: {ClientName}", clientName); |
|
|
|
try |
|
|
|
{ |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
await SendSseEvent("fatal_error", new { |
|
|
|
message = "服务器内部错误", |
|
|
|
error = ex.Message, |
|
|
|
clientName, |
|
|
|
timestamp = DateTime.UtcNow |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(); |
|
|
|
} |
|
|
|
catch |
|
|
|
{ |
|
|
|
// 忽略发送错误事件时的异常
|
|
|
|
} |
|
|
|
|
|
|
|
await Task.Delay(250, cancellationToken); // 每250毫秒检查一次新消息
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task SendSseEvent(string eventName, object data) |
|
|
|
{ |
|
|
|
var json = Newtonsoft.Json.JsonConvert.SerializeObject(data); |
|
|
|
await Response.WriteAsync($"event: {eventName}\n"); |
|
|
|
await Response.WriteAsync($"data: {json}\n\n"); |
|
|
|
try |
|
|
|
{ |
|
|
|
if (string.IsNullOrEmpty(eventName)) |
|
|
|
{ |
|
|
|
_logger.LogWarning("尝试发送空事件名称的SSE事件"); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if (data == null) |
|
|
|
{ |
|
|
|
_logger.LogWarning("尝试发送空数据的SSE事件: {EventName}", eventName); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
var json = Newtonsoft.Json.JsonConvert.SerializeObject(data); |
|
|
|
var eventData = $"event: {eventName}\ndata: {json}\n\n"; |
|
|
|
|
|
|
|
await Response.WriteAsync(eventData); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "发送SSE事件时发生错误: {EventName}", eventName); |
|
|
|
// 不重新抛出异常,避免影响整个流
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/// <summary>
|
|
|
@ -375,40 +477,195 @@ namespace LTEMvcApp.Controllers |
|
|
|
} |
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
/// 使用 Server-Sent Events (SSE) 实时推送全局日志
|
|
|
|
/// 获取日志缓存统计信息
|
|
|
|
/// </summary>
|
|
|
|
[HttpGet("logs/stream")] |
|
|
|
public async Task StreamLogs(CancellationToken cancellationToken) |
|
|
|
/// <returns>统计信息</returns>
|
|
|
|
[HttpGet("logs/stats")] |
|
|
|
public ActionResult<object> GetLogCacheStats() |
|
|
|
{ |
|
|
|
Response.ContentType = "text/event-stream"; |
|
|
|
Response.Headers.Append("Cache-Control", "no-cache"); |
|
|
|
Response.Headers.Append("Connection", "keep-alive"); |
|
|
|
try |
|
|
|
{ |
|
|
|
var stats = new |
|
|
|
{ |
|
|
|
totalLogs = _webSocketManager.GetLogCacheCount(), |
|
|
|
cacheSize = 10000, // LogCacheSize
|
|
|
|
timestamp = DateTime.UtcNow |
|
|
|
}; |
|
|
|
return Ok(stats); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "获取日志缓存统计信息时发生错误"); |
|
|
|
return StatusCode(500, new { message = "获取统计信息失败", error = ex.Message }); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
int logIndex = 0; |
|
|
|
/// <summary>
|
|
|
|
/// 清空全局日志缓存
|
|
|
|
/// </summary>
|
|
|
|
/// <returns>操作结果</returns>
|
|
|
|
[HttpPost("logs/clear")] |
|
|
|
public ActionResult ClearLogCache() |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
_webSocketManager.ClearLogCache(); |
|
|
|
return Ok(new { message = "日志缓存已清空" }); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "清空日志缓存时发生错误"); |
|
|
|
return StatusCode(500, new { message = "清空日志缓存失败", error = ex.Message }); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 首先,一次性推送所有已缓存的日志
|
|
|
|
var initialLogs = _webSocketManager.GetLogCache().ToList(); |
|
|
|
if (initialLogs.Any()) |
|
|
|
/// <summary>
|
|
|
|
/// 重置全局日志缓存
|
|
|
|
/// </summary>
|
|
|
|
/// <returns>操作结果</returns>
|
|
|
|
[HttpPost("logs/reset")] |
|
|
|
public ActionResult ResetLogCache() |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
await SendSseEvent("history", new { logs = initialLogs }); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
logIndex = initialLogs.Count; |
|
|
|
_webSocketManager.ResetLogCache(); |
|
|
|
return Ok(new { message = "日志缓存已重置" }); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "重置日志缓存时发生错误"); |
|
|
|
return StatusCode(500, new { message = "重置日志缓存失败", error = ex.Message }); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
while (!cancellationToken.IsCancellationRequested) |
|
|
|
/// <summary>
|
|
|
|
/// 使用 Server-Sent Events (SSE) 实时推送全局日志
|
|
|
|
/// </summary>
|
|
|
|
[HttpGet("logs/stream")] |
|
|
|
public async Task StreamLogs(CancellationToken cancellationToken) |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
var logCache = _webSocketManager.GetLogCache(); |
|
|
|
if (logCache.Count() > logIndex) |
|
|
|
Response.ContentType = "text/event-stream"; |
|
|
|
Response.Headers.Append("Cache-Control", "no-cache"); |
|
|
|
Response.Headers.Append("Connection", "keep-alive"); |
|
|
|
Response.Headers.Append("Access-Control-Allow-Origin", "*"); |
|
|
|
|
|
|
|
// 发送连接成功事件
|
|
|
|
await SendSseEvent("connected", new { message = "日志流连接已建立", timestamp = DateTime.UtcNow }); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
|
|
|
|
int lastLogCount = 0; |
|
|
|
var lastLogs = new List<LTELog>(); |
|
|
|
|
|
|
|
// 首先,一次性推送所有已缓存的日志
|
|
|
|
try |
|
|
|
{ |
|
|
|
var newLogs = logCache.Skip(logIndex).ToList(); |
|
|
|
if (newLogs.Any()) |
|
|
|
var initialLogs = _webSocketManager.GetLogCache()?.ToList() ?? new List<LTELog>(); |
|
|
|
if (initialLogs.Any()) |
|
|
|
{ |
|
|
|
await SendSseEvent("new_logs", new { logs = newLogs }); |
|
|
|
await SendSseEvent("history", new { logs = initialLogs, totalCount = initialLogs.Count }); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
logIndex = logCache.Count(); |
|
|
|
lastLogCount = initialLogs.Count; |
|
|
|
lastLogs = initialLogs.ToList(); // 保存副本用于比较
|
|
|
|
} |
|
|
|
} |
|
|
|
await Task.Delay(250, cancellationToken); |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "获取初始日志时发生错误"); |
|
|
|
await SendSseEvent("error", new { message = "获取初始日志失败", error = ex.Message }); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
} |
|
|
|
|
|
|
|
while (!cancellationToken.IsCancellationRequested) |
|
|
|
{ |
|
|
|
try |
|
|
|
{ |
|
|
|
var currentLogs = _webSocketManager.GetLogCache()?.ToList() ?? new List<LTELog>(); |
|
|
|
|
|
|
|
// 检查是否有新日志
|
|
|
|
if (currentLogs.Count > lastLogCount) |
|
|
|
{ |
|
|
|
// 计算新增的日志
|
|
|
|
var newLogs = currentLogs.Skip(lastLogCount).ToList(); |
|
|
|
|
|
|
|
if (newLogs.Any()) |
|
|
|
{ |
|
|
|
await SendSseEvent("new_logs", new { |
|
|
|
logs = newLogs, |
|
|
|
totalCount = currentLogs.Count, |
|
|
|
newCount = newLogs.Count |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
|
|
|
|
// 更新索引和缓存
|
|
|
|
lastLogCount = currentLogs.Count; |
|
|
|
lastLogs = currentLogs.ToList(); |
|
|
|
} |
|
|
|
} |
|
|
|
else if (currentLogs.Count < lastLogCount) |
|
|
|
{ |
|
|
|
// 日志被清空或重置的情况
|
|
|
|
_logger.LogInformation("检测到日志缓存被重置,重新同步"); |
|
|
|
await SendSseEvent("reset", new { |
|
|
|
message = "日志缓存已重置", |
|
|
|
totalCount = currentLogs.Count |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
|
|
|
|
lastLogCount = currentLogs.Count; |
|
|
|
lastLogs = currentLogs.ToList(); |
|
|
|
} |
|
|
|
|
|
|
|
await Task.Delay(250, cancellationToken); |
|
|
|
} |
|
|
|
catch (OperationCanceledException) |
|
|
|
{ |
|
|
|
// 正常的取消操作,退出循环
|
|
|
|
break; |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "StreamLogs 循环中发生错误"); |
|
|
|
await SendSseEvent("error", new { |
|
|
|
message = "处理日志流时发生错误", |
|
|
|
error = ex.Message, |
|
|
|
timestamp = DateTime.UtcNow |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
|
|
|
|
// 等待一段时间后继续,避免频繁错误
|
|
|
|
await Task.Delay(1000, cancellationToken); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 发送断开连接事件
|
|
|
|
await SendSseEvent("disconnected", new { |
|
|
|
message = "日志流连接已断开", |
|
|
|
timestamp = DateTime.UtcNow |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(cancellationToken); |
|
|
|
} |
|
|
|
catch (OperationCanceledException) |
|
|
|
{ |
|
|
|
_logger.LogInformation("StreamLogs 连接被客户端取消"); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
_logger.LogError(ex, "StreamLogs 方法执行时发生未处理的异常"); |
|
|
|
try |
|
|
|
{ |
|
|
|
await SendSseEvent("fatal_error", new { |
|
|
|
message = "服务器内部错误", |
|
|
|
error = ex.Message, |
|
|
|
timestamp = DateTime.UtcNow |
|
|
|
}); |
|
|
|
await Response.Body.FlushAsync(); |
|
|
|
} |
|
|
|
catch |
|
|
|
{ |
|
|
|
// 忽略发送错误事件时的异常
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|