From 57292385ad6faf853a4f15b065fa921209f72c93 Mon Sep 17 00:00:00 2001 From: hyh Date: Tue, 29 Apr 2025 18:00:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=EF=BC=9A=E6=B7=BB=E5=8A=A0We?= =?UTF-8?q?bSocket=E7=9B=B8=E5=85=B3=E5=8A=9F=E8=83=BD=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Pipeline/IWebSocketMessageHandler.cs | 28 ++ .../Services/IWebSocketService.cs | 16 + .../Services/WebSocketService.cs | 137 +++++++ .../CellularManagement.Domain.csproj | 1 + .../Entities/WebSocketConnection.cs | 66 +++ .../Events/WebSocketMessageEvent.cs | 35 ++ .../Configurations/WebSocketConfiguration.cs | 9 + .../DependencyInjection.cs | 12 + .../DistributedWebSocketManager.cs | 196 +++++++++ .../Monitoring/WebSocketMetrics.cs | 65 +++ .../WebSocketMessageValidationHandler.cs | 54 +++ .../Pipeline/WebSocketMessagePipeline.cs | 145 +++++++ .../Pooling/WebSocketMessagePool.cs | 59 +++ .../WebSocket/WebSocketConnectionManager.cs | 67 +++ .../WebSocket/WebSocketMessagePipeline.cs | 116 ++++++ .../WebSocket/WebSocketMessagePool.cs | 41 ++ .../WebSocket/WebSocketMiddleware.cs | 89 ++++ .../WebSocket/WebSocketService.cs | 295 ++++++++++++++ .../CellularManagement.WebAPI.csproj | 8 +- src/CellularManagement.WebAPI/Program.cs | 29 ++ .../appsettings.json | 4 + .../wwwroot/websocket.html | 382 ++++++++++++++++++ 22 files changed, 1853 insertions(+), 1 deletion(-) create mode 100644 src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs create mode 100644 src/CellularManagement.Application/Services/IWebSocketService.cs create mode 100644 src/CellularManagement.Application/Services/WebSocketService.cs create mode 100644 src/CellularManagement.Domain/Entities/WebSocketConnection.cs create mode 100644 src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs create mode 100644 src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs create mode 100644 src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs create mode 100644 src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs create mode 100644 src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs create mode 100644 src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs create mode 100644 src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs create mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs create mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs create mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs create mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs create mode 100644 src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs create mode 100644 src/CellularManagement.WebAPI/wwwroot/websocket.html diff --git a/src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs b/src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs new file mode 100644 index 0000000..a369832 --- /dev/null +++ b/src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs @@ -0,0 +1,28 @@ +using System.Threading.Channels; + +namespace CellularManagement.Application.Pipeline; + +public interface IWebSocketMessageHandler +{ + string HandlerName { get; } + int Priority { get; } + bool CanHandle(string messageType); + Task HandleAsync(WebSocketMessageContext context, CancellationToken cancellationToken); +} + +public class WebSocketMessageContext +{ + public string ConnectionId { get; set; } = string.Empty; + public string MessageType { get; set; } = string.Empty; + public byte[] Payload { get; set; } = Array.Empty(); + public Dictionary Properties { get; } = new(); + public bool IsHandled { get; set; } + public Exception? Error { get; set; } +} + +public interface IWebSocketMessagePipeline +{ + Task ProcessMessageAsync(WebSocketMessageContext context, CancellationToken cancellationToken); + void RegisterHandler(IWebSocketMessageHandler handler); + void UnregisterHandler(string handlerName); +} \ No newline at end of file diff --git a/src/CellularManagement.Application/Services/IWebSocketService.cs b/src/CellularManagement.Application/Services/IWebSocketService.cs new file mode 100644 index 0000000..7c2313e --- /dev/null +++ b/src/CellularManagement.Application/Services/IWebSocketService.cs @@ -0,0 +1,16 @@ +using System.Net.WebSockets; +using CellularManagement.Domain.Entities; + +namespace CellularManagement.Application.Services; + +public interface IWebSocketService +{ + Task AcceptConnectionAsync(WebSocket webSocket); + Task CloseConnectionAsync(string connectionId); + Task SendMessageAsync(string connectionId, byte[] message); + Task BroadcastMessageAsync(byte[] message); + Task SendMessageToUserAsync(string userId, byte[] message); + Task AssociateUserAsync(string connectionId, string userId); + Task GetConnectionAsync(string connectionId); + Task> GetUserConnectionsAsync(string userId); +} \ No newline at end of file diff --git a/src/CellularManagement.Application/Services/WebSocketService.cs b/src/CellularManagement.Application/Services/WebSocketService.cs new file mode 100644 index 0000000..bb6cb64 --- /dev/null +++ b/src/CellularManagement.Application/Services/WebSocketService.cs @@ -0,0 +1,137 @@ +using System.Net.WebSockets; +using CellularManagement.Domain.Entities; +using CellularManagement.Application.Services; +using CellularManagement.Application.Pipeline; +using Microsoft.Extensions.Logging; +using MediatR; +using CellularManagement.Domain.Events; + +namespace CellularManagement.Application.Services; + +public class WebSocketService : IWebSocketService +{ + private readonly ICacheService _cacheService; + private readonly ILogger _logger; + private readonly IWebSocketMessagePipeline _messagePipeline; + private readonly IMediator _mediator; + private const string CONNECTION_PREFIX = "ws_connection_"; + private const string USER_CONNECTION_PREFIX = "ws_user_"; + + public WebSocketService( + ICacheService cacheService, + ILogger logger, + IWebSocketMessagePipeline messagePipeline, + IMediator mediator) + { + _cacheService = cacheService; + _logger = logger; + _messagePipeline = messagePipeline; + _mediator = mediator; + } + + public async Task AcceptConnectionAsync(WebSocket webSocket) + { + var connectionId = Guid.NewGuid().ToString(); + var connection = WebSocketConnection.Create(connectionId); + + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + _cacheService.Set(connectionKey, webSocket); + _cacheService.Set(connectionId, connection); + + _logger.LogInformation("WebSocket connection accepted: {ConnectionId}", connectionId); + return connectionId; + } + + public async Task CloseConnectionAsync(string connectionId) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var userKey = $"{USER_CONNECTION_PREFIX}{connectionId}"; + + var connection = _cacheService.Get(connectionId); + if (connection != null) + { + connection.Close(); + _cacheService.Set(connectionId, connection); + } + + _cacheService.Remove(connectionKey); + _cacheService.Remove(userKey); + + _logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId); + return true; + } + + public async Task SendMessageAsync(string connectionId, byte[] message) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var webSocket = _cacheService.Get(connectionKey); + + if (webSocket?.State == WebSocketState.Open) + { + await webSocket.SendAsync( + new ArraySegment(message), + WebSocketMessageType.Text, + true, + CancellationToken.None); + + await _mediator.Publish(new WebSocketMessageSentEvent( + connectionId, + "text", + message)); + + return true; + } + return false; + } + + public async Task BroadcastMessageAsync(byte[] message) + { + // 注意:这里需要实现广播逻辑 + // 由于ICacheService的限制,可能需要使用其他方式实现 + return false; + } + + public async Task SendMessageToUserAsync(string userId, byte[] message) + { + // 注意:这里需要实现向特定用户发送消息的逻辑 + // 由于ICacheService的限制,可能需要使用其他方式实现 + return false; + } + + public async Task AssociateUserAsync(string connectionId, string userId) + { + var connection = _cacheService.Get(connectionId); + if (connection != null) + { + connection.AssociateUser(userId); + _cacheService.Set(connectionId, connection); + + var userKey = $"{USER_CONNECTION_PREFIX}{connectionId}"; + _cacheService.Set(userKey, userId); + } + } + + public async Task GetConnectionAsync(string connectionId) + { + return _cacheService.Get(connectionId); + } + + public async Task> GetUserConnectionsAsync(string userId) + { + // 注意:这里需要实现获取用户所有连接的逻辑 + // 由于ICacheService的限制,可能需要使用其他方式实现 + return Enumerable.Empty(); + } + + public async Task ProcessMessageAsync(string connectionId, string messageType, byte[] payload) + { + var context = new WebSocketMessageContext + { + ConnectionId = connectionId, + MessageType = messageType, + Payload = payload + }; + + await _messagePipeline.ProcessMessageAsync(context, CancellationToken.None); + } +} \ No newline at end of file diff --git a/src/CellularManagement.Domain/CellularManagement.Domain.csproj b/src/CellularManagement.Domain/CellularManagement.Domain.csproj index 8107beb..8321811 100644 --- a/src/CellularManagement.Domain/CellularManagement.Domain.csproj +++ b/src/CellularManagement.Domain/CellularManagement.Domain.csproj @@ -8,6 +8,7 @@ + diff --git a/src/CellularManagement.Domain/Entities/WebSocketConnection.cs b/src/CellularManagement.Domain/Entities/WebSocketConnection.cs new file mode 100644 index 0000000..8cbc36d --- /dev/null +++ b/src/CellularManagement.Domain/Entities/WebSocketConnection.cs @@ -0,0 +1,66 @@ +using System.Net.WebSockets; +using System.Text.Json.Serialization; + +namespace CellularManagement.Domain.Entities; + +public class WebSocketConnection +{ + [JsonPropertyName("connectionId")] + public string ConnectionId { get; set; } + + [JsonPropertyName("userId")] + public string? UserId { get; set; } + + [JsonPropertyName("connectedAt")] + public DateTime ConnectedAt { get; set; } + + [JsonPropertyName("lastActivityAt")] + public DateTime? LastActivityAt { get; set; } + + [JsonPropertyName("state")] + public WebSocketState State { get; set; } + + // 添加无参构造函数用于反序列化 + public WebSocketConnection() + { + ConnectionId = string.Empty; + ConnectedAt = DateTime.UtcNow; + State = WebSocketState.Open; + } + + // 添加带参数的构造函数 + [JsonConstructor] + public WebSocketConnection(string connectionId, string? userId, DateTime connectedAt, DateTime? lastActivityAt, WebSocketState state) + { + ConnectionId = connectionId; + UserId = userId; + ConnectedAt = connectedAt; + LastActivityAt = lastActivityAt; + State = state; + } + + public static WebSocketConnection Create(string connectionId) + { + return new WebSocketConnection + { + ConnectionId = connectionId, + ConnectedAt = DateTime.UtcNow, + State = WebSocketState.Open + }; + } + + public void AssociateUser(string userId) + { + UserId = userId; + } + + public void UpdateLastActivity() + { + LastActivityAt = DateTime.UtcNow; + } + + public void Close() + { + State = WebSocketState.Closed; + } +} \ No newline at end of file diff --git a/src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs b/src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs new file mode 100644 index 0000000..6ac1f74 --- /dev/null +++ b/src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs @@ -0,0 +1,35 @@ +using MediatR; + +namespace CellularManagement.Domain.Events; + +public class WebSocketMessageReceivedEvent : INotification +{ + public string ConnectionId { get; } + public string MessageType { get; } + public byte[] Payload { get; } + public DateTime Timestamp { get; } + + public WebSocketMessageReceivedEvent(string connectionId, string messageType, byte[] payload) + { + ConnectionId = connectionId; + MessageType = messageType; + Payload = payload; + Timestamp = DateTime.UtcNow; + } +} + +public class WebSocketMessageSentEvent : INotification +{ + public string ConnectionId { get; } + public string MessageType { get; } + public byte[] Payload { get; } + public DateTime Timestamp { get; } + + public WebSocketMessageSentEvent(string connectionId, string messageType, byte[] payload) + { + ConnectionId = connectionId; + MessageType = messageType; + Payload = payload; + Timestamp = DateTime.UtcNow; + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs b/src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs new file mode 100644 index 0000000..bbe6fb4 --- /dev/null +++ b/src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs @@ -0,0 +1,9 @@ +namespace CellularManagement.Infrastructure.Configurations; + +public class WebSocketConfiguration +{ + public const string SectionName = "WebSocket"; + + public int Port { get; set; } = 5202; + public string Path { get; set; } = "/ws"; +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/DependencyInjection.cs b/src/CellularManagement.Infrastructure/DependencyInjection.cs index cc5795b..ebfc540 100644 --- a/src/CellularManagement.Infrastructure/DependencyInjection.cs +++ b/src/CellularManagement.Infrastructure/DependencyInjection.cs @@ -8,6 +8,8 @@ using CellularManagement.Domain.Entities; using CellularManagement.Domain.Repositories; using CellularManagement.Application.Services; using CellularManagement.Infrastructure.Services; +using CellularManagement.Infrastructure.WebSocket; +using CellularManagement.Infrastructure.Monitoring; using Scrutor; namespace CellularManagement.Infrastructure; @@ -110,10 +112,20 @@ public static class DependencyInjection // 添加内存缓存 services.AddMemoryCache(); + + // 添加分布式缓存 + services.AddDistributedMemoryCache(); // 注册缓存服务 services.AddScoped(); + // 注册WebSocket相关服务 + services.AddSingleton(); + services.AddSingleton(); + services.AddScoped(); + services.AddSingleton(); + services.AddSingleton(); + // 自动注册服务 services .Scan(action => diff --git a/src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs b/src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs new file mode 100644 index 0000000..eac51af --- /dev/null +++ b/src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs @@ -0,0 +1,196 @@ +using System.Net.WebSockets; +using CellularManagement.Domain.Entities; +using CellularManagement.Application.Services; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Caching.Distributed; +using System.Text.Json; + +namespace CellularManagement.Infrastructure.Distributed; + +public class DistributedWebSocketManager : IWebSocketService +{ + private readonly IDistributedCache _distributedCache; + private readonly ILogger _logger; + private readonly string _nodeId; + private const string CONNECTION_PREFIX = "ws_connection_"; + private const string USER_PREFIX = "ws_user_"; + private const string NODE_PREFIX = "ws_node_"; + + public DistributedWebSocketManager( + IDistributedCache distributedCache, + ILogger logger) + { + _distributedCache = distributedCache; + _logger = logger; + _nodeId = Guid.NewGuid().ToString(); + } + + public async Task AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket) + { + var connectionId = Guid.NewGuid().ToString(); + var connection = WebSocketConnection.Create(connectionId); + + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var nodeKey = $"{NODE_PREFIX}{_nodeId}"; + + // 存储连接信息 + await _distributedCache.SetStringAsync( + connectionKey, + JsonSerializer.Serialize(connection), + new DistributedCacheEntryOptions + { + SlidingExpiration = TimeSpan.FromMinutes(30) + }); + + // 更新节点连接列表 + await AddConnectionToNodeAsync(connectionId); + + _logger.LogInformation("WebSocket connection accepted: {ConnectionId} on node {NodeId}", + connectionId, _nodeId); + + return connectionId; + } + + public async Task CloseConnectionAsync(string connectionId) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var userKey = $"{USER_PREFIX}{connectionId}"; + + // 获取连接信息 + var connectionJson = await _distributedCache.GetStringAsync(connectionKey); + if (connectionJson != null) + { + var connection = JsonSerializer.Deserialize(connectionJson); + if (connection != null) + { + connection.Close(); + await _distributedCache.SetStringAsync( + connectionKey, + JsonSerializer.Serialize(connection)); + } + } + + // 从节点连接列表中移除 + await RemoveConnectionFromNodeAsync(connectionId); + + // 清理缓存 + await _distributedCache.RemoveAsync(connectionKey); + await _distributedCache.RemoveAsync(userKey); + + _logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId); + return true; + } + + public async Task SendMessageAsync(string connectionId, byte[] message) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var connectionJson = await _distributedCache.GetStringAsync(connectionKey); + + if (connectionJson != null) + { + var connection = JsonSerializer.Deserialize(connectionJson); + if (connection?.State == WebSocketState.Open) + { + // 这里需要实现消息发送逻辑 + // 可能需要使用消息队列或其他机制 + return true; + } + } + return false; + } + + public async Task BroadcastMessageAsync(byte[] message) + { + // 获取所有节点的连接列表 + var nodes = await GetAllNodesAsync(); + foreach (var node in nodes) + { + // 向每个节点发送广播消息 + // 这里需要实现节点间的消息传递机制 + } + return true; + } + + public async Task SendMessageToUserAsync(string userId, byte[] message) + { + // 获取用户的所有连接 + var connections = await GetUserConnectionsAsync(userId); + foreach (var connection in connections) + { + await SendMessageAsync(connection.ConnectionId, message); + } + return true; + } + + public async Task AssociateUserAsync(string connectionId, string userId) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var userKey = $"{USER_PREFIX}{connectionId}"; + + var connectionJson = await _distributedCache.GetStringAsync(connectionKey); + if (connectionJson != null) + { + var connection = JsonSerializer.Deserialize(connectionJson); + if (connection != null) + { + connection.AssociateUser(userId); + await _distributedCache.SetStringAsync( + connectionKey, + JsonSerializer.Serialize(connection)); + await _distributedCache.SetStringAsync(userKey, userId); + } + } + } + + public async Task GetConnectionAsync(string connectionId) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var connectionJson = await _distributedCache.GetStringAsync(connectionKey); + return connectionJson != null + ? JsonSerializer.Deserialize(connectionJson) + : null; + } + + public async Task> GetUserConnectionsAsync(string userId) + { + var connections = new List(); + // 这里需要实现获取用户所有连接的逻辑 + return connections; + } + + private async Task AddConnectionToNodeAsync(string connectionId) + { + var nodeKey = $"{NODE_PREFIX}{_nodeId}"; + var connections = await GetNodeConnectionsAsync(); + connections.Add(connectionId); + await _distributedCache.SetStringAsync( + nodeKey, + JsonSerializer.Serialize(connections)); + } + + private async Task RemoveConnectionFromNodeAsync(string connectionId) + { + var nodeKey = $"{NODE_PREFIX}{_nodeId}"; + var connections = await GetNodeConnectionsAsync(); + connections.Remove(connectionId); + await _distributedCache.SetStringAsync( + nodeKey, + JsonSerializer.Serialize(connections)); + } + + private async Task> GetNodeConnectionsAsync() + { + var nodeKey = $"{NODE_PREFIX}{_nodeId}"; + var connectionsJson = await _distributedCache.GetStringAsync(nodeKey); + return connectionsJson != null + ? JsonSerializer.Deserialize>(connectionsJson) ?? new List() + : new List(); + } + + private async Task> GetAllNodesAsync() + { + // 这里需要实现获取所有节点的逻辑 + // 可能需要使用服务发现或其他机制 + return new List(); + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs b/src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs new file mode 100644 index 0000000..79b9dca --- /dev/null +++ b/src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs @@ -0,0 +1,65 @@ +using System.Diagnostics; +using System.Diagnostics.Metrics; +using Microsoft.Extensions.Logging; + +namespace CellularManagement.Infrastructure.Monitoring; + +public class WebSocketMetrics +{ + private readonly ILogger _logger; + private readonly Meter _meter; + private readonly Counter _connectionsCounter; + private readonly Counter _messagesCounter; + private readonly Histogram _messageLatency; + private readonly Histogram _messageSize; + private readonly Counter _errorsCounter; + + public WebSocketMetrics(ILogger logger) + { + _logger = logger; + _meter = new Meter("CellularManagement.WebSocket", "1.0.0"); + + _connectionsCounter = _meter.CreateCounter("websocket.connections", "个", "活跃连接数"); + _messagesCounter = _meter.CreateCounter("websocket.messages", "个", "消息处理数"); + _messageLatency = _meter.CreateHistogram("websocket.message.latency", "ms", "消息处理延迟"); + _messageSize = _meter.CreateHistogram("websocket.message.size", "bytes", "消息大小"); + _errorsCounter = _meter.CreateCounter("websocket.errors", "个", "错误数"); + } + + public void ConnectionEstablished() + { + _connectionsCounter.Add(1); + _logger.LogInformation("WebSocket connection established"); + } + + public void ConnectionClosed() + { + _connectionsCounter.Add(-1); + _logger.LogInformation("WebSocket connection closed"); + } + + public void MessageReceived(int size) + { + _messagesCounter.Add(1); + _messageSize.Record(size); + _logger.LogDebug("WebSocket message received, size: {Size} bytes", size); + } + + public void MessageProcessed(TimeSpan latency) + { + _messageLatency.Record(latency.TotalMilliseconds); + _logger.LogDebug("WebSocket message processed, latency: {Latency}ms", latency.TotalMilliseconds); + } + + public void ErrorOccurred(string errorType) + { + _errorsCounter.Add(1); + _logger.LogError("WebSocket error occurred: {ErrorType}", errorType); + } + + public void RecordGauge(string name, double value) + { + var gauge = _meter.CreateObservableGauge($"websocket.{name}", () => value); + _logger.LogDebug("WebSocket gauge recorded: {Name} = {Value}", name, value); + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs b/src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs new file mode 100644 index 0000000..8e7c76e --- /dev/null +++ b/src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs @@ -0,0 +1,54 @@ +using CellularManagement.Application.Pipeline; +using Microsoft.Extensions.Logging; + +namespace CellularManagement.Infrastructure.Pipeline.Handlers; + +public class WebSocketMessageValidationHandler : IWebSocketMessageHandler +{ + private readonly ILogger _logger; + + public string HandlerName => "MessageValidationHandler"; + public int Priority => 100; + + public WebSocketMessageValidationHandler(ILogger logger) + { + _logger = logger; + } + + public bool CanHandle(string messageType) + { + return true; // 处理所有消息类型 + } + + public async Task HandleAsync(WebSocketMessageContext context, CancellationToken cancellationToken) + { + try + { + // 验证消息格式 + if (string.IsNullOrEmpty(context.MessageType)) + { + throw new ArgumentException("Message type cannot be empty"); + } + + if (context.Payload == null || context.Payload.Length == 0) + { + throw new ArgumentException("Message payload cannot be empty"); + } + + // 验证消息大小 + if (context.Payload.Length > 1024 * 1024) // 1MB + { + throw new ArgumentException("Message payload too large"); + } + + _logger.LogInformation("Message validation passed for connection: {ConnectionId}", context.ConnectionId); + } + catch (Exception ex) + { + _logger.LogError(ex, "Message validation failed for connection: {ConnectionId}", context.ConnectionId); + context.Error = ex; + context.IsHandled = true; + throw; + } + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs b/src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs new file mode 100644 index 0000000..e2bb614 --- /dev/null +++ b/src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs @@ -0,0 +1,145 @@ +using System.Threading.Channels; +using CellularManagement.Application.Pipeline; +using Microsoft.Extensions.Logging; +using MediatR; +using CellularManagement.Domain.Events; + +namespace CellularManagement.Infrastructure.Pipeline; + +public class WebSocketMessagePipeline : IWebSocketMessagePipeline +{ + private readonly Channel _messageChannel; + private readonly List _handlers = new(); + private readonly ILogger _logger; + private readonly IMediator _mediator; + private readonly int _batchSize; + private readonly int _maxQueueSize; + + public WebSocketMessagePipeline( + ILogger logger, + IMediator mediator, + int batchSize = 100, + int maxQueueSize = 10000) + { + _logger = logger; + _mediator = mediator; + _batchSize = batchSize; + _maxQueueSize = maxQueueSize; + + var options = new BoundedChannelOptions(maxQueueSize) + { + FullMode = BoundedChannelFullMode.Wait, + SingleWriter = false, + SingleReader = false + }; + + _messageChannel = Channel.CreateBounded(options); + } + + public async Task ProcessMessageAsync(WebSocketMessageContext context, CancellationToken cancellationToken) + { + try + { + await _messageChannel.Writer.WriteAsync(context, cancellationToken); + await _mediator.Publish(new WebSocketMessageReceivedEvent( + context.ConnectionId, + context.MessageType, + context.Payload), cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to process message for connection: {ConnectionId}", context.ConnectionId); + throw; + } + } + + public void RegisterHandler(IWebSocketMessageHandler handler) + { + _handlers.Add(handler); + _handlers.Sort((a, b) => b.Priority.CompareTo(a.Priority)); + } + + public void UnregisterHandler(string handlerName) + { + _handlers.RemoveAll(h => h.HandlerName == handlerName); + } + + public async Task StartProcessingAsync(CancellationToken cancellationToken) + { + var batch = new List(_batchSize); + + try + { + while (!cancellationToken.IsCancellationRequested) + { + batch.Clear(); + + while (batch.Count < _batchSize && + await _messageChannel.Reader.WaitToReadAsync(cancellationToken)) + { + if (_messageChannel.Reader.TryRead(out var message)) + { + batch.Add(message); + } + } + + if (batch.Count > 0) + { + await ProcessBatchAsync(batch, cancellationToken); + } + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("Message processing pipeline stopped"); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in message processing pipeline"); + } + } + + private async Task ProcessBatchAsync(List batch, CancellationToken cancellationToken) + { + foreach (var message in batch) + { + try + { + await ProcessMessageWithHandlersAsync(message, cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing message: {MessageId}", message.ConnectionId); + message.Error = ex; + } + } + } + + private async Task ProcessMessageWithHandlersAsync(WebSocketMessageContext context, CancellationToken cancellationToken) + { + foreach (var handler in _handlers) + { + if (handler.CanHandle(context.MessageType) && !context.IsHandled) + { + try + { + await handler.HandleAsync(context, cancellationToken); + if (context.IsHandled) + { + break; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Handler {HandlerName} failed to process message", handler.HandlerName); + throw; + } + } + } + + if (!context.IsHandled) + { + _logger.LogWarning("No handler found for message type: {MessageType}", context.MessageType); + } + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs b/src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs new file mode 100644 index 0000000..7c87ee3 --- /dev/null +++ b/src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs @@ -0,0 +1,59 @@ +using System.Buffers; +using System.Collections.Concurrent; +using CellularManagement.Application.Pipeline; + +namespace CellularManagement.Infrastructure.Pooling; + +public class WebSocketMessagePool +{ + private readonly ConcurrentBag _pool = new(); + private readonly ArrayPool _arrayPool = ArrayPool.Shared; + private readonly int _maxPoolSize; + private readonly int _bufferSize; + + public WebSocketMessagePool(int maxPoolSize = 1000, int bufferSize = 1024 * 4) + { + _maxPoolSize = maxPoolSize; + _bufferSize = bufferSize; + } + + public WebSocketMessageContext Rent() + { + if (_pool.TryTake(out var context)) + { + return context; + } + + return new WebSocketMessageContext + { + Payload = _arrayPool.Rent(_bufferSize) + }; + } + + public void Return(WebSocketMessageContext context) + { + if (_pool.Count < _maxPoolSize) + { + context.ConnectionId = string.Empty; + context.MessageType = string.Empty; + context.Properties.Clear(); + context.IsHandled = false; + context.Error = null; + + if (context.Payload != null) + { + _arrayPool.Return(context.Payload); + context.Payload = _arrayPool.Rent(_bufferSize); + } + + _pool.Add(context); + } + else + { + if (context.Payload != null) + { + _arrayPool.Return(context.Payload); + } + } + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs new file mode 100644 index 0000000..8b40e5f --- /dev/null +++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs @@ -0,0 +1,67 @@ +using System.Net.WebSockets; +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; +using CellularManagement.Application.Services; + +namespace CellularManagement.Infrastructure.WebSocket; + +public class WebSocketConnectionManager +{ + private readonly ILogger _logger; + private readonly ConcurrentDictionary _sockets = new(); + private readonly ConcurrentDictionary _userConnections = new(); + + public WebSocketConnectionManager(ILogger logger) + { + _logger = logger; + } + + public string AddConnection(System.Net.WebSockets.WebSocket socket) + { + var connectionId = Guid.NewGuid().ToString(); + _sockets.TryAdd(connectionId, socket); + _logger.LogInformation("WebSocket connection added: {ConnectionId}", connectionId); + return connectionId; + } + + public bool RemoveConnection(string connectionId) + { + _sockets.TryRemove(connectionId, out _); + _userConnections.TryRemove(connectionId, out _); + _logger.LogInformation("WebSocket connection removed: {ConnectionId}", connectionId); + return true; + } + + public System.Net.WebSockets.WebSocket? GetConnection(string connectionId) + { + _sockets.TryGetValue(connectionId, out var socket); + return socket; + } + + public IEnumerable GetAllConnections() + { + return _sockets.Values; + } + + public void AssociateUser(string connectionId, string userId) + { + _userConnections.TryAdd(connectionId, userId); + } + + public string? GetUserId(string connectionId) + { + _userConnections.TryGetValue(connectionId, out var userId); + return userId; + } + + public IEnumerable GetUserConnections(string userId) + { + var connections = _userConnections + .Where(kvp => kvp.Value == userId) + .Select(kvp => kvp.Key) + .Select(connectionId => GetConnection(connectionId)) + .Where(socket => socket != null); + + return connections!; + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs new file mode 100644 index 0000000..79c4716 --- /dev/null +++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs @@ -0,0 +1,116 @@ +using System.Threading.Channels; +using System.Buffers; +using Microsoft.Extensions.Logging; + +namespace CellularManagement.Infrastructure.WebSocket; + +public class WebSocketMessagePipeline +{ + private readonly Channel _messageChannel; + private readonly ILogger _logger; + private readonly MemoryPool _memoryPool; + private readonly int _batchSize; + private readonly int _maxQueueSize; + + public WebSocketMessagePipeline( + ILogger logger, + int batchSize = 100, + int maxQueueSize = 10000) + { + _logger = logger; + _batchSize = batchSize; + _maxQueueSize = maxQueueSize; + _memoryPool = MemoryPool.Shared; + + var options = new BoundedChannelOptions(maxQueueSize) + { + FullMode = BoundedChannelFullMode.Wait, + SingleWriter = false, + SingleReader = false + }; + + _messageChannel = Channel.CreateBounded(options); + } + + public async ValueTask WriteMessageAsync(WebSocketMessage message) + { + try + { + await _messageChannel.Writer.WriteAsync(message); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to write message to pipeline"); + return false; + } + } + + public async Task ProcessMessagesAsync(CancellationToken cancellationToken) + { + var batch = new List(_batchSize); + + try + { + while (!cancellationToken.IsCancellationRequested) + { + batch.Clear(); + + // 批量读取消息 + while (batch.Count < _batchSize && + await _messageChannel.Reader.WaitToReadAsync(cancellationToken)) + { + if (_messageChannel.Reader.TryRead(out var message)) + { + batch.Add(message); + } + } + + if (batch.Count > 0) + { + await ProcessBatchAsync(batch, cancellationToken); + } + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("Message processing pipeline stopped"); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in message processing pipeline"); + } + } + + private async Task ProcessBatchAsync(List batch, CancellationToken cancellationToken) + { + // 这里实现具体的消息处理逻辑 + foreach (var message in batch) + { + try + { + // 处理消息 + await ProcessMessageAsync(message, cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing message: {MessageId}", message.MessageId); + } + } + } + + private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken) + { + // 实现具体的消息处理逻辑 + // 这里可以添加消息验证、转换、业务处理等步骤 + } +} + +public class WebSocketMessage +{ + public string MessageId { get; set; } = Guid.NewGuid().ToString(); + public string ConnectionId { get; set; } = string.Empty; + public string MessageType { get; set; } = string.Empty; + public byte[] Payload { get; set; } = Array.Empty(); + public DateTime Timestamp { get; set; } = DateTime.UtcNow; +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs new file mode 100644 index 0000000..16e8c6e --- /dev/null +++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs @@ -0,0 +1,41 @@ +using System.Collections.Concurrent; +using CellularManagement.Application.Pipeline; + +namespace CellularManagement.Infrastructure.WebSocket; + +public class WebSocketMessagePool +{ + private readonly ConcurrentBag _pool = new(); + private readonly int _maxPoolSize; + + public WebSocketMessagePool(int maxPoolSize = 1000) + { + _maxPoolSize = maxPoolSize; + } + + public WebSocketMessageContext Rent() + { + if (_pool.TryTake(out var context)) + { + return context; + } + + return new WebSocketMessageContext(); + } + + public void Return(WebSocketMessageContext context) + { + if (_pool.Count < _maxPoolSize) + { + // 重置上下文状态 + context.ConnectionId = string.Empty; + context.MessageType = string.Empty; + context.Payload = Array.Empty(); + context.Properties.Clear(); + context.IsHandled = false; + context.Error = null; + + _pool.Add(context); + } + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs new file mode 100644 index 0000000..6b24701 --- /dev/null +++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs @@ -0,0 +1,89 @@ +using System.Net.WebSockets; +using System.Text; +using Microsoft.Extensions.Logging; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.DependencyInjection; +using CellularManagement.Application.Services; + +namespace CellularManagement.Infrastructure.WebSocket; + +public class WebSocketMiddleware +{ + private readonly RequestDelegate _next; + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + + public WebSocketMiddleware( + RequestDelegate next, + ILogger logger, + IServiceProvider serviceProvider) + { + _next = next; + _logger = logger; + _serviceProvider = serviceProvider; + } + + public async Task InvokeAsync(HttpContext context) + { + if (context.WebSockets.IsWebSocketRequest) + { + using var scope = _serviceProvider.CreateScope(); + var webSocketService = scope.ServiceProvider.GetRequiredService(); + + var webSocket = await context.WebSockets.AcceptWebSocketAsync(); + var connectionId = await webSocketService.AcceptConnectionAsync(webSocket); + + try + { + await HandleWebSocketConnection(webSocket, connectionId, webSocketService); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error handling WebSocket connection: {ConnectionId}", connectionId); + } + finally + { + await webSocketService.CloseConnectionAsync(connectionId); + } + } + else + { + await _next(context); + } + } + + private async Task HandleWebSocketConnection( + System.Net.WebSockets.WebSocket webSocket, + string connectionId, + IWebSocketService webSocketService) + { + var buffer = new byte[1024 * 4]; + var receiveResult = await webSocket.ReceiveAsync( + new ArraySegment(buffer), CancellationToken.None); + + while (!receiveResult.CloseStatus.HasValue) + { + try + { + var message = buffer.Take(receiveResult.Count).ToArray(); + await webSocketService.SendMessageAsync(connectionId, message); + + receiveResult = await webSocket.ReceiveAsync( + new ArraySegment(buffer), CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing WebSocket message for connection: {ConnectionId}", connectionId); + break; + } + } + + if (receiveResult.CloseStatus.HasValue) + { + await webSocket.CloseAsync( + receiveResult.CloseStatus.Value, + receiveResult.CloseStatusDescription, + CancellationToken.None); + } + } +} \ No newline at end of file diff --git a/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs b/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs new file mode 100644 index 0000000..447dd92 --- /dev/null +++ b/src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs @@ -0,0 +1,295 @@ +using System.Net.WebSockets; +using System.Text; +using System.Text.Json; +using CellularManagement.Application.Services; +using CellularManagement.Domain.Entities; +using CellularManagement.Infrastructure.Monitoring; +using CellularManagement.Infrastructure.Pooling; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Caching.Distributed; +using Microsoft.Extensions.Caching.Memory; + +namespace CellularManagement.Infrastructure.WebSocket; + +public class WebSocketService : IWebSocketService +{ + private readonly ILogger _logger; + private readonly WebSocketMetrics _metrics; + private readonly WebSocketMessagePool _messagePool; + private readonly IDistributedCache _distributedCache; + private readonly ICacheService _cacheService; + private readonly string _nodeId; + private const string CONNECTION_PREFIX = "ws_connection_"; + private const string WEBSOCKET_PREFIX = "ws_socket_"; + private const string USER_PREFIX = "ws_user_"; + private const string NODE_PREFIX = "ws_node_"; + + public WebSocketService( + ILogger logger, + WebSocketMetrics metrics, + WebSocketMessagePool messagePool, + IDistributedCache distributedCache, + ICacheService cacheService) + { + _logger = logger; + _metrics = metrics; + _messagePool = messagePool; + _distributedCache = distributedCache; + _cacheService = cacheService; + _nodeId = Guid.NewGuid().ToString(); + } + + public async Task AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket) + { + var connectionId = Guid.NewGuid().ToString(); + var connection = WebSocketConnection.Create(connectionId); + + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}"; + var nodeKey = $"{NODE_PREFIX}{_nodeId}"; + + await _distributedCache.SetStringAsync( + connectionKey, + JsonSerializer.Serialize(connection), + new DistributedCacheEntryOptions + { + SlidingExpiration = TimeSpan.FromMinutes(30) + }); + + _cacheService.Set(webSocketKey, webSocket, new MemoryCacheEntryOptions + { + SlidingExpiration = TimeSpan.FromMinutes(30) + }); + + await AddConnectionToNodeAsync(connectionId); + _metrics.ConnectionEstablished(); + _logger.LogInformation("WebSocket connection accepted: {ConnectionId} on node {NodeId}", + connectionId, _nodeId); + + return connectionId; + } + + public async Task CloseConnectionAsync(string connectionId) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}"; + var userKey = $"{USER_PREFIX}{connectionId}"; + + var connectionJson = await _distributedCache.GetStringAsync(connectionKey); + if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket)) + { + try + { + if (connectionJson != null) + { + var connection = JsonSerializer.Deserialize(connectionJson); + if (connection != null && webSocket != null) + { + try + { + if (webSocket.State == WebSocketState.Open || webSocket.State == WebSocketState.CloseReceived) + { + await webSocket.CloseAsync( + WebSocketCloseStatus.NormalClosure, + "Connection closed by server", + CancellationToken.None); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error closing WebSocket connection: {ConnectionId}", connectionId); + } + + connection.Close(); + await _distributedCache.SetStringAsync(connectionKey, JsonSerializer.Serialize(connection)); + } + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to deserialize WebSocketConnection for connection: {ConnectionId}, continuing with cleanup", connectionId); + } + } + + await RemoveConnectionFromNodeAsync(connectionId); + await _distributedCache.RemoveAsync(connectionKey); + _cacheService.Remove(webSocketKey); + await _distributedCache.RemoveAsync(userKey); + + _metrics.ConnectionClosed(); + _logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId); + return true; + } + + public async Task SendMessageAsync(string connectionId, byte[] message) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}"; + + var connectionJson = await _distributedCache.GetStringAsync(connectionKey); + if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket)) + { + try + { + if (connectionJson != null) + { + var connection = JsonSerializer.Deserialize(connectionJson); + if (connection?.State == WebSocketState.Open && webSocket != null) + { + try + { + await webSocket.SendAsync( + new ArraySegment(message), + WebSocketMessageType.Text, + true, + CancellationToken.None); + + _metrics.MessageProcessed(TimeSpan.Zero); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error sending message to connection: {ConnectionId}", connectionId); + _metrics.ErrorOccurred("SendMessage"); + } + } + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to deserialize WebSocketConnection for connection: {ConnectionId}, skipping message send", connectionId); + } + } + return false; + } + + public async Task BroadcastMessageAsync(byte[] message) + { + var nodes = await GetAllNodesAsync(); + var success = true; + + foreach (var node in nodes) + { + var nodeKey = $"{NODE_PREFIX}{node}"; + var connectionsJson = await _distributedCache.GetStringAsync(nodeKey); + if (connectionsJson != null) + { + var connections = JsonSerializer.Deserialize>(connectionsJson); + foreach (var connectionId in connections) + { + if (!await SendMessageAsync(connectionId, message)) + { + success = false; + } + } + } + } + + return success; + } + + public async Task SendMessageToUserAsync(string userId, byte[] message) + { + var userConnections = await GetUserConnectionsAsync(userId); + var success = true; + + foreach (var connection in userConnections) + { + if (!await SendMessageAsync(connection.ConnectionId, message)) + { + success = false; + } + } + + return success; + } + + public async Task AssociateUserAsync(string connectionId, string userId) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var userKey = $"{USER_PREFIX}{connectionId}"; + + var connectionJson = await _distributedCache.GetStringAsync(connectionKey); + if (connectionJson != null) + { + var connection = JsonSerializer.Deserialize(connectionJson); + if (connection != null) + { + connection.AssociateUser(userId); + await _distributedCache.SetStringAsync(connectionKey, JsonSerializer.Serialize(connection)); + await _distributedCache.SetStringAsync(userKey, userId); + } + } + } + + public async Task GetConnectionAsync(string connectionId) + { + var connectionKey = $"{CONNECTION_PREFIX}{connectionId}"; + var connectionJson = await _distributedCache.GetStringAsync(connectionKey); + return connectionJson != null + ? JsonSerializer.Deserialize(connectionJson) + : null; + } + + public async Task> GetUserConnectionsAsync(string userId) + { + var connections = new List(); + var nodes = await GetAllNodesAsync(); + + foreach (var node in nodes) + { + var nodeKey = $"{NODE_PREFIX}{node}"; + var connectionsJson = await _distributedCache.GetStringAsync(nodeKey); + if (connectionsJson != null) + { + var connectionIds = JsonSerializer.Deserialize>(connectionsJson); + foreach (var connectionId in connectionIds) + { + var connection = await GetConnectionAsync(connectionId); + if (connection?.UserId == userId) + { + connections.Add(connection); + } + } + } + } + + return connections; + } + + private async Task AddConnectionToNodeAsync(string connectionId) + { + var nodeKey = $"{NODE_PREFIX}{_nodeId}"; + var connections = await GetNodeConnectionsAsync(); + connections.Add(connectionId); + await _distributedCache.SetStringAsync( + nodeKey, + JsonSerializer.Serialize(connections)); + } + + private async Task RemoveConnectionFromNodeAsync(string connectionId) + { + var nodeKey = $"{NODE_PREFIX}{_nodeId}"; + var connections = await GetNodeConnectionsAsync(); + connections.Remove(connectionId); + await _distributedCache.SetStringAsync( + nodeKey, + JsonSerializer.Serialize(connections)); + } + + private async Task> GetNodeConnectionsAsync() + { + var nodeKey = $"{NODE_PREFIX}{_nodeId}"; + var connectionsJson = await _distributedCache.GetStringAsync(nodeKey); + return connectionsJson != null + ? JsonSerializer.Deserialize>(connectionsJson) ?? new List() + : new List(); + } + + private async Task> GetAllNodesAsync() + { + // 这里需要实现服务发现机制 + // 可以使用Redis的Pub/Sub或其他服务发现机制 + return new List { _nodeId }; + } +} \ No newline at end of file diff --git a/src/CellularManagement.WebAPI/CellularManagement.WebAPI.csproj b/src/CellularManagement.WebAPI/CellularManagement.WebAPI.csproj index de863f2..96145ca 100644 --- a/src/CellularManagement.WebAPI/CellularManagement.WebAPI.csproj +++ b/src/CellularManagement.WebAPI/CellularManagement.WebAPI.csproj @@ -14,7 +14,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all - + @@ -22,4 +22,10 @@ + + + PreserveNewest + + + diff --git a/src/CellularManagement.WebAPI/Program.cs b/src/CellularManagement.WebAPI/Program.cs index 3be00bf..6575fce 100644 --- a/src/CellularManagement.WebAPI/Program.cs +++ b/src/CellularManagement.WebAPI/Program.cs @@ -6,6 +6,11 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using CellularManagement.Infrastructure.Configurations; using CellularManagement.Application; +using CellularManagement.Infrastructure.WebSocket; +using CellularManagement.Infrastructure.Monitoring; +using CellularManagement.Infrastructure.Pooling; +using CellularManagement.Application.Services; +using Microsoft.Extensions.Options; // 创建 Web 应用程序构建器 var builder = WebApplication.CreateBuilder(args); @@ -44,6 +49,17 @@ builder.Services.AddCors(); // 配置认证服务 // 从配置文件中读取认证相关配置 builder.Services.Configure(builder.Configuration.GetSection("Auth")); +builder.Services.Configure(builder.Configuration.GetSection(WebSocketConfiguration.SectionName)); + +// 注册WebSocket相关服务 +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddScoped(); + +// 添加静态文件服务 +builder.Services.AddDirectoryBrowser(); // 构建应用程序 var app = builder.Build(); @@ -70,6 +86,19 @@ app.UseCors(x => x.AllowAnyHeader().SetIsOriginAllowed(p => true).AllowAnyMethod // 处理用户认证和授权 app.UseAuthorization(); +// 配置 WebSocket 中间件 +var webSocketConfig = app.Services.GetRequiredService>().Value; +app.UseWebSockets(new Microsoft.AspNetCore.Builder.WebSocketOptions +{ + KeepAliveInterval = TimeSpan.FromMinutes(2) +}); + +app.UseMiddleware(); + +// 配置静态文件中间件 +app.UseStaticFiles(); +app.UseDirectoryBrowser(); + // 映射控制器路由 app.MapControllers(); diff --git a/src/CellularManagement.WebAPI/appsettings.json b/src/CellularManagement.WebAPI/appsettings.json index f0cb51f..62af76b 100644 --- a/src/CellularManagement.WebAPI/appsettings.json +++ b/src/CellularManagement.WebAPI/appsettings.json @@ -34,5 +34,9 @@ "DefaultUserRole": "User", "AccessTokenExpirationMinutes": 60, "RefreshTokenExpirationDays": 7 + }, + "WebSocket": { + "Port": 5202, + "Path": "/ws" } } diff --git a/src/CellularManagement.WebAPI/wwwroot/websocket.html b/src/CellularManagement.WebAPI/wwwroot/websocket.html new file mode 100644 index 0000000..85a166d --- /dev/null +++ b/src/CellularManagement.WebAPI/wwwroot/websocket.html @@ -0,0 +1,382 @@ + + + + + + WebSocket 3D 聊天客户端 + + + + + + +
+ + +
+ +
+
+ +
+ +
+
+
+
+
+ 未连接 +
+
+
+ + +
+
+
+
+ + +
+
+
+

消息记录

+ +
+
+ +
+
+
+ + +
+
+
+ + +
+
+
+
+ + + + \ No newline at end of file