From aec7c755bc87e3501a5acc119e0a072ff1705559 Mon Sep 17 00:00:00 2001 From: hyh Date: Wed, 30 Apr 2025 11:03:55 +0800 Subject: [PATCH] Initial commit on ws branch --- .../Pipeline/IWebSocketMessageHandler.cs | 28 -- .../Services/IWebSocketService.cs | 16 - .../Services/WebSocketService.cs | 137 ------ .../Common/IAggregateRoot.cs | 6 + .../Entities/WebSocketConnection.cs | 66 --- .../Events/WebSocketMessageEvent.cs | 35 -- .../Configurations/WebSocketConfiguration.cs | 9 - .../DependencyInjection.cs | 9 - .../DistributedWebSocketManager.cs | 196 -------- .../Monitoring/WebSocketMetrics.cs | 65 --- .../WebSocketMessageValidationHandler.cs | 54 --- .../Pipeline/WebSocketMessagePipeline.cs | 145 ------ .../Pooling/WebSocketMessagePool.cs | 59 --- .../WebSocket/WebSocketConnectionManager.cs | 129 ------ .../WebSocket/WebSocketMessagePipeline.cs | 199 --------- .../WebSocket/WebSocketMessagePool.cs | 72 --- .../WebSocket/WebSocketMiddleware.cs | 134 ------ .../WebSocket/WebSocketService.cs | 420 ------------------ .../CellularManagement.WebAPI.http | 13 - src/CellularManagement.WebAPI/Program.cs | 26 +- .../appsettings.README.md | 5 - .../appsettings.json | 4 - .../wwwroot/websocket.html | 26 +- 23 files changed, 33 insertions(+), 1820 deletions(-) delete mode 100644 src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs delete mode 100644 src/CellularManagement.Application/Services/IWebSocketService.cs delete mode 100644 src/CellularManagement.Application/Services/WebSocketService.cs create mode 100644 src/CellularManagement.Domain/Common/IAggregateRoot.cs delete mode 100644 src/CellularManagement.Domain/Entities/WebSocketConnection.cs delete mode 100644 src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs delete mode 100644 src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs delete mode 100644 src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs delete mode 100644 src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs delete mode 100644 src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs delete mode 100644 src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs delete mode 100644 src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs delete mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs delete mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs delete mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs delete mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs delete mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs diff --git a/src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs b/src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs deleted file mode 100644 index a369832..0000000 --- a/src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System.Threading.Channels; - -namespace CellularManagement.Application.Pipeline; - -public interface IWebSocketMessageHandler -{ - string HandlerName { get; } - int Priority { get; } - bool CanHandle(string messageType); - Task HandleAsync(WebSocketMessageContext context, CancellationToken cancellationToken); -} - -public class WebSocketMessageContext -{ - public string ConnectionId { get; set; } = string.Empty; - public string MessageType { get; set; } = string.Empty; - public byte[] Payload { get; set; } = Array.Empty(); - public Dictionary Properties { get; } = new(); - public bool IsHandled { get; set; } - public Exception? Error { get; set; } -} - -public interface IWebSocketMessagePipeline -{ - Task ProcessMessageAsync(WebSocketMessageContext context, CancellationToken cancellationToken); - void RegisterHandler(IWebSocketMessageHandler handler); - void UnregisterHandler(string handlerName); -} \ No newline at end of file diff --git a/src/CellularManagement.Application/Services/IWebSocketService.cs b/src/CellularManagement.Application/Services/IWebSocketService.cs deleted file mode 100644 index 7c2313e..0000000 --- a/src/CellularManagement.Application/Services/IWebSocketService.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.Net.WebSockets; -using CellularManagement.Domain.Entities; - -namespace CellularManagement.Application.Services; - -public interface IWebSocketService -{ - Task AcceptConnectionAsync(WebSocket webSocket); - Task CloseConnectionAsync(string connectionId); - Task SendMessageAsync(string connectionId, byte[] message); - Task BroadcastMessageAsync(byte[] message); - Task SendMessageToUserAsync(string userId, byte[] message); - Task AssociateUserAsync(string connectionId, string userId); - Task GetConnectionAsync(string connectionId); - Task> GetUserConnectionsAsync(string userId); -} \ No newline at end of file diff --git a/src/CellularManagement.Application/Services/WebSocketService.cs b/src/CellularManagement.Application/Services/WebSocketService.cs deleted file mode 100644 index bb6cb64..0000000 --- a/src/CellularManagement.Application/Services/WebSocketService.cs +++ /dev/null @@ -1,137 +0,0 @@ -using System.Net.WebSockets; -using CellularManagement.Domain.Entities; -using CellularManagement.Application.Services; -using CellularManagement.Application.Pipeline; -using Microsoft.Extensions.Logging; -using MediatR; -using CellularManagement.Domain.Events; - -namespace CellularManagement.Application.Services; - -public class WebSocketService : IWebSocketService -{ - private readonly ICacheService _cacheService; - private readonly ILogger _logger; - private readonly IWebSocketMessagePipeline _messagePipeline; - private readonly IMediator _mediator; - private const string CONNECTION_PREFIX = "ws_connection_"; - private const string USER_CONNECTION_PREFIX = "ws_user_"; - - public WebSocketService( - ICacheService cacheService, - ILogger logger, - IWebSocketMessagePipeline messagePipeline, - IMediator mediator) - { - _cacheService = cacheService; - _logger = logger; - _messagePipeline = messagePipeline; - _mediator = mediator; - } - - public async Task AcceptConnectionAsync(WebSocket webSocket) - { - var connectionId = Guid.NewGuid().ToString(); - var connection = WebSocketConnection.Create(connectionId); - - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - _cacheService.Set(connectionKey, webSocket); - _cacheService.Set(connectionId, connection); - - _logger.LogInformation("WebSocket connection accepted: {ConnectionId}", connectionId); - return connectionId; - } - - public async Task CloseConnectionAsync(string connectionId) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var userKey = $"{USER_CONNECTION_PREFIX}{connectionId}"; - - var connection = _cacheService.Get(connectionId); - if (connection != null) - { - connection.Close(); - _cacheService.Set(connectionId, connection); - } - - _cacheService.Remove(connectionKey); - _cacheService.Remove(userKey); - - _logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId); - return true; - } - - public async Task SendMessageAsync(string connectionId, byte[] message) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var webSocket = _cacheService.Get(connectionKey); - - if (webSocket?.State == WebSocketState.Open) - { - await webSocket.SendAsync( - new ArraySegment(message), - WebSocketMessageType.Text, - true, - CancellationToken.None); - - await _mediator.Publish(new WebSocketMessageSentEvent( - connectionId, - "text", - message)); - - return true; - } - return false; - } - - public async Task BroadcastMessageAsync(byte[] message) - { - // 注意:这里需要实现广播逻辑 - // 由于ICacheService的限制,可能需要使用其他方式实现 - return false; - } - - public async Task SendMessageToUserAsync(string userId, byte[] message) - { - // 注意:这里需要实现向特定用户发送消息的逻辑 - // 由于ICacheService的限制,可能需要使用其他方式实现 - return false; - } - - public async Task AssociateUserAsync(string connectionId, string userId) - { - var connection = _cacheService.Get(connectionId); - if (connection != null) - { - connection.AssociateUser(userId); - _cacheService.Set(connectionId, connection); - - var userKey = $"{USER_CONNECTION_PREFIX}{connectionId}"; - _cacheService.Set(userKey, userId); - } - } - - public async Task GetConnectionAsync(string connectionId) - { - return _cacheService.Get(connectionId); - } - - public async Task> GetUserConnectionsAsync(string userId) - { - // 注意:这里需要实现获取用户所有连接的逻辑 - // 由于ICacheService的限制,可能需要使用其他方式实现 - return Enumerable.Empty(); - } - - public async Task ProcessMessageAsync(string connectionId, string messageType, byte[] payload) - { - var context = new WebSocketMessageContext - { - ConnectionId = connectionId, - MessageType = messageType, - Payload = payload - }; - - await _messagePipeline.ProcessMessageAsync(context, CancellationToken.None); - } -} \ No newline at end of file diff --git a/src/CellularManagement.Domain/Common/IAggregateRoot.cs b/src/CellularManagement.Domain/Common/IAggregateRoot.cs new file mode 100644 index 0000000..e1bfed3 --- /dev/null +++ b/src/CellularManagement.Domain/Common/IAggregateRoot.cs @@ -0,0 +1,6 @@ +namespace CellularManagement.Domain.Common; + +public interface IAggregateRoot +{ + // 聚合根接口,用于标识领域模型中的聚合根 +} \ No newline at end of file diff --git a/src/CellularManagement.Domain/Entities/WebSocketConnection.cs b/src/CellularManagement.Domain/Entities/WebSocketConnection.cs deleted file mode 100644 index 8cbc36d..0000000 --- a/src/CellularManagement.Domain/Entities/WebSocketConnection.cs +++ /dev/null @@ -1,66 +0,0 @@ -using System.Net.WebSockets; -using System.Text.Json.Serialization; - -namespace CellularManagement.Domain.Entities; - -public class WebSocketConnection -{ - [JsonPropertyName("connectionId")] - public string ConnectionId { get; set; } - - [JsonPropertyName("userId")] - public string? UserId { get; set; } - - [JsonPropertyName("connectedAt")] - public DateTime ConnectedAt { get; set; } - - [JsonPropertyName("lastActivityAt")] - public DateTime? LastActivityAt { get; set; } - - [JsonPropertyName("state")] - public WebSocketState State { get; set; } - - // 添加无参构造函数用于反序列化 - public WebSocketConnection() - { - ConnectionId = string.Empty; - ConnectedAt = DateTime.UtcNow; - State = WebSocketState.Open; - } - - // 添加带参数的构造函数 - [JsonConstructor] - public WebSocketConnection(string connectionId, string? userId, DateTime connectedAt, DateTime? lastActivityAt, WebSocketState state) - { - ConnectionId = connectionId; - UserId = userId; - ConnectedAt = connectedAt; - LastActivityAt = lastActivityAt; - State = state; - } - - public static WebSocketConnection Create(string connectionId) - { - return new WebSocketConnection - { - ConnectionId = connectionId, - ConnectedAt = DateTime.UtcNow, - State = WebSocketState.Open - }; - } - - public void AssociateUser(string userId) - { - UserId = userId; - } - - public void UpdateLastActivity() - { - LastActivityAt = DateTime.UtcNow; - } - - public void Close() - { - State = WebSocketState.Closed; - } -} \ No newline at end of file diff --git a/src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs b/src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs deleted file mode 100644 index 6ac1f74..0000000 --- a/src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs +++ /dev/null @@ -1,35 +0,0 @@ -using MediatR; - -namespace CellularManagement.Domain.Events; - -public class WebSocketMessageReceivedEvent : INotification -{ - public string ConnectionId { get; } - public string MessageType { get; } - public byte[] Payload { get; } - public DateTime Timestamp { get; } - - public WebSocketMessageReceivedEvent(string connectionId, string messageType, byte[] payload) - { - ConnectionId = connectionId; - MessageType = messageType; - Payload = payload; - Timestamp = DateTime.UtcNow; - } -} - -public class WebSocketMessageSentEvent : INotification -{ - public string ConnectionId { get; } - public string MessageType { get; } - public byte[] Payload { get; } - public DateTime Timestamp { get; } - - public WebSocketMessageSentEvent(string connectionId, string messageType, byte[] payload) - { - ConnectionId = connectionId; - MessageType = messageType; - Payload = payload; - Timestamp = DateTime.UtcNow; - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs b/src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs deleted file mode 100644 index bbe6fb4..0000000 --- a/src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace CellularManagement.Infrastructure.Configurations; - -public class WebSocketConfiguration -{ - public const string SectionName = "WebSocket"; - - public int Port { get; set; } = 5202; - public string Path { get; set; } = "/ws"; -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/DependencyInjection.cs b/src/CellularManagement.Infrastructure/DependencyInjection.cs index ebfc540..d79381f 100644 --- a/src/CellularManagement.Infrastructure/DependencyInjection.cs +++ b/src/CellularManagement.Infrastructure/DependencyInjection.cs @@ -8,8 +8,6 @@ using CellularManagement.Domain.Entities; using CellularManagement.Domain.Repositories; using CellularManagement.Application.Services; using CellularManagement.Infrastructure.Services; -using CellularManagement.Infrastructure.WebSocket; -using CellularManagement.Infrastructure.Monitoring; using Scrutor; namespace CellularManagement.Infrastructure; @@ -119,13 +117,6 @@ public static class DependencyInjection // 注册缓存服务 services.AddScoped(); - // 注册WebSocket相关服务 - services.AddSingleton(); - services.AddSingleton(); - services.AddScoped(); - services.AddSingleton(); - services.AddSingleton(); - // 自动注册服务 services .Scan(action => diff --git a/src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs b/src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs deleted file mode 100644 index eac51af..0000000 --- a/src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs +++ /dev/null @@ -1,196 +0,0 @@ -using System.Net.WebSockets; -using CellularManagement.Domain.Entities; -using CellularManagement.Application.Services; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Caching.Distributed; -using System.Text.Json; - -namespace CellularManagement.Infrastructure.Distributed; - -public class DistributedWebSocketManager : IWebSocketService -{ - private readonly IDistributedCache _distributedCache; - private readonly ILogger _logger; - private readonly string _nodeId; - private const string CONNECTION_PREFIX = "ws_connection_"; - private const string USER_PREFIX = "ws_user_"; - private const string NODE_PREFIX = "ws_node_"; - - public DistributedWebSocketManager( - IDistributedCache distributedCache, - ILogger logger) - { - _distributedCache = distributedCache; - _logger = logger; - _nodeId = Guid.NewGuid().ToString(); - } - - public async Task AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket) - { - var connectionId = Guid.NewGuid().ToString(); - var connection = WebSocketConnection.Create(connectionId); - - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var nodeKey = $"{NODE_PREFIX}{_nodeId}"; - - // 存储连接信息 - await _distributedCache.SetStringAsync( - connectionKey, - JsonSerializer.Serialize(connection), - new DistributedCacheEntryOptions - { - SlidingExpiration = TimeSpan.FromMinutes(30) - }); - - // 更新节点连接列表 - await AddConnectionToNodeAsync(connectionId); - - _logger.LogInformation("WebSocket connection accepted: {ConnectionId} on node {NodeId}", - connectionId, _nodeId); - - return connectionId; - } - - public async Task CloseConnectionAsync(string connectionId) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var userKey = $"{USER_PREFIX}{connectionId}"; - - // 获取连接信息 - var connectionJson = await _distributedCache.GetStringAsync(connectionKey); - if (connectionJson != null) - { - var connection = JsonSerializer.Deserialize(connectionJson); - if (connection != null) - { - connection.Close(); - await _distributedCache.SetStringAsync( - connectionKey, - JsonSerializer.Serialize(connection)); - } - } - - // 从节点连接列表中移除 - await RemoveConnectionFromNodeAsync(connectionId); - - // 清理缓存 - await _distributedCache.RemoveAsync(connectionKey); - await _distributedCache.RemoveAsync(userKey); - - _logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId); - return true; - } - - public async Task SendMessageAsync(string connectionId, byte[] message) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var connectionJson = await _distributedCache.GetStringAsync(connectionKey); - - if (connectionJson != null) - { - var connection = JsonSerializer.Deserialize(connectionJson); - if (connection?.State == WebSocketState.Open) - { - // 这里需要实现消息发送逻辑 - // 可能需要使用消息队列或其他机制 - return true; - } - } - return false; - } - - public async Task BroadcastMessageAsync(byte[] message) - { - // 获取所有节点的连接列表 - var nodes = await GetAllNodesAsync(); - foreach (var node in nodes) - { - // 向每个节点发送广播消息 - // 这里需要实现节点间的消息传递机制 - } - return true; - } - - public async Task SendMessageToUserAsync(string userId, byte[] message) - { - // 获取用户的所有连接 - var connections = await GetUserConnectionsAsync(userId); - foreach (var connection in connections) - { - await SendMessageAsync(connection.ConnectionId, message); - } - return true; - } - - public async Task AssociateUserAsync(string connectionId, string userId) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var userKey = $"{USER_PREFIX}{connectionId}"; - - var connectionJson = await _distributedCache.GetStringAsync(connectionKey); - if (connectionJson != null) - { - var connection = JsonSerializer.Deserialize(connectionJson); - if (connection != null) - { - connection.AssociateUser(userId); - await _distributedCache.SetStringAsync( - connectionKey, - JsonSerializer.Serialize(connection)); - await _distributedCache.SetStringAsync(userKey, userId); - } - } - } - - public async Task GetConnectionAsync(string connectionId) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var connectionJson = await _distributedCache.GetStringAsync(connectionKey); - return connectionJson != null - ? JsonSerializer.Deserialize(connectionJson) - : null; - } - - public async Task> GetUserConnectionsAsync(string userId) - { - var connections = new List(); - // 这里需要实现获取用户所有连接的逻辑 - return connections; - } - - private async Task AddConnectionToNodeAsync(string connectionId) - { - var nodeKey = $"{NODE_PREFIX}{_nodeId}"; - var connections = await GetNodeConnectionsAsync(); - connections.Add(connectionId); - await _distributedCache.SetStringAsync( - nodeKey, - JsonSerializer.Serialize(connections)); - } - - private async Task RemoveConnectionFromNodeAsync(string connectionId) - { - var nodeKey = $"{NODE_PREFIX}{_nodeId}"; - var connections = await GetNodeConnectionsAsync(); - connections.Remove(connectionId); - await _distributedCache.SetStringAsync( - nodeKey, - JsonSerializer.Serialize(connections)); - } - - private async Task> GetNodeConnectionsAsync() - { - var nodeKey = $"{NODE_PREFIX}{_nodeId}"; - var connectionsJson = await _distributedCache.GetStringAsync(nodeKey); - return connectionsJson != null - ? JsonSerializer.Deserialize>(connectionsJson) ?? new List() - : new List(); - } - - private async Task> GetAllNodesAsync() - { - // 这里需要实现获取所有节点的逻辑 - // 可能需要使用服务发现或其他机制 - return new List(); - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs b/src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs deleted file mode 100644 index 79b9dca..0000000 --- a/src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs +++ /dev/null @@ -1,65 +0,0 @@ -using System.Diagnostics; -using System.Diagnostics.Metrics; -using Microsoft.Extensions.Logging; - -namespace CellularManagement.Infrastructure.Monitoring; - -public class WebSocketMetrics -{ - private readonly ILogger _logger; - private readonly Meter _meter; - private readonly Counter _connectionsCounter; - private readonly Counter _messagesCounter; - private readonly Histogram _messageLatency; - private readonly Histogram _messageSize; - private readonly Counter _errorsCounter; - - public WebSocketMetrics(ILogger logger) - { - _logger = logger; - _meter = new Meter("CellularManagement.WebSocket", "1.0.0"); - - _connectionsCounter = _meter.CreateCounter("websocket.connections", "个", "活跃连接数"); - _messagesCounter = _meter.CreateCounter("websocket.messages", "个", "消息处理数"); - _messageLatency = _meter.CreateHistogram("websocket.message.latency", "ms", "消息处理延迟"); - _messageSize = _meter.CreateHistogram("websocket.message.size", "bytes", "消息大小"); - _errorsCounter = _meter.CreateCounter("websocket.errors", "个", "错误数"); - } - - public void ConnectionEstablished() - { - _connectionsCounter.Add(1); - _logger.LogInformation("WebSocket connection established"); - } - - public void ConnectionClosed() - { - _connectionsCounter.Add(-1); - _logger.LogInformation("WebSocket connection closed"); - } - - public void MessageReceived(int size) - { - _messagesCounter.Add(1); - _messageSize.Record(size); - _logger.LogDebug("WebSocket message received, size: {Size} bytes", size); - } - - public void MessageProcessed(TimeSpan latency) - { - _messageLatency.Record(latency.TotalMilliseconds); - _logger.LogDebug("WebSocket message processed, latency: {Latency}ms", latency.TotalMilliseconds); - } - - public void ErrorOccurred(string errorType) - { - _errorsCounter.Add(1); - _logger.LogError("WebSocket error occurred: {ErrorType}", errorType); - } - - public void RecordGauge(string name, double value) - { - var gauge = _meter.CreateObservableGauge($"websocket.{name}", () => value); - _logger.LogDebug("WebSocket gauge recorded: {Name} = {Value}", name, value); - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs b/src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs deleted file mode 100644 index 8e7c76e..0000000 --- a/src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs +++ /dev/null @@ -1,54 +0,0 @@ -using CellularManagement.Application.Pipeline; -using Microsoft.Extensions.Logging; - -namespace CellularManagement.Infrastructure.Pipeline.Handlers; - -public class WebSocketMessageValidationHandler : IWebSocketMessageHandler -{ - private readonly ILogger _logger; - - public string HandlerName => "MessageValidationHandler"; - public int Priority => 100; - - public WebSocketMessageValidationHandler(ILogger logger) - { - _logger = logger; - } - - public bool CanHandle(string messageType) - { - return true; // 处理所有消息类型 - } - - public async Task HandleAsync(WebSocketMessageContext context, CancellationToken cancellationToken) - { - try - { - // 验证消息格式 - if (string.IsNullOrEmpty(context.MessageType)) - { - throw new ArgumentException("Message type cannot be empty"); - } - - if (context.Payload == null || context.Payload.Length == 0) - { - throw new ArgumentException("Message payload cannot be empty"); - } - - // 验证消息大小 - if (context.Payload.Length > 1024 * 1024) // 1MB - { - throw new ArgumentException("Message payload too large"); - } - - _logger.LogInformation("Message validation passed for connection: {ConnectionId}", context.ConnectionId); - } - catch (Exception ex) - { - _logger.LogError(ex, "Message validation failed for connection: {ConnectionId}", context.ConnectionId); - context.Error = ex; - context.IsHandled = true; - throw; - } - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs b/src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs deleted file mode 100644 index e2bb614..0000000 --- a/src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs +++ /dev/null @@ -1,145 +0,0 @@ -using System.Threading.Channels; -using CellularManagement.Application.Pipeline; -using Microsoft.Extensions.Logging; -using MediatR; -using CellularManagement.Domain.Events; - -namespace CellularManagement.Infrastructure.Pipeline; - -public class WebSocketMessagePipeline : IWebSocketMessagePipeline -{ - private readonly Channel _messageChannel; - private readonly List _handlers = new(); - private readonly ILogger _logger; - private readonly IMediator _mediator; - private readonly int _batchSize; - private readonly int _maxQueueSize; - - public WebSocketMessagePipeline( - ILogger logger, - IMediator mediator, - int batchSize = 100, - int maxQueueSize = 10000) - { - _logger = logger; - _mediator = mediator; - _batchSize = batchSize; - _maxQueueSize = maxQueueSize; - - var options = new BoundedChannelOptions(maxQueueSize) - { - FullMode = BoundedChannelFullMode.Wait, - SingleWriter = false, - SingleReader = false - }; - - _messageChannel = Channel.CreateBounded(options); - } - - public async Task ProcessMessageAsync(WebSocketMessageContext context, CancellationToken cancellationToken) - { - try - { - await _messageChannel.Writer.WriteAsync(context, cancellationToken); - await _mediator.Publish(new WebSocketMessageReceivedEvent( - context.ConnectionId, - context.MessageType, - context.Payload), cancellationToken); - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to process message for connection: {ConnectionId}", context.ConnectionId); - throw; - } - } - - public void RegisterHandler(IWebSocketMessageHandler handler) - { - _handlers.Add(handler); - _handlers.Sort((a, b) => b.Priority.CompareTo(a.Priority)); - } - - public void UnregisterHandler(string handlerName) - { - _handlers.RemoveAll(h => h.HandlerName == handlerName); - } - - public async Task StartProcessingAsync(CancellationToken cancellationToken) - { - var batch = new List(_batchSize); - - try - { - while (!cancellationToken.IsCancellationRequested) - { - batch.Clear(); - - while (batch.Count < _batchSize && - await _messageChannel.Reader.WaitToReadAsync(cancellationToken)) - { - if (_messageChannel.Reader.TryRead(out var message)) - { - batch.Add(message); - } - } - - if (batch.Count > 0) - { - await ProcessBatchAsync(batch, cancellationToken); - } - } - } - catch (OperationCanceledException) - { - _logger.LogInformation("Message processing pipeline stopped"); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error in message processing pipeline"); - } - } - - private async Task ProcessBatchAsync(List batch, CancellationToken cancellationToken) - { - foreach (var message in batch) - { - try - { - await ProcessMessageWithHandlersAsync(message, cancellationToken); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error processing message: {MessageId}", message.ConnectionId); - message.Error = ex; - } - } - } - - private async Task ProcessMessageWithHandlersAsync(WebSocketMessageContext context, CancellationToken cancellationToken) - { - foreach (var handler in _handlers) - { - if (handler.CanHandle(context.MessageType) && !context.IsHandled) - { - try - { - await handler.HandleAsync(context, cancellationToken); - if (context.IsHandled) - { - break; - } - } - catch (Exception ex) - { - _logger.LogError(ex, "Handler {HandlerName} failed to process message", handler.HandlerName); - throw; - } - } - } - - if (!context.IsHandled) - { - _logger.LogWarning("No handler found for message type: {MessageType}", context.MessageType); - } - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs b/src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs deleted file mode 100644 index 7c87ee3..0000000 --- a/src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs +++ /dev/null @@ -1,59 +0,0 @@ -using System.Buffers; -using System.Collections.Concurrent; -using CellularManagement.Application.Pipeline; - -namespace CellularManagement.Infrastructure.Pooling; - -public class WebSocketMessagePool -{ - private readonly ConcurrentBag _pool = new(); - private readonly ArrayPool _arrayPool = ArrayPool.Shared; - private readonly int _maxPoolSize; - private readonly int _bufferSize; - - public WebSocketMessagePool(int maxPoolSize = 1000, int bufferSize = 1024 * 4) - { - _maxPoolSize = maxPoolSize; - _bufferSize = bufferSize; - } - - public WebSocketMessageContext Rent() - { - if (_pool.TryTake(out var context)) - { - return context; - } - - return new WebSocketMessageContext - { - Payload = _arrayPool.Rent(_bufferSize) - }; - } - - public void Return(WebSocketMessageContext context) - { - if (_pool.Count < _maxPoolSize) - { - context.ConnectionId = string.Empty; - context.MessageType = string.Empty; - context.Properties.Clear(); - context.IsHandled = false; - context.Error = null; - - if (context.Payload != null) - { - _arrayPool.Return(context.Payload); - context.Payload = _arrayPool.Rent(_bufferSize); - } - - _pool.Add(context); - } - else - { - if (context.Payload != null) - { - _arrayPool.Return(context.Payload); - } - } - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs deleted file mode 100644 index ccd129c..0000000 --- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs +++ /dev/null @@ -1,129 +0,0 @@ -using System.Net.WebSockets; -using System.Collections.Concurrent; -using Microsoft.Extensions.Logging; -using CellularManagement.Application.Services; - -namespace CellularManagement.Infrastructure.WebSocket; - -/// -/// WebSocket连接管理器 -/// 负责管理WebSocket连接的创建、删除和查询 -/// 使用线程安全的并发集合存储连接信息 -/// -public class WebSocketConnectionManager -{ - /// - /// 日志记录器,用于记录连接管理器的操作日志 - /// - private readonly ILogger _logger; - - /// - /// WebSocket连接字典 - /// 键为连接ID,值为WebSocket实例 - /// 使用线程安全的ConcurrentDictionary - /// - private readonly ConcurrentDictionary _sockets = new(); - - /// - /// 用户连接关联字典 - /// 键为连接ID,值为用户ID - /// 使用线程安全的ConcurrentDictionary - /// - private readonly ConcurrentDictionary _userConnections = new(); - - /// - /// 构造函数 - /// - /// 日志记录器,用于记录连接管理器的操作日志 - public WebSocketConnectionManager(ILogger logger) - { - _logger = logger; - } - - /// - /// 添加新的WebSocket连接 - /// 生成唯一的连接ID并存储连接信息 - /// - /// 要添加的WebSocket实例 - /// 新创建的连接ID - public string AddConnection(System.Net.WebSockets.WebSocket socket) - { - var connectionId = Guid.NewGuid().ToString(); - _sockets.TryAdd(connectionId, socket); - _logger.LogInformation("WebSocket连接已添加: {ConnectionId}", connectionId); - return connectionId; - } - - /// - /// 移除WebSocket连接 - /// 清理连接相关的所有信息 - /// - /// 要移除的连接ID - /// 是否成功移除连接 - public bool RemoveConnection(string connectionId) - { - _sockets.TryRemove(connectionId, out _); - _userConnections.TryRemove(connectionId, out _); - _logger.LogInformation("WebSocket连接已移除: {ConnectionId}", connectionId); - return true; - } - - /// - /// 获取指定ID的WebSocket连接 - /// - /// 要查询的连接ID - /// WebSocket实例,如果不存在则返回null - public System.Net.WebSockets.WebSocket? GetConnection(string connectionId) - { - _sockets.TryGetValue(connectionId, out var socket); - return socket; - } - - /// - /// 获取所有WebSocket连接 - /// - /// 所有WebSocket实例的集合 - public IEnumerable GetAllConnections() - { - return _sockets.Values; - } - - /// - /// 将连接与用户关联 - /// 建立连接ID和用户ID的映射关系 - /// - /// 要关联的连接ID - /// 要关联的用户ID - public void AssociateUser(string connectionId, string userId) - { - _userConnections.TryAdd(connectionId, userId); - } - - /// - /// 获取连接关联的用户ID - /// - /// 要查询的连接ID - /// 用户ID,如果未关联则返回null - public string? GetUserId(string connectionId) - { - _userConnections.TryGetValue(connectionId, out var userId); - return userId; - } - - /// - /// 获取用户的所有WebSocket连接 - /// 根据用户ID查找所有关联的连接 - /// - /// 要查询的用户ID - /// 用户的所有WebSocket实例的集合 - public IEnumerable GetUserConnections(string userId) - { - var connections = _userConnections - .Where(kvp => kvp.Value == userId) - .Select(kvp => kvp.Key) - .Select(connectionId => GetConnection(connectionId)) - .Where(socket => socket != null); - - return connections!; - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs deleted file mode 100644 index 0d5b8ee..0000000 --- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs +++ /dev/null @@ -1,199 +0,0 @@ -using System.Threading.Channels; -using System.Buffers; -using Microsoft.Extensions.Logging; - -namespace CellularManagement.Infrastructure.WebSocket; - -/// -/// WebSocket消息处理管道 -/// 负责批量处理和分发WebSocket消息 -/// 使用Channel实现消息队列,支持批量处理提高性能 -/// -public class WebSocketMessagePipeline -{ - /// - /// 消息通道,用于存储待处理的消息 - /// 使用有界通道限制内存使用 - /// - private readonly Channel _messageChannel; - - /// - /// 日志记录器,用于记录管道的操作日志 - /// - private readonly ILogger _logger; - - /// - /// 内存池,用于复用内存缓冲区 - /// - private readonly MemoryPool _memoryPool; - - /// - /// 批处理大小,每次处理的消息数量 - /// - private readonly int _batchSize; - - /// - /// 最大队列大小,限制通道中可存储的消息数量 - /// - private readonly int _maxQueueSize; - - /// - /// 构造函数 - /// - /// 日志记录器,用于记录管道的操作日志 - /// 批处理大小,每次处理的消息数量,默认100 - /// 最大队列大小,限制通道中可存储的消息数量,默认10000 - public WebSocketMessagePipeline( - ILogger logger, - int batchSize = 100, - int maxQueueSize = 10000) - { - _logger = logger; - _batchSize = batchSize; - _maxQueueSize = maxQueueSize; - _memoryPool = MemoryPool.Shared; - - // 创建有界通道 - var options = new BoundedChannelOptions(maxQueueSize) - { - FullMode = BoundedChannelFullMode.Wait, // 队列满时等待 - SingleWriter = false, // 允许多个写入者 - SingleReader = false // 允许多个读取者 - }; - - _messageChannel = Channel.CreateBounded(options); - } - - /// - /// 写入消息到管道 - /// 将消息添加到通道中等待处理 - /// - /// 要处理的WebSocket消息 - /// 是否成功写入消息 - public async ValueTask WriteMessageAsync(WebSocketMessage message) - { - try - { - await _messageChannel.Writer.WriteAsync(message); - return true; - } - catch (Exception ex) - { - _logger.LogError(ex, "写入消息到管道失败"); - return false; - } - } - - /// - /// 处理消息 - /// 循环从通道中读取消息并批量处理 - /// - /// 取消令牌,用于停止处理 - public async Task ProcessMessagesAsync(CancellationToken cancellationToken) - { - var batch = new List(_batchSize); - - try - { - while (!cancellationToken.IsCancellationRequested) - { - batch.Clear(); - - // 批量读取消息 - while (batch.Count < _batchSize && - await _messageChannel.Reader.WaitToReadAsync(cancellationToken)) - { - if (_messageChannel.Reader.TryRead(out var message)) - { - batch.Add(message); - } - } - - if (batch.Count > 0) - { - await ProcessBatchAsync(batch, cancellationToken); - } - } - } - catch (OperationCanceledException) - { - _logger.LogInformation("消息处理管道已停止"); - } - catch (Exception ex) - { - _logger.LogError(ex, "消息处理管道出错"); - } - } - - /// - /// 批量处理消息 - /// 处理一批消息,逐个处理并记录错误 - /// - /// 要处理的消息批次 - /// 取消令牌,用于停止处理 - private async Task ProcessBatchAsync(List batch, CancellationToken cancellationToken) - { - // 这里实现具体的消息处理逻辑 - foreach (var message in batch) - { - try - { - // 处理消息 - await ProcessMessageAsync(message, cancellationToken); - } - catch (Exception ex) - { - _logger.LogError(ex, "处理消息时出错: {MessageId}", message.MessageId); - } - } - } - - /// - /// 处理单个消息 - /// 实现具体的消息处理逻辑 - /// - /// 要处理的WebSocket消息 - /// 取消令牌,用于停止处理 - private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken) - { - // 实现具体的消息处理逻辑 - // 这里可以添加消息验证、转换、业务处理等步骤 - } -} - -/// -/// WebSocket消息实体 -/// 表示一个WebSocket消息的基本信息 -/// -public class WebSocketMessage -{ - /// - /// 消息ID - /// 唯一标识一条消息 - /// - public string MessageId { get; set; } = Guid.NewGuid().ToString(); - - /// - /// 连接ID - /// 标识消息来自哪个WebSocket连接 - /// - public string ConnectionId { get; set; } = string.Empty; - - /// - /// 消息类型 - /// 用于标识消息的类型,可用于路由和处理 - /// - public string MessageType { get; set; } = string.Empty; - - /// - /// 消息内容 - /// 消息的实际数据 - /// - public byte[] Payload { get; set; } = Array.Empty(); - - /// - /// 时间戳 - /// 消息创建的时间 - /// - public DateTime Timestamp { get; set; } = DateTime.UtcNow; -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs deleted file mode 100644 index 14ef73f..0000000 --- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System.Collections.Concurrent; -using CellularManagement.Application.Pipeline; - -namespace CellularManagement.Infrastructure.WebSocket; - -/// -/// WebSocket消息池 -/// 负责管理WebSocket消息上下文的复用,减少内存分配 -/// 使用对象池模式提高性能 -/// -public class WebSocketMessagePool -{ - /// - /// 消息上下文池 - /// 使用线程安全的ConcurrentBag存储可复用的消息上下文 - /// - private readonly ConcurrentBag _pool = new(); - - /// - /// 最大池大小 - /// 限制池中可存储的消息上下文数量 - /// - private readonly int _maxPoolSize; - - /// - /// 构造函数 - /// - /// 最大池大小,限制池中可存储的消息上下文数量,默认1000 - public WebSocketMessagePool(int maxPoolSize = 1000) - { - _maxPoolSize = maxPoolSize; - } - - /// - /// 从池中获取消息上下文 - /// 如果池中有可用的上下文则复用,否则创建新的 - /// - /// 可用的消息上下文 - public WebSocketMessageContext Rent() - { - // 尝试从池中获取可用的上下文 - if (_pool.TryTake(out var context)) - { - return context; - } - - // 如果池中没有可用的上下文,创建新的 - return new WebSocketMessageContext(); - } - - /// - /// 将消息上下文返回到池中 - /// 重置上下文状态并放回池中,如果池已满则丢弃 - /// - /// 要返回的消息上下文 - public void Return(WebSocketMessageContext context) - { - // 如果池未满,重置上下文并放回池中 - if (_pool.Count < _maxPoolSize) - { - // 重置上下文状态 - context.ConnectionId = string.Empty; - context.MessageType = string.Empty; - context.Payload = Array.Empty(); - context.Properties.Clear(); - context.IsHandled = false; - context.Error = null; - - _pool.Add(context); - } - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs deleted file mode 100644 index 98f5391..0000000 --- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs +++ /dev/null @@ -1,134 +0,0 @@ -using System.Net.WebSockets; -using System.Text; -using Microsoft.Extensions.Logging; -using Microsoft.AspNetCore.Http; -using Microsoft.Extensions.DependencyInjection; -using CellularManagement.Application.Services; - -namespace CellularManagement.Infrastructure.WebSocket; - -/// -/// WebSocket中间件 -/// 负责处理WebSocket请求的生命周期和消息处理 -/// 作为ASP.NET Core中间件,拦截WebSocket请求并管理连接 -/// -public class WebSocketMiddleware -{ - /// - /// 请求委托,用于继续处理管道中的下一个中间件 - /// - private readonly RequestDelegate _next; - - /// - /// 日志记录器,用于记录中间件的操作日志 - /// - private readonly ILogger _logger; - - /// - /// 服务提供者,用于创建服务作用域和获取所需服务 - /// - private readonly IServiceProvider _serviceProvider; - - /// - /// 构造函数 - /// - /// 请求委托,用于继续处理管道中的下一个中间件 - /// 日志记录器,用于记录中间件的操作日志 - /// 服务提供者,用于创建服务作用域和获取所需服务 - public WebSocketMiddleware( - RequestDelegate next, - ILogger logger, - IServiceProvider serviceProvider) - { - _next = next; - _logger = logger; - _serviceProvider = serviceProvider; - } - - /// - /// 处理HTTP请求 - /// 检查是否为WebSocket请求,如果是则处理WebSocket连接 - /// - /// HTTP上下文,包含请求和响应信息 - public async Task InvokeAsync(HttpContext context) - { - // 检查是否为WebSocket请求 - if (context.WebSockets.IsWebSocketRequest) - { - // 创建服务作用域,确保服务实例的生命周期正确 - using var scope = _serviceProvider.CreateScope(); - var webSocketService = scope.ServiceProvider.GetRequiredService(); - - // 接受WebSocket连接 - var webSocket = await context.WebSockets.AcceptWebSocketAsync(); - var connectionId = await webSocketService.AcceptConnectionAsync(webSocket); - - try - { - // 处理WebSocket连接 - await HandleWebSocketConnection(webSocket, connectionId, webSocketService); - } - catch (Exception ex) - { - _logger.LogError(ex, "处理WebSocket连接时出错: {ConnectionId}", connectionId); - } - finally - { - // 确保连接被正确关闭 - await webSocketService.CloseConnectionAsync(connectionId); - } - } - else - { - // 如果不是WebSocket请求,继续处理管道 - await _next(context); - } - } - - /// - /// 处理WebSocket连接 - /// 循环接收消息并处理,直到连接关闭 - /// - /// WebSocket实例,用于接收和发送消息 - /// 连接ID,用于标识连接 - /// WebSocket服务,用于处理消息 - private async Task HandleWebSocketConnection( - System.Net.WebSockets.WebSocket webSocket, - string connectionId, - IWebSocketService webSocketService) - { - // 创建接收缓冲区,大小为4KB - var buffer = new byte[1024 * 4]; - var receiveResult = await webSocket.ReceiveAsync( - new ArraySegment(buffer), CancellationToken.None); - - // 循环接收消息,直到收到关闭消息 - while (!receiveResult.CloseStatus.HasValue) - { - try - { - // 处理接收到的消息 - var message = buffer.Take(receiveResult.Count).ToArray(); - await webSocketService.SendMessageAsync(connectionId, message); - - // 继续接收下一条消息 - receiveResult = await webSocket.ReceiveAsync( - new ArraySegment(buffer), CancellationToken.None); - } - catch (Exception ex) - { - _logger.LogError(ex, "处理WebSocket消息时出错: {ConnectionId}", connectionId); - break; - } - } - - // 如果收到关闭消息,则关闭连接 - if (receiveResult.CloseStatus.HasValue) - { - await webSocket.CloseAsync( - receiveResult.CloseStatus.Value, - receiveResult.CloseStatusDescription, - CancellationToken.None); - } - } -} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs deleted file mode 100644 index a39d909..0000000 --- a/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs +++ /dev/null @@ -1,420 +0,0 @@ -using System.Net.WebSockets; -using System.Text; -using System.Text.Json; -using CellularManagement.Application.Services; -using CellularManagement.Domain.Entities; -using CellularManagement.Infrastructure.Monitoring; -using CellularManagement.Infrastructure.Pooling; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.Caching.Memory; - -namespace CellularManagement.Infrastructure.WebSocket; - -/// -/// WebSocket服务实现类 -/// 负责管理WebSocket连接的生命周期、消息发送和接收 -/// 使用分布式缓存存储连接信息,本地缓存存储WebSocket实例 -/// -public class WebSocketService : IWebSocketService -{ - /// - /// 日志记录器,用于记录WebSocket服务的操作日志 - /// - private readonly ILogger _logger; - - /// - /// WebSocket指标监控,用于记录连接、消息等指标 - /// - private readonly WebSocketMetrics _metrics; - - /// - /// WebSocket消息池,用于复用消息上下文对象 - /// - private readonly WebSocketMessagePool _messagePool; - - /// - /// 分布式缓存,用于存储连接信息 - /// - private readonly IDistributedCache _distributedCache; - - /// - /// 本地缓存服务,用于存储WebSocket实例 - /// - private readonly ICacheService _cacheService; - - /// - /// 当前节点ID,用于标识服务实例 - /// - private readonly string _nodeId; - - /// - /// WebSocket连接信息在缓存中的键前缀 - /// - private const string CONNECTION_PREFIX = "ws_connection_"; - - /// - /// WebSocket实例在缓存中的键前缀 - /// - private const string WEBSOCKET_PREFIX = "ws_socket_"; - - /// - /// 用户关联信息在缓存中的键前缀 - /// - private const string USER_PREFIX = "ws_user_"; - - /// - /// 节点信息在缓存中的键前缀 - /// - private const string NODE_PREFIX = "ws_node_"; - - /// - /// 构造函数 - /// - /// 日志记录器,用于记录服务操作日志 - /// WebSocket指标监控,用于记录连接和消息指标 - /// 消息池,用于复用消息上下文对象 - /// 分布式缓存,用于存储连接信息 - /// 本地缓存服务,用于存储WebSocket实例 - public WebSocketService( - ILogger logger, - WebSocketMetrics metrics, - WebSocketMessagePool messagePool, - IDistributedCache distributedCache, - ICacheService cacheService) - { - _logger = logger; - _metrics = metrics; - _messagePool = messagePool; - _distributedCache = distributedCache; - _cacheService = cacheService; - _nodeId = Guid.NewGuid().ToString(); // 生成唯一的节点ID - } - - /// - /// 接受新的WebSocket连接 - /// 创建连接信息并存储到缓存中 - /// - /// WebSocket实例 - /// 新创建的连接ID - public async Task AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket) - { - var connectionId = Guid.NewGuid().ToString(); - var connection = WebSocketConnection.Create(connectionId); - - // 生成缓存键 - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}"; - var nodeKey = $"{NODE_PREFIX}{_nodeId}"; - - // 将连接信息存储到分布式缓存 - await _distributedCache.SetStringAsync( - connectionKey, - JsonSerializer.Serialize(connection), - new DistributedCacheEntryOptions - { - SlidingExpiration = TimeSpan.FromMinutes(30) // 设置30分钟滑动过期 - }); - - // 将WebSocket实例存储到本地缓存 - _cacheService.Set(webSocketKey, webSocket, new MemoryCacheEntryOptions - { - SlidingExpiration = TimeSpan.FromMinutes(30) - }); - - // 将连接添加到当前节点 - await AddConnectionToNodeAsync(connectionId); - _metrics.ConnectionEstablished(); // 记录连接建立指标 - _logger.LogInformation("WebSocket连接已建立: {ConnectionId} 在节点 {NodeId}", - connectionId, _nodeId); - - return connectionId; - } - - /// - /// 关闭WebSocket连接 - /// 清理连接相关的所有缓存信息 - /// - /// 要关闭的连接ID - /// 是否成功关闭连接 - public async Task CloseConnectionAsync(string connectionId) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}"; - var userKey = $"{USER_PREFIX}{connectionId}"; - - // 获取连接信息 - var connectionJson = await _distributedCache.GetStringAsync(connectionKey); - if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket)) - { - try - { - if (connectionJson != null) - { - var connection = JsonSerializer.Deserialize(connectionJson); - if (connection != null && webSocket != null) - { - try - { - // 如果WebSocket处于打开状态,则正常关闭 - if (webSocket.State == WebSocketState.Open || webSocket.State == WebSocketState.CloseReceived) - { - await webSocket.CloseAsync( - WebSocketCloseStatus.NormalClosure, - "服务器关闭连接", - CancellationToken.None); - } - } - catch (Exception ex) - { - _logger.LogError(ex, "关闭WebSocket连接时出错: {ConnectionId}", connectionId); - } - - connection.Close(); - await _distributedCache.SetStringAsync(connectionKey, JsonSerializer.Serialize(connection)); - } - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, "反序列化WebSocketConnection失败: {ConnectionId}, 继续清理", connectionId); - } - } - - // 清理连接相关的所有缓存 - await RemoveConnectionFromNodeAsync(connectionId); - await _distributedCache.RemoveAsync(connectionKey); - _cacheService.Remove(webSocketKey); - await _distributedCache.RemoveAsync(userKey); - - _metrics.ConnectionClosed(); // 记录连接关闭指标 - _logger.LogInformation("WebSocket连接已关闭: {ConnectionId}", connectionId); - return true; - } - - /// - /// 向指定连接发送消息 - /// - /// 目标连接ID - /// 要发送的消息内容 - /// 是否成功发送消息 - public async Task SendMessageAsync(string connectionId, byte[] message) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}"; - - var connectionJson = await _distributedCache.GetStringAsync(connectionKey); - if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket)) - { - try - { - if (connectionJson != null) - { - var connection = JsonSerializer.Deserialize(connectionJson); - if (connection?.State == WebSocketState.Open && webSocket != null) - { - try - { - await webSocket.SendAsync( - new ArraySegment(message), - WebSocketMessageType.Text, - true, - CancellationToken.None); - - _metrics.MessageProcessed(TimeSpan.Zero); // 记录消息处理指标 - return true; - } - catch (Exception ex) - { - _logger.LogError(ex, "发送消息到连接时出错: {ConnectionId}", connectionId); - _metrics.ErrorOccurred("SendMessage"); // 记录错误指标 - } - } - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, "反序列化WebSocketConnection失败: {ConnectionId}, 跳过消息发送", connectionId); - } - } - return false; - } - - /// - /// 广播消息到所有连接 - /// 遍历所有节点上的所有连接并发送消息 - /// - /// 要广播的消息内容 - /// 是否所有消息都发送成功 - public async Task BroadcastMessageAsync(byte[] message) - { - var nodes = await GetAllNodesAsync(); - var success = true; - - // 遍历所有节点上的所有连接 - foreach (var node in nodes) - { - var nodeKey = $"{NODE_PREFIX}{node}"; - var connectionsJson = await _distributedCache.GetStringAsync(nodeKey); - if (connectionsJson != null) - { - var connections = JsonSerializer.Deserialize>(connectionsJson); - foreach (var connectionId in connections) - { - if (!await SendMessageAsync(connectionId, message)) - { - success = false; - } - } - } - } - - return success; - } - - /// - /// 向指定用户的所有连接发送消息 - /// - /// 目标用户ID - /// 要发送的消息内容 - /// 是否所有消息都发送成功 - public async Task SendMessageToUserAsync(string userId, byte[] message) - { - var userConnections = await GetUserConnectionsAsync(userId); - var success = true; - - // 向用户的所有连接发送消息 - foreach (var connection in userConnections) - { - if (!await SendMessageAsync(connection.ConnectionId, message)) - { - success = false; - } - } - - return success; - } - - /// - /// 将连接与用户关联 - /// 更新连接信息和用户关联缓存 - /// - /// 连接ID - /// 用户ID - public async Task AssociateUserAsync(string connectionId, string userId) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var userKey = $"{USER_PREFIX}{connectionId}"; - - var connectionJson = await _distributedCache.GetStringAsync(connectionKey); - if (connectionJson != null) - { - var connection = JsonSerializer.Deserialize(connectionJson); - if (connection != null) - { - connection.AssociateUser(userId); - await _distributedCache.SetStringAsync(connectionKey, JsonSerializer.Serialize(connection)); - await _distributedCache.SetStringAsync(userKey, userId); - } - } - } - - /// - /// 获取指定连接的信息 - /// - /// 连接ID - /// 连接信息,如果不存在则返回null - public async Task GetConnectionAsync(string connectionId) - { - var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; - var connectionJson = await _distributedCache.GetStringAsync(connectionKey); - return connectionJson != null - ? JsonSerializer.Deserialize(connectionJson) - : null; - } - - /// - /// 获取指定用户的所有连接 - /// - /// 用户ID - /// 用户的所有连接信息 - public async Task> GetUserConnectionsAsync(string userId) - { - var connections = new List(); - var nodes = await GetAllNodesAsync(); - - // 遍历所有节点,查找用户的连接 - foreach (var node in nodes) - { - var nodeKey = $"{NODE_PREFIX}{node}"; - var connectionsJson = await _distributedCache.GetStringAsync(nodeKey); - if (connectionsJson != null) - { - var connectionIds = JsonSerializer.Deserialize>(connectionsJson); - foreach (var connectionId in connectionIds) - { - var connection = await GetConnectionAsync(connectionId); - if (connection?.UserId == userId) - { - connections.Add(connection); - } - } - } - } - - return connections; - } - - /// - /// 将连接添加到当前节点 - /// - /// 要添加的连接ID - private async Task AddConnectionToNodeAsync(string connectionId) - { - var nodeKey = $"{NODE_PREFIX}{_nodeId}"; - var connections = await GetNodeConnectionsAsync(); - connections.Add(connectionId); - await _distributedCache.SetStringAsync( - nodeKey, - JsonSerializer.Serialize(connections)); - } - - /// - /// 从当前节点移除连接 - /// - /// 要移除的连接ID - private async Task RemoveConnectionFromNodeAsync(string connectionId) - { - var nodeKey = $"{NODE_PREFIX}{_nodeId}"; - var connections = await GetNodeConnectionsAsync(); - connections.Remove(connectionId); - await _distributedCache.SetStringAsync( - nodeKey, - JsonSerializer.Serialize(connections)); - } - - /// - /// 获取当前节点的所有连接 - /// - /// 当前节点的所有连接ID列表 - private async Task> GetNodeConnectionsAsync() - { - var nodeKey = $"{NODE_PREFIX}{_nodeId}"; - var connectionsJson = await _distributedCache.GetStringAsync(nodeKey); - return connectionsJson != null - ? JsonSerializer.Deserialize>(connectionsJson) ?? new List() - : new List(); - } - - /// - /// 获取所有节点ID - /// 目前仅返回当前节点ID,后续需要实现服务发现机制 - /// - /// 所有节点ID列表 - private async Task> GetAllNodesAsync() - { - // 这里需要实现服务发现机制 - // 可以使用Redis的Pub/Sub或其他服务发现机制 - return new List { _nodeId }; - } -} \ No newline at end of file diff --git a/src/CellularManagement.WebAPI/CellularManagement.WebAPI.http b/src/CellularManagement.WebAPI/CellularManagement.WebAPI.http index b0b832b..658fee9 100644 --- a/src/CellularManagement.WebAPI/CellularManagement.WebAPI.http +++ b/src/CellularManagement.WebAPI/CellularManagement.WebAPI.http @@ -1,5 +1,4 @@ @CellularManagement.WebAPI_HostAddress = http://localhost:5202 -@WebSocket_HostAddress = ws://localhost:5202 ### 获取天气预测 GET {{CellularManagement.WebAPI_HostAddress}}/weatherforecast/ @@ -19,18 +18,6 @@ Content-Type: application/json GET {{CellularManagement.WebAPI_HostAddress}}/api/users/me Authorization: Bearer {{login.response.body.token}} -### WebSocket连接测试 -# 使用WebSocket客户端工具连接 -# 连接地址: {{WebSocket_HostAddress}}/ws - -### 发送WebSocket消息 -# 使用WebSocket客户端工具发送消息 -# 消息格式: {"type": "message", "content": "Hello, WebSocket!"} - ### 获取系统状态 GET {{CellularManagement.WebAPI_HostAddress}}/api/system/status Authorization: Bearer {{login.response.body.token}} - -### 获取WebSocket连接统计 -GET {{CellularManagement.WebAPI_HostAddress}}/api/websocket/stats -Authorization: Bearer {{login.response.body.token}} diff --git a/src/CellularManagement.WebAPI/Program.cs b/src/CellularManagement.WebAPI/Program.cs index 6575fce..417399f 100644 --- a/src/CellularManagement.WebAPI/Program.cs +++ b/src/CellularManagement.WebAPI/Program.cs @@ -6,10 +6,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using CellularManagement.Infrastructure.Configurations; using CellularManagement.Application; -using CellularManagement.Infrastructure.WebSocket; -using CellularManagement.Infrastructure.Monitoring; -using CellularManagement.Infrastructure.Pooling; -using CellularManagement.Application.Services; using Microsoft.Extensions.Options; // 创建 Web 应用程序构建器 @@ -40,7 +36,10 @@ builder.Services.AddSwaggerGen(); // 注册 MediatR 服务 // 从 WebAPI 程序集加载处理器,用于处理 Web 层特定的命令和查询 -builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Program).Assembly)); +builder.Services.AddMediatR(cfg => +{ + cfg.RegisterServicesFromAssembly(typeof(Program).Assembly); +}); // 添加 CORS 服务 // 配置跨域资源共享支持 @@ -49,14 +48,6 @@ builder.Services.AddCors(); // 配置认证服务 // 从配置文件中读取认证相关配置 builder.Services.Configure(builder.Configuration.GetSection("Auth")); -builder.Services.Configure(builder.Configuration.GetSection(WebSocketConfiguration.SectionName)); - -// 注册WebSocket相关服务 -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddScoped(); // 添加静态文件服务 builder.Services.AddDirectoryBrowser(); @@ -86,15 +77,6 @@ app.UseCors(x => x.AllowAnyHeader().SetIsOriginAllowed(p => true).AllowAnyMethod // 处理用户认证和授权 app.UseAuthorization(); -// 配置 WebSocket 中间件 -var webSocketConfig = app.Services.GetRequiredService>().Value; -app.UseWebSockets(new Microsoft.AspNetCore.Builder.WebSocketOptions -{ - KeepAliveInterval = TimeSpan.FromMinutes(2) -}); - -app.UseMiddleware(); - // 配置静态文件中间件 app.UseStaticFiles(); app.UseDirectoryBrowser(); diff --git a/src/CellularManagement.WebAPI/appsettings.README.md b/src/CellularManagement.WebAPI/appsettings.README.md index 7d3c40e..af1439f 100644 --- a/src/CellularManagement.WebAPI/appsettings.README.md +++ b/src/CellularManagement.WebAPI/appsettings.README.md @@ -25,10 +25,6 @@ - `KeyRotationDays`: 密钥轮换周期(天) - `MinKeyLength`: 最小密钥长度 -## WebSocket 配置 -- `Port`: WebSocket 服务端口 -- `Path`: WebSocket 连接路径 - ## 认证配置 (Auth) - `MaxLoginAttempts`: 最大登录尝试次数 - `LoginAttemptsWindowMinutes`: 登录尝试窗口时间(分钟) @@ -46,7 +42,6 @@ - `ASPNETCORE_ENVIRONMENT` - 设置运行环境(Development/Production) - `DatabaseOptions__DefaultConnection` - 数据库连接字符串 - `JwtOptions__SecretKey` - JWT密钥 -- `WebSocket__Port` - WebSocket端口 - `Auth__MaxLoginAttempts` - 最大登录尝试次数 ## 配置优先级 diff --git a/src/CellularManagement.WebAPI/appsettings.json b/src/CellularManagement.WebAPI/appsettings.json index 62af76b..f0cb51f 100644 --- a/src/CellularManagement.WebAPI/appsettings.json +++ b/src/CellularManagement.WebAPI/appsettings.json @@ -34,9 +34,5 @@ "DefaultUserRole": "User", "AccessTokenExpirationMinutes": 60, "RefreshTokenExpirationDays": 7 - }, - "WebSocket": { - "Port": 5202, - "Path": "/ws" } } diff --git a/src/CellularManagement.WebAPI/wwwroot/websocket.html b/src/CellularManagement.WebAPI/wwwroot/websocket.html index 95c40b4..ec2b44b 100644 --- a/src/CellularManagement.WebAPI/wwwroot/websocket.html +++ b/src/CellularManagement.WebAPI/wwwroot/websocket.html @@ -56,7 +56,6 @@