Browse Source

WebSocket 文件重新分类

norm
hyh 2 months ago
parent
commit
716637c03b
  1. 11
      src/CellularManagement.WebSocket/Connection/ConnectionStatus.cs
  2. 13
      src/CellularManagement.WebSocket/Connection/WebSocketConnection.cs
  3. 259
      src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs
  4. 17
      src/CellularManagement.WebSocket/Exceptions/WebSocketExceptions.cs
  5. 12
      src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs
  6. 5
      src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
  7. 15
      src/CellularManagement.WebSocket/Models/WebSocketMessage.cs
  8. 17
      src/CellularManagement.WebSocket/Models/WebSocketOptions.cs
  9. 1
      src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs
  10. 1
      src/CellularManagement.WebSocket/Pipeline/Steps/MessageValidationStep.cs
  11. 1
      src/CellularManagement.WebSocket/Services/Handlers/ChatMessageHandler.cs
  12. 1
      src/CellularManagement.WebSocket/Services/Handlers/NotificationMessageHandler.cs
  13. 1
      src/CellularManagement.WebSocket/Services/IWebSocketMessageHandler.cs
  14. 1
      src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

11
src/CellularManagement.WebSocket/Connection/ConnectionStatus.cs

@ -0,0 +1,11 @@
namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// 连接状态枚举
/// </summary>
public enum ConnectionStatus
{
Connected, // 已连接
Disconnected, // 已断开
Error // 错误状态
}

13
src/CellularManagement.WebSocket/Connection/WebSocketConnection.cs

@ -0,0 +1,13 @@
using System.Net.WebSockets;
namespace CellularManagement.WebSocket.Connection;
/// <summary>
/// WebSocket 连接信息
/// </summary>
public class WebSocketConnection
{
public System.Net.WebSockets.WebSocket Socket { get; set; } = null!;
public DateTime LastActivityTime { get; set; }
public ConnectionStatus Status { get; set; }
}

259
src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs

@ -2,6 +2,10 @@ using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using CellularManagement.WebSocket.Extensions;
using CellularManagement.WebSocket.Models;
using CellularManagement.WebSocket.Exceptions;
namespace CellularManagement.WebSocket.Connection;
@ -11,34 +15,33 @@ namespace CellularManagement.WebSocket.Connection;
/// </summary>
public class WebSocketConnectionManager : IDisposable
{
// 存储所有活动的 WebSocket 连接
private readonly ConcurrentDictionary<string, WebSocketConnection> _connections = new();
// 入站消息队列
private readonly Channel<WebSocketMessage> _incomingMessages;
// 出站消息队列
private readonly Channel<WebSocketMessage> _outgoingMessages;
private readonly ILogger<WebSocketConnectionManager> _logger;
// 心跳检测定时器
private readonly Timer _heartbeatTimer;
private readonly TimeSpan _heartbeatInterval = TimeSpan.FromSeconds(30);
private readonly WebSocketOptions _options;
private bool _disposed;
// 消息到达事件
public event Func<WebSocketMessage, Task>? OnMessageReceived;
public WebSocketConnectionManager(ILogger<WebSocketConnectionManager> logger)
public WebSocketConnectionManager(
ILogger<WebSocketConnectionManager> logger,
IOptions<WebSocketOptions> options)
{
_logger = logger;
_options = options.Value;
_logger.LogInformation("初始化 WebSocket 连接管理器");
// 创建有界通道,限制最大消息数量为 10000
_incomingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(10000)
_incomingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(_options.MessageQueueSize)
{
FullMode = BoundedChannelFullMode.Wait, // 当队列满时等待
SingleReader = false, // 允许多个读取者
SingleWriter = false // 允许多个写入者
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
});
_outgoingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(10000)
_outgoingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(_options.MessageQueueSize)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
@ -46,65 +49,131 @@ public class WebSocketConnectionManager : IDisposable
});
_logger.LogInformation("创建消息队列完成,入站队列大小:{IncomingSize},出站队列大小:{OutgoingSize}",
10000, 10000);
_options.MessageQueueSize, _options.MessageQueueSize);
// 启动心跳检测定时器
//_heartbeatTimer = new Timer(CheckConnections, null, _heartbeatInterval, _heartbeatInterval);
_logger.LogInformation("心跳检测定时器已启动,间隔:{Interval}秒", _heartbeatInterval.TotalSeconds);
_heartbeatTimer = new Timer(CheckConnections, null, _options.HeartbeatInterval, _options.HeartbeatInterval);
_logger.LogInformation("心跳检测定时器已启动,间隔:{Interval}秒", _options.HeartbeatInterval.TotalSeconds);
}
/// <summary>
/// 添加新的 WebSocket 连接
/// </summary>
public string AddConnection(System.Net.WebSockets.WebSocket socket)
public async Task<string> AddConnectionAsync(System.Net.WebSockets.WebSocket socket)
{
var connectionId = Guid.NewGuid().ToString();
if (_disposed)
{
throw new ObjectDisposedException(nameof(WebSocketConnectionManager));
}
if (socket == null)
{
throw new ArgumentNullException(nameof(socket));
}
if (_connections.Count >= _options.MaxConcurrentConnections)
{
_logger.LogWarning("达到最大连接数限制:{MaxConnections}", _options.MaxConcurrentConnections);
throw new ConnectionLimitExceededException($"已达到最大连接数限制:{_options.MaxConcurrentConnections}");
}
var connectionId = await GenerateUniqueConnectionIdAsync();
var connection = new WebSocketConnection
{
Socket = socket,
LastActivityTime = DateTime.UtcNow,
Status = ConnectionStatus.Connected
};
_connections.TryAdd(connectionId, connection);
if (!_connections.TryAdd(connectionId, connection))
{
_logger.LogError("添加连接失败,连接ID:{ConnectionId}", connectionId);
throw new ConnectionAddFailedException($"添加连接失败,连接ID:{connectionId}");
}
_logger.LogInformation("添加新连接成功,连接ID:{ConnectionId},当前连接数:{ConnectionCount}",
connectionId, _connections.Count);
return connectionId;
}
/// <summary>
/// 移除 WebSocket 连接
/// </summary>
public bool RemoveConnection(string connectionId)
private async Task<string> GenerateUniqueConnectionIdAsync()
{
string connectionId;
do
{
connectionId = Guid.NewGuid().ToString();
} while (_connections.ContainsKey(connectionId));
await Task.CompletedTask.ConfigureAwait(false);
return connectionId;
}
public async Task<bool> RemoveConnectionAsync(string connectionId)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(WebSocketConnectionManager));
}
if (string.IsNullOrEmpty(connectionId))
{
throw new ArgumentException("连接ID不能为空", nameof(connectionId));
}
if (_connections.TryRemove(connectionId, out var connection))
{
try
{
if (connection.Socket.State == WebSocketState.Open)
{
await connection.Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Connection removed by manager",
CancellationToken.None);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "关闭连接时发生错误,连接ID:{ConnectionId}", connectionId);
}
_logger.LogInformation("移除连接成功,连接ID:{ConnectionId},当前连接数:{ConnectionCount}",
connectionId, _connections.Count);
return true;
}
_logger.LogWarning("移除连接失败,连接ID:{ConnectionId} 不存在", connectionId);
return false;
}
/// <summary>
/// 获取指定连接
/// </summary>
public WebSocketConnection? GetConnection(string connectionId)
{
_connections.TryGetValue(connectionId, out var connection);
if (connection == null)
if (_disposed)
{
throw new ObjectDisposedException(nameof(WebSocketConnectionManager));
}
if (string.IsNullOrEmpty(connectionId))
{
_logger.LogWarning("获取连接失败,连接ID:{ConnectionId} 不存在", connectionId);
throw new ArgumentException("连接ID不能为空", nameof(connectionId));
}
return connection;
if (_connections.TryGetValue(connectionId, out var connection))
{
return connection;
}
_logger.LogWarning("获取连接失败,连接ID:{ConnectionId} 不存在", connectionId);
return null;
}
/// <summary>
/// 更新连接活动时间
/// </summary>
public void UpdateConnectionActivity(string connectionId)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(WebSocketConnectionManager));
}
if (string.IsNullOrEmpty(connectionId))
{
throw new ArgumentException("连接ID不能为空", nameof(connectionId));
}
if (_connections.TryGetValue(connectionId, out var connection))
{
connection.LastActivityTime = DateTime.UtcNow;
@ -112,9 +181,6 @@ public class WebSocketConnectionManager : IDisposable
}
}
/// <summary>
/// 入队入站消息
/// </summary>
public async ValueTask QueueIncomingMessage(WebSocketMessage message)
{
_logger.LogDebug("入队入站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
@ -123,7 +189,6 @@ public class WebSocketConnectionManager : IDisposable
await _incomingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId);
// 触发消息到达事件
if (OnMessageReceived != null)
{
_logger.LogDebug("触发消息到达事件,连接ID:{ConnectionId}", message.ConnectionId);
@ -131,9 +196,6 @@ public class WebSocketConnectionManager : IDisposable
}
}
/// <summary>
/// 入队出站消息
/// </summary>
public async ValueTask QueueOutgoingMessage(WebSocketMessage message)
{
_logger.LogDebug("入队出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
@ -143,123 +205,60 @@ public class WebSocketConnectionManager : IDisposable
UpdateConnectionActivity(message.ConnectionId);
}
/// <summary>
/// 读取入站消息
/// </summary>
public IAsyncEnumerable<WebSocketMessage> ReadIncomingMessagesAsync(CancellationToken cancellationToken)
{
_logger.LogDebug("开始读取入站消息");
return _incomingMessages.Reader.ReadAllAsync(cancellationToken);
}
/// <summary>
/// 读取出站消息
/// </summary>
public IAsyncEnumerable<WebSocketMessage> ReadOutgoingMessagesAsync(CancellationToken cancellationToken)
{
_logger.LogDebug("开始读取出站消息");
return _outgoingMessages.Reader.ReadAllAsync(cancellationToken);
}
/// <summary>
/// 获取所有连接
/// </summary>
public IEnumerable<WebSocketConnection> GetAllConnections()
{
_logger.LogDebug("获取所有连接,当前连接数:{ConnectionCount}", _connections.Count);
return _connections.Values;
}
/// <summary>
/// 检查连接状态
/// </summary>
private void CheckConnections(object? state)
private async void CheckConnections(object? state)
{
if (_disposed) return;
var now = DateTime.UtcNow;
var inactiveThreshold = TimeSpan.FromMinutes(1);
int inactiveCount = 0;
var inactiveCount = 0;
foreach (var (connectionId, connection) in _connections)
foreach (var kvp in _connections)
{
if (connection.Status == ConnectionStatus.Connected &&
now - connection.LastActivityTime > inactiveThreshold)
var connection = kvp.Value;
if (now - connection.LastActivityTime > _options.ConnectionTimeout)
{
_logger.LogWarning("检测到不活跃连接,连接ID:{ConnectionId},最后活动时间:{LastActivityTime}",
connectionId, connection.LastActivityTime);
inactiveCount++;
_ = CloseConnectionAsync(connectionId);
try
{
await RemoveConnectionAsync(kvp.Key);
inactiveCount++;
}
catch (Exception ex)
{
_logger.LogError(ex, "移除不活跃连接时发生错误,连接ID:{ConnectionId}", kvp.Key);
}
}
}
if (inactiveCount > 0)
{
_logger.LogInformation("心跳检测完成,发现 {InactiveCount} 个不活跃连接", inactiveCount);
}
}
/// <summary>
/// 关闭连接
/// </summary>
private async Task CloseConnectionAsync(string connectionId)
{
if (_connections.TryGetValue(connectionId, out var connection))
{
try
{
_logger.LogInformation("正在关闭连接,连接ID:{ConnectionId}", connectionId);
await connection.Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Connection timeout",
CancellationToken.None);
_logger.LogInformation("连接已关闭,连接ID:{ConnectionId}", connectionId);
}
catch (Exception ex)
{
_logger.LogError(ex, "关闭连接时发生错误,连接ID:{ConnectionId}", connectionId);
}
finally
{
RemoveConnection(connectionId);
}
_logger.LogInformation("清理了 {InactiveCount} 个不活跃连接", inactiveCount);
}
}
public void Dispose()
{
_logger.LogInformation("正在释放 WebSocket 连接管理器资源");
_heartbeatTimer?.Dispose();
_logger.LogInformation("WebSocket 连接管理器资源已释放");
}
}
/// <summary>
/// WebSocket 连接信息
/// </summary>
public class WebSocketConnection
{
public System.Net.WebSockets.WebSocket Socket { get; set; } = null!;
public DateTime LastActivityTime { get; set; }
public ConnectionStatus Status { get; set; }
}
/// <summary>
/// 连接状态枚举
/// </summary>
public enum ConnectionStatus
{
Connected, // 已连接
Disconnected, // 已断开
Error // 错误状态
}
if (_disposed) return;
/// <summary>
/// WebSocket 消息记录
/// </summary>
public record WebSocketMessage
{
public string ConnectionId { get; init; } = string.Empty; // 连接ID
public byte[] Data { get; init; } = Array.Empty<byte>(); // 消息数据
public WebSocketMessageType MessageType { get; init; } // 消息类型
public DateTime Timestamp { get; init; } = DateTime.UtcNow; // 时间戳
public int Priority { get; init; } = 0; // 优先级
_disposed = true;
_heartbeatTimer.Dispose();
_logger.LogInformation("WebSocket 连接管理器已释放");
}
}

17
src/CellularManagement.WebSocket/Exceptions/WebSocketExceptions.cs

@ -0,0 +1,17 @@
namespace CellularManagement.WebSocket.Exceptions;
/// <summary>
/// 连接数超出限制异常
/// </summary>
public class ConnectionLimitExceededException : Exception
{
public ConnectionLimitExceededException(string message) : base(message) { }
}
/// <summary>
/// 添加连接失败异常
/// </summary>
public class ConnectionAddFailedException : Exception
{
public ConnectionAddFailedException(string message) : base(message) { }
}

12
src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs

@ -9,7 +9,7 @@ namespace CellularManagement.WebSocket.Extensions;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddWebSocketServices(this IServiceCollection services, Action<WebSocketOptions>? configureOptions = null)
public static IServiceCollection AddWebSocketServices(this IServiceCollection services, Action<CellularManagement.WebSocket.Models.WebSocketOptions>? configureOptions = null)
{
services.AddSingleton<WebSocketConnectionManager>();
// 注册 WebSocket 消息处理器为 Singleton
@ -28,12 +28,4 @@ public static class ServiceCollectionExtensions
{
return app.UseMiddleware<WebSocketMiddleware>();
}
}
public class WebSocketOptions
{
public int MaxConcurrentConnections { get; set; } = 1000;
public int MaxMessageSize { get; set; } = 1024 * 4;
public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(30);
}
}

5
src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs

@ -1,5 +1,6 @@
using System.Net.WebSockets;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
@ -37,7 +38,7 @@ public class WebSocketMiddleware
_logger.LogInformation("收到 WebSocket 连接请求,路径:{Path}", context.Request.Path);
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var connectionId = _connectionManager.AddConnection(webSocket);
var connectionId = await _connectionManager.AddConnectionAsync(webSocket);
_logger.LogInformation("WebSocket 连接已接受,连接ID:{ConnectionId}", connectionId);
try
@ -58,7 +59,7 @@ public class WebSocketMiddleware
}
finally
{
_connectionManager.RemoveConnection(connectionId);
await _connectionManager.RemoveConnectionAsync(connectionId);
_logger.LogInformation("WebSocket 连接已关闭,连接ID:{ConnectionId}", connectionId);
}
}

15
src/CellularManagement.WebSocket/Models/WebSocketMessage.cs

@ -0,0 +1,15 @@
using System.Net.WebSockets;
namespace CellularManagement.WebSocket.Models;
/// <summary>
/// WebSocket 消息记录
/// </summary>
public record WebSocketMessage
{
public string ConnectionId { get; init; } = string.Empty; // 连接ID
public byte[] Data { get; init; } = Array.Empty<byte>(); // 消息数据
public WebSocketMessageType MessageType { get; init; } // 消息类型
public DateTime Timestamp { get; init; } = DateTime.UtcNow; // 时间戳
public int Priority { get; init; } = 0; // 优先级
}

17
src/CellularManagement.WebSocket/Models/WebSocketOptions.cs

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace CellularManagement.WebSocket.Models
{
public class WebSocketOptions
{
public int MaxConcurrentConnections { get; set; } = 1000;
public int MaxMessageSize { get; set; } = 1024 * 4;
public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(30);
public int MessageQueueSize { get; set; } = 10000;
}
}

1
src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs

@ -1,5 +1,6 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline.Steps;

1
src/CellularManagement.WebSocket/Pipeline/Steps/MessageValidationStep.cs

@ -1,5 +1,6 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline.Steps;

1
src/CellularManagement.WebSocket/Services/Handlers/ChatMessageHandler.cs

@ -1,5 +1,6 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services.Handlers;

1
src/CellularManagement.WebSocket/Services/Handlers/NotificationMessageHandler.cs

@ -1,5 +1,6 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services.Handlers;

1
src/CellularManagement.WebSocket/Services/IWebSocketMessageHandler.cs

@ -1,4 +1,5 @@
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Services;

1
src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

@ -1,5 +1,6 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using CellularManagement.WebSocket.Pipeline;
using CellularManagement.WebSocket.Pipeline.Steps;
using Microsoft.Extensions.Hosting;

Loading…
Cancel
Save