using Microsoft.AspNetCore.Mvc;
using LTEMvcApp.Models;
using LTEMvcApp.Services;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System.Threading.Tasks;
using System.Threading;
using System.IO;
using System.Text;
using System.Linq;
namespace LTEMvcApp.Controllers
{
///
/// 客户端消息管理控制器
///
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
private readonly WebSocketManagerService _webSocketManager;
private readonly ILogger _logger;
private readonly string _logsDirectory = "ClientMessageLogs";
public MessageController(WebSocketManagerService webSocketManager, ILogger logger)
{
_webSocketManager = webSocketManager;
_logger = logger;
if (!Directory.Exists(_logsDirectory))
{
Directory.CreateDirectory(_logsDirectory);
}
}
///
/// 工具方法:将address转换为合法文件名
///
private static string SanitizeFileName(string address)
{
foreach (var c in System.IO.Path.GetInvalidFileNameChars())
{
address = address.Replace(c, '_');
}
return address;
}
///
/// SSE推送客户端消息流
///
[HttpGet("{address}/stream")]
public async Task StreamClientMessages(string address)
{
try
{
Response.ContentType = "text/event-stream";
Response.Headers.Append("Cache-Control", "no-cache");
Response.Headers.Append("Connection", "keep-alive");
Response.Headers.Append("Access-Control-Allow-Origin", "*");
var client = _webSocketManager.GetClientInstance(address);
if (client == null)
{
await SendSseEvent("error", new { message = "客户端未连接或不存在", address });
return;
}
await SendSseEvent("open", new {
message = "成功连接到服务器事件流",
address,
timestamp = DateTime.UtcNow
});
await Response.Body.FlushAsync(HttpContext.RequestAborted);
int lastSentCount = 0;
int lastReceivedCount = 0;
var cancellationToken = HttpContext.RequestAborted;
// 使用安全文件名
var safeAddress = SanitizeFileName(address);
var sentLogFilePath = Path.Combine(_logsDirectory, $"{safeAddress}_sent_messages.log");
var receivedLogFilePath = Path.Combine(_logsDirectory, $"{safeAddress}_received_messages.log");
while (!cancellationToken.IsCancellationRequested)
{
try
{
bool hasNewMessages = false;
var currentSentCount = client.SentMessagesCount;
if (currentSentCount > lastSentCount)
{
var sentMessages = client.SentMessages?.ToList() ?? new List();
if (sentMessages.Count > lastSentCount)
{
var newMessages = sentMessages.Skip(lastSentCount).ToList();
if (newMessages.Any())
{
await LogMessagesToFile(sentLogFilePath, newMessages, "SENT", address);
await SendSseEvent("update", new {
type = "sent",
messages = newMessages,
totalCount = currentSentCount,
newCount = newMessages.Count
});
lastSentCount = currentSentCount;
hasNewMessages = true;
}
}
}
var currentReceivedCount = client.ReceivedMessagesCount;
if (currentReceivedCount > lastReceivedCount)
{
var receivedMessages = client.ReceivedMessages?.ToList() ?? new List();
if (receivedMessages.Count > lastReceivedCount)
{
var newMessages = receivedMessages.Skip(lastReceivedCount).ToList();
if (newMessages.Any())
{
await LogMessagesToFile(receivedLogFilePath, newMessages, "RECEIVED", address);
await SendSseEvent("update", new {
type = "received",
messages = newMessages,
totalCount = currentReceivedCount,
newCount = newMessages.Count
});
lastReceivedCount = currentReceivedCount;
hasNewMessages = true;
}
}
}
if (hasNewMessages)
{
await Response.Body.FlushAsync(cancellationToken);
}
await Task.Delay(250, cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "StreamClientMessages 循环中发生错误,客户端: {ClientName}", address);
await SendSseEvent("error", new {
message = "处理消息流时发生错误",
error = ex.Message,
address,
timestamp = DateTime.UtcNow
});
await Response.Body.FlushAsync(cancellationToken);
await Task.Delay(1000, cancellationToken);
}
}
await SendSseEvent("disconnected", new {
message = "客户端消息流连接已断开",
address,
timestamp = DateTime.UtcNow
});
await Response.Body.FlushAsync(cancellationToken);
}
catch (OperationCanceledException)
{
_logger.LogInformation("StreamClientMessages 连接被客户端取消,客户端: {ClientName}", address);
}
catch (Exception ex)
{
_logger.LogError(ex, "StreamClientMessages 方法执行时发生未处理的异常,客户端: {ClientName}", address);
try
{
await SendSseEvent("fatal_error", new {
message = "服务器内部错误",
error = ex.Message,
address,
timestamp = DateTime.UtcNow
});
await Response.Body.FlushAsync();
}
catch { }
}
}
///
/// 发送消息到客户端
///
[HttpPost("{address}/send")]
public ActionResult SendMessage(string address, [FromBody] JObject message)
{
var messageId = _webSocketManager.SendMessageToClient(address, message);
if (messageId >= 0)
return Ok(new { messageId, message = $"消息已发送到客户端 '{address}'" });
else
return BadRequest($"发送消息到客户端 '{address}' 失败");
}
///
/// 获取客户端消息日志文件列表
///
[HttpGet("logs")]
public ActionResult