You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

149 lines
5.8 KiB

using CellularManagement.WebSocket.Connection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Services;
/// <summary>
/// 连接健康检查服务
/// 负责定期检查连接状态,清理不活跃的连接
/// </summary>
public class ConnectionHealthCheckService : BackgroundService
{
private readonly ConnectionManagerCoordinator _coordinator;
private readonly ILogger<ConnectionHealthCheckService> _logger;
private readonly WebSocketOptions _options;
private readonly TimeSpan _checkInterval;
private readonly TimeSpan _inactivityTimeout;
private bool _disposed;
public ConnectionHealthCheckService(
ConnectionManagerCoordinator coordinator,
ILogger<ConnectionHealthCheckService> logger,
IOptions<WebSocketOptions> options)
{
_coordinator = coordinator;
_logger = logger;
_options = options.Value;
_checkInterval = _options.HeartbeatInterval;
_inactivityTimeout = _options.ConnectionTimeout;
_logger.LogInformation("初始化连接健康检查服务,检查间隔:{CheckInterval}秒,超时时间:{Timeout}秒",
_checkInterval.TotalSeconds, _inactivityTimeout.TotalSeconds);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("连接健康检查服务开始运行");
while (!stoppingToken.IsCancellationRequested)
{
try
{
await CheckConnectionsAsync();
await Task.Delay(_checkInterval, stoppingToken);
}
catch (OperationCanceledException)
{
_logger.LogInformation("连接健康检查服务正在停止");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "连接健康检查服务发生错误");
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
_logger.LogInformation("连接健康检查服务已停止");
}
private async Task CheckConnectionsAsync()
{
var connectionManager = _coordinator.GetConnectionManager();
var connections = connectionManager.GetAllConnections();
var now = DateTime.UtcNow;
var inactiveConnections = new List<string>();
foreach (var connection in connections)
{
try
{
// 检查连接是否超时
if (now - connection.LastActivityTime > _inactivityTimeout)
{
_logger.LogWarning("连接超时,连接ID:{ConnectionId},最后活动时间:{LastActivityTime},超时时间:{Timeout}秒",
connection.Id, connection.LastActivityTime, _inactivityTimeout.TotalSeconds);
// inactiveConnections.Add(connection.Id); // 暂时注释掉,不清理超时连接
continue; // 暂时注释掉,继续检查其他状态
}
// 检查连接是否正在处理
if (_coordinator.IsConnectionProcessing(connection.Id))
{
_logger.LogDebug("连接正在处理中,跳过检查,连接ID:{ConnectionId},最后活动时间:{LastActivityTime}",
connection.Id, connection.LastActivityTime);
continue;
}
// 检查连接状态
if (connection.Socket.State != System.Net.WebSockets.WebSocketState.Open)
{
_logger.LogWarning("连接状态异常,连接ID:{ConnectionId},状态:{State},最后活动时间:{LastActivityTime}",
connection.Id, connection.Socket.State, connection.LastActivityTime);
inactiveConnections.Add(connection.Id);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "检查连接时发生错误,连接ID:{ConnectionId},最后活动时间:{LastActivityTime},连接状态:{State}",
connection.Id, connection.LastActivityTime, connection.Socket.State);
inactiveConnections.Add(connection.Id);
}
}
// 清理不活跃的连接
foreach (var connectionId in inactiveConnections)
{
try
{
await connectionManager.RemoveConnectionAsync(connectionId);
_logger.LogInformation("已清理不活跃连接,连接ID:{ConnectionId},清理原因:连接状态异常或检查出错", connectionId);
}
catch (Exception ex)
{
_logger.LogError(ex, "清理连接时发生错误,连接ID:{ConnectionId},错误详情:{ErrorMessage}",
connectionId, ex.Message);
}
}
_logger.LogInformation("连接健康检查完成,检查连接数:{TotalConnections},清理连接数:{CleanedConnections},检查时间:{CheckTime}",
connections.Count(), inactiveConnections.Count, now.ToString("yyyy-MM-dd HH:mm:ss"));
}
public override void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_logger.LogInformation("正在释放连接健康检查服务资源");
try
{
// 执行清理操作
_logger.LogInformation("连接健康检查服务资源已释放");
}
catch (Exception ex)
{
_logger.LogError(ex, "释放连接健康检查服务资源时发生错误");
}
finally
{
base.Dispose();
}
}
}