Browse Source

Initial commit on ws branch

ws
hyh 3 months ago
parent
commit
aec7c755bc
  1. 28
      src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs
  2. 16
      src/CellularManagement.Application/Services/IWebSocketService.cs
  3. 137
      src/CellularManagement.Application/Services/WebSocketService.cs
  4. 6
      src/CellularManagement.Domain/Common/IAggregateRoot.cs
  5. 66
      src/CellularManagement.Domain/Entities/WebSocketConnection.cs
  6. 35
      src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs
  7. 9
      src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs
  8. 9
      src/CellularManagement.Infrastructure/DependencyInjection.cs
  9. 196
      src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs
  10. 65
      src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs
  11. 54
      src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs
  12. 145
      src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs
  13. 59
      src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs
  14. 129
      src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs
  15. 199
      src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs
  16. 72
      src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs
  17. 134
      src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs
  18. 420
      src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs
  19. 13
      src/CellularManagement.WebAPI/CellularManagement.WebAPI.http
  20. 26
      src/CellularManagement.WebAPI/Program.cs
  21. 5
      src/CellularManagement.WebAPI/appsettings.README.md
  22. 4
      src/CellularManagement.WebAPI/appsettings.json
  23. 26
      src/CellularManagement.WebAPI/wwwroot/websocket.html

28
src/CellularManagement.Application/Pipeline/IWebSocketMessageHandler.cs

@ -1,28 +0,0 @@
using System.Threading.Channels;
namespace CellularManagement.Application.Pipeline;
public interface IWebSocketMessageHandler
{
string HandlerName { get; }
int Priority { get; }
bool CanHandle(string messageType);
Task HandleAsync(WebSocketMessageContext context, CancellationToken cancellationToken);
}
public class WebSocketMessageContext
{
public string ConnectionId { get; set; } = string.Empty;
public string MessageType { get; set; } = string.Empty;
public byte[] Payload { get; set; } = Array.Empty<byte>();
public Dictionary<string, object> Properties { get; } = new();
public bool IsHandled { get; set; }
public Exception? Error { get; set; }
}
public interface IWebSocketMessagePipeline
{
Task ProcessMessageAsync(WebSocketMessageContext context, CancellationToken cancellationToken);
void RegisterHandler(IWebSocketMessageHandler handler);
void UnregisterHandler(string handlerName);
}

16
src/CellularManagement.Application/Services/IWebSocketService.cs

@ -1,16 +0,0 @@
using System.Net.WebSockets;
using CellularManagement.Domain.Entities;
namespace CellularManagement.Application.Services;
public interface IWebSocketService
{
Task<string> AcceptConnectionAsync(WebSocket webSocket);
Task<bool> CloseConnectionAsync(string connectionId);
Task<bool> SendMessageAsync(string connectionId, byte[] message);
Task<bool> BroadcastMessageAsync(byte[] message);
Task<bool> SendMessageToUserAsync(string userId, byte[] message);
Task AssociateUserAsync(string connectionId, string userId);
Task<WebSocketConnection?> GetConnectionAsync(string connectionId);
Task<IEnumerable<WebSocketConnection>> GetUserConnectionsAsync(string userId);
}

137
src/CellularManagement.Application/Services/WebSocketService.cs

@ -1,137 +0,0 @@
using System.Net.WebSockets;
using CellularManagement.Domain.Entities;
using CellularManagement.Application.Services;
using CellularManagement.Application.Pipeline;
using Microsoft.Extensions.Logging;
using MediatR;
using CellularManagement.Domain.Events;
namespace CellularManagement.Application.Services;
public class WebSocketService : IWebSocketService
{
private readonly ICacheService _cacheService;
private readonly ILogger<WebSocketService> _logger;
private readonly IWebSocketMessagePipeline _messagePipeline;
private readonly IMediator _mediator;
private const string CONNECTION_PREFIX = "ws_connection_";
private const string USER_CONNECTION_PREFIX = "ws_user_";
public WebSocketService(
ICacheService cacheService,
ILogger<WebSocketService> logger,
IWebSocketMessagePipeline messagePipeline,
IMediator mediator)
{
_cacheService = cacheService;
_logger = logger;
_messagePipeline = messagePipeline;
_mediator = mediator;
}
public async Task<string> AcceptConnectionAsync(WebSocket webSocket)
{
var connectionId = Guid.NewGuid().ToString();
var connection = WebSocketConnection.Create(connectionId);
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
_cacheService.Set(connectionKey, webSocket);
_cacheService.Set(connectionId, connection);
_logger.LogInformation("WebSocket connection accepted: {ConnectionId}", connectionId);
return connectionId;
}
public async Task<bool> CloseConnectionAsync(string connectionId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var userKey = $"{USER_CONNECTION_PREFIX}{connectionId}";
var connection = _cacheService.Get<WebSocketConnection>(connectionId);
if (connection != null)
{
connection.Close();
_cacheService.Set(connectionId, connection);
}
_cacheService.Remove(connectionKey);
_cacheService.Remove(userKey);
_logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId);
return true;
}
public async Task<bool> SendMessageAsync(string connectionId, byte[] message)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var webSocket = _cacheService.Get<WebSocket>(connectionKey);
if (webSocket?.State == WebSocketState.Open)
{
await webSocket.SendAsync(
new ArraySegment<byte>(message),
WebSocketMessageType.Text,
true,
CancellationToken.None);
await _mediator.Publish(new WebSocketMessageSentEvent(
connectionId,
"text",
message));
return true;
}
return false;
}
public async Task<bool> BroadcastMessageAsync(byte[] message)
{
// 注意:这里需要实现广播逻辑
// 由于ICacheService的限制,可能需要使用其他方式实现
return false;
}
public async Task<bool> SendMessageToUserAsync(string userId, byte[] message)
{
// 注意:这里需要实现向特定用户发送消息的逻辑
// 由于ICacheService的限制,可能需要使用其他方式实现
return false;
}
public async Task AssociateUserAsync(string connectionId, string userId)
{
var connection = _cacheService.Get<WebSocketConnection>(connectionId);
if (connection != null)
{
connection.AssociateUser(userId);
_cacheService.Set(connectionId, connection);
var userKey = $"{USER_CONNECTION_PREFIX}{connectionId}";
_cacheService.Set(userKey, userId);
}
}
public async Task<WebSocketConnection?> GetConnectionAsync(string connectionId)
{
return _cacheService.Get<WebSocketConnection>(connectionId);
}
public async Task<IEnumerable<WebSocketConnection>> GetUserConnectionsAsync(string userId)
{
// 注意:这里需要实现获取用户所有连接的逻辑
// 由于ICacheService的限制,可能需要使用其他方式实现
return Enumerable.Empty<WebSocketConnection>();
}
public async Task ProcessMessageAsync(string connectionId, string messageType, byte[] payload)
{
var context = new WebSocketMessageContext
{
ConnectionId = connectionId,
MessageType = messageType,
Payload = payload
};
await _messagePipeline.ProcessMessageAsync(context, CancellationToken.None);
}
}

6
src/CellularManagement.Domain/Common/IAggregateRoot.cs

@ -0,0 +1,6 @@
namespace CellularManagement.Domain.Common;
public interface IAggregateRoot
{
// 聚合根接口,用于标识领域模型中的聚合根
}

66
src/CellularManagement.Domain/Entities/WebSocketConnection.cs

@ -1,66 +0,0 @@
using System.Net.WebSockets;
using System.Text.Json.Serialization;
namespace CellularManagement.Domain.Entities;
public class WebSocketConnection
{
[JsonPropertyName("connectionId")]
public string ConnectionId { get; set; }
[JsonPropertyName("userId")]
public string? UserId { get; set; }
[JsonPropertyName("connectedAt")]
public DateTime ConnectedAt { get; set; }
[JsonPropertyName("lastActivityAt")]
public DateTime? LastActivityAt { get; set; }
[JsonPropertyName("state")]
public WebSocketState State { get; set; }
// 添加无参构造函数用于反序列化
public WebSocketConnection()
{
ConnectionId = string.Empty;
ConnectedAt = DateTime.UtcNow;
State = WebSocketState.Open;
}
// 添加带参数的构造函数
[JsonConstructor]
public WebSocketConnection(string connectionId, string? userId, DateTime connectedAt, DateTime? lastActivityAt, WebSocketState state)
{
ConnectionId = connectionId;
UserId = userId;
ConnectedAt = connectedAt;
LastActivityAt = lastActivityAt;
State = state;
}
public static WebSocketConnection Create(string connectionId)
{
return new WebSocketConnection
{
ConnectionId = connectionId,
ConnectedAt = DateTime.UtcNow,
State = WebSocketState.Open
};
}
public void AssociateUser(string userId)
{
UserId = userId;
}
public void UpdateLastActivity()
{
LastActivityAt = DateTime.UtcNow;
}
public void Close()
{
State = WebSocketState.Closed;
}
}

35
src/CellularManagement.Domain/Events/WebSocketMessageEvent.cs

@ -1,35 +0,0 @@
using MediatR;
namespace CellularManagement.Domain.Events;
public class WebSocketMessageReceivedEvent : INotification
{
public string ConnectionId { get; }
public string MessageType { get; }
public byte[] Payload { get; }
public DateTime Timestamp { get; }
public WebSocketMessageReceivedEvent(string connectionId, string messageType, byte[] payload)
{
ConnectionId = connectionId;
MessageType = messageType;
Payload = payload;
Timestamp = DateTime.UtcNow;
}
}
public class WebSocketMessageSentEvent : INotification
{
public string ConnectionId { get; }
public string MessageType { get; }
public byte[] Payload { get; }
public DateTime Timestamp { get; }
public WebSocketMessageSentEvent(string connectionId, string messageType, byte[] payload)
{
ConnectionId = connectionId;
MessageType = messageType;
Payload = payload;
Timestamp = DateTime.UtcNow;
}
}

9
src/CellularManagement.Infrastructure/Configurations/WebSocketConfiguration.cs

@ -1,9 +0,0 @@
namespace CellularManagement.Infrastructure.Configurations;
public class WebSocketConfiguration
{
public const string SectionName = "WebSocket";
public int Port { get; set; } = 5202;
public string Path { get; set; } = "/ws";
}

9
src/CellularManagement.Infrastructure/DependencyInjection.cs

@ -8,8 +8,6 @@ using CellularManagement.Domain.Entities;
using CellularManagement.Domain.Repositories;
using CellularManagement.Application.Services;
using CellularManagement.Infrastructure.Services;
using CellularManagement.Infrastructure.WebSocket;
using CellularManagement.Infrastructure.Monitoring;
using Scrutor;
namespace CellularManagement.Infrastructure;
@ -119,13 +117,6 @@ public static class DependencyInjection
// 注册缓存服务
services.AddScoped<ICacheService, CacheService>();
// 注册WebSocket相关服务
services.AddSingleton<WebSocketMetrics>();
services.AddSingleton<WebSocketMessagePool>();
services.AddScoped<IWebSocketService, Infrastructure.WebSocket.WebSocketService>();
services.AddSingleton<WebSocketConnectionManager>();
services.AddSingleton<WebSocketMessagePipeline>();
// 自动注册服务
services
.Scan(action =>

196
src/CellularManagement.Infrastructure/Distributed/DistributedWebSocketManager.cs

@ -1,196 +0,0 @@
using System.Net.WebSockets;
using CellularManagement.Domain.Entities;
using CellularManagement.Application.Services;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Caching.Distributed;
using System.Text.Json;
namespace CellularManagement.Infrastructure.Distributed;
public class DistributedWebSocketManager : IWebSocketService
{
private readonly IDistributedCache _distributedCache;
private readonly ILogger<DistributedWebSocketManager> _logger;
private readonly string _nodeId;
private const string CONNECTION_PREFIX = "ws_connection_";
private const string USER_PREFIX = "ws_user_";
private const string NODE_PREFIX = "ws_node_";
public DistributedWebSocketManager(
IDistributedCache distributedCache,
ILogger<DistributedWebSocketManager> logger)
{
_distributedCache = distributedCache;
_logger = logger;
_nodeId = Guid.NewGuid().ToString();
}
public async Task<string> AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket)
{
var connectionId = Guid.NewGuid().ToString();
var connection = WebSocketConnection.Create(connectionId);
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
// 存储连接信息
await _distributedCache.SetStringAsync(
connectionKey,
JsonSerializer.Serialize(connection),
new DistributedCacheEntryOptions
{
SlidingExpiration = TimeSpan.FromMinutes(30)
});
// 更新节点连接列表
await AddConnectionToNodeAsync(connectionId);
_logger.LogInformation("WebSocket connection accepted: {ConnectionId} on node {NodeId}",
connectionId, _nodeId);
return connectionId;
}
public async Task<bool> CloseConnectionAsync(string connectionId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var userKey = $"{USER_PREFIX}{connectionId}";
// 获取连接信息
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
if (connectionJson != null)
{
var connection = JsonSerializer.Deserialize<WebSocketConnection>(connectionJson);
if (connection != null)
{
connection.Close();
await _distributedCache.SetStringAsync(
connectionKey,
JsonSerializer.Serialize(connection));
}
}
// 从节点连接列表中移除
await RemoveConnectionFromNodeAsync(connectionId);
// 清理缓存
await _distributedCache.RemoveAsync(connectionKey);
await _distributedCache.RemoveAsync(userKey);
_logger.LogInformation("WebSocket connection closed: {ConnectionId}", connectionId);
return true;
}
public async Task<bool> SendMessageAsync(string connectionId, byte[] message)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
if (connectionJson != null)
{
var connection = JsonSerializer.Deserialize<WebSocketConnection>(connectionJson);
if (connection?.State == WebSocketState.Open)
{
// 这里需要实现消息发送逻辑
// 可能需要使用消息队列或其他机制
return true;
}
}
return false;
}
public async Task<bool> BroadcastMessageAsync(byte[] message)
{
// 获取所有节点的连接列表
var nodes = await GetAllNodesAsync();
foreach (var node in nodes)
{
// 向每个节点发送广播消息
// 这里需要实现节点间的消息传递机制
}
return true;
}
public async Task<bool> SendMessageToUserAsync(string userId, byte[] message)
{
// 获取用户的所有连接
var connections = await GetUserConnectionsAsync(userId);
foreach (var connection in connections)
{
await SendMessageAsync(connection.ConnectionId, message);
}
return true;
}
public async Task AssociateUserAsync(string connectionId, string userId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var userKey = $"{USER_PREFIX}{connectionId}";
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
if (connectionJson != null)
{
var connection = JsonSerializer.Deserialize<WebSocketConnection>(connectionJson);
if (connection != null)
{
connection.AssociateUser(userId);
await _distributedCache.SetStringAsync(
connectionKey,
JsonSerializer.Serialize(connection));
await _distributedCache.SetStringAsync(userKey, userId);
}
}
}
public async Task<WebSocketConnection?> GetConnectionAsync(string connectionId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
return connectionJson != null
? JsonSerializer.Deserialize<WebSocketConnection>(connectionJson)
: null;
}
public async Task<IEnumerable<WebSocketConnection>> GetUserConnectionsAsync(string userId)
{
var connections = new List<WebSocketConnection>();
// 这里需要实现获取用户所有连接的逻辑
return connections;
}
private async Task AddConnectionToNodeAsync(string connectionId)
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
var connections = await GetNodeConnectionsAsync();
connections.Add(connectionId);
await _distributedCache.SetStringAsync(
nodeKey,
JsonSerializer.Serialize(connections));
}
private async Task RemoveConnectionFromNodeAsync(string connectionId)
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
var connections = await GetNodeConnectionsAsync();
connections.Remove(connectionId);
await _distributedCache.SetStringAsync(
nodeKey,
JsonSerializer.Serialize(connections));
}
private async Task<List<string>> GetNodeConnectionsAsync()
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
var connectionsJson = await _distributedCache.GetStringAsync(nodeKey);
return connectionsJson != null
? JsonSerializer.Deserialize<List<string>>(connectionsJson) ?? new List<string>()
: new List<string>();
}
private async Task<List<string>> GetAllNodesAsync()
{
// 这里需要实现获取所有节点的逻辑
// 可能需要使用服务发现或其他机制
return new List<string>();
}
}

65
src/CellularManagement.Infrastructure/Monitoring/WebSocketMetrics.cs

@ -1,65 +0,0 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using Microsoft.Extensions.Logging;
namespace CellularManagement.Infrastructure.Monitoring;
public class WebSocketMetrics
{
private readonly ILogger<WebSocketMetrics> _logger;
private readonly Meter _meter;
private readonly Counter<int> _connectionsCounter;
private readonly Counter<int> _messagesCounter;
private readonly Histogram<double> _messageLatency;
private readonly Histogram<double> _messageSize;
private readonly Counter<int> _errorsCounter;
public WebSocketMetrics(ILogger<WebSocketMetrics> logger)
{
_logger = logger;
_meter = new Meter("CellularManagement.WebSocket", "1.0.0");
_connectionsCounter = _meter.CreateCounter<int>("websocket.connections", "个", "活跃连接数");
_messagesCounter = _meter.CreateCounter<int>("websocket.messages", "个", "消息处理数");
_messageLatency = _meter.CreateHistogram<double>("websocket.message.latency", "ms", "消息处理延迟");
_messageSize = _meter.CreateHistogram<double>("websocket.message.size", "bytes", "消息大小");
_errorsCounter = _meter.CreateCounter<int>("websocket.errors", "个", "错误数");
}
public void ConnectionEstablished()
{
_connectionsCounter.Add(1);
_logger.LogInformation("WebSocket connection established");
}
public void ConnectionClosed()
{
_connectionsCounter.Add(-1);
_logger.LogInformation("WebSocket connection closed");
}
public void MessageReceived(int size)
{
_messagesCounter.Add(1);
_messageSize.Record(size);
_logger.LogDebug("WebSocket message received, size: {Size} bytes", size);
}
public void MessageProcessed(TimeSpan latency)
{
_messageLatency.Record(latency.TotalMilliseconds);
_logger.LogDebug("WebSocket message processed, latency: {Latency}ms", latency.TotalMilliseconds);
}
public void ErrorOccurred(string errorType)
{
_errorsCounter.Add(1);
_logger.LogError("WebSocket error occurred: {ErrorType}", errorType);
}
public void RecordGauge(string name, double value)
{
var gauge = _meter.CreateObservableGauge<double>($"websocket.{name}", () => value);
_logger.LogDebug("WebSocket gauge recorded: {Name} = {Value}", name, value);
}
}

54
src/CellularManagement.Infrastructure/Pipeline/Handlers/WebSocketMessageValidationHandler.cs

@ -1,54 +0,0 @@
using CellularManagement.Application.Pipeline;
using Microsoft.Extensions.Logging;
namespace CellularManagement.Infrastructure.Pipeline.Handlers;
public class WebSocketMessageValidationHandler : IWebSocketMessageHandler
{
private readonly ILogger<WebSocketMessageValidationHandler> _logger;
public string HandlerName => "MessageValidationHandler";
public int Priority => 100;
public WebSocketMessageValidationHandler(ILogger<WebSocketMessageValidationHandler> logger)
{
_logger = logger;
}
public bool CanHandle(string messageType)
{
return true; // 处理所有消息类型
}
public async Task HandleAsync(WebSocketMessageContext context, CancellationToken cancellationToken)
{
try
{
// 验证消息格式
if (string.IsNullOrEmpty(context.MessageType))
{
throw new ArgumentException("Message type cannot be empty");
}
if (context.Payload == null || context.Payload.Length == 0)
{
throw new ArgumentException("Message payload cannot be empty");
}
// 验证消息大小
if (context.Payload.Length > 1024 * 1024) // 1MB
{
throw new ArgumentException("Message payload too large");
}
_logger.LogInformation("Message validation passed for connection: {ConnectionId}", context.ConnectionId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Message validation failed for connection: {ConnectionId}", context.ConnectionId);
context.Error = ex;
context.IsHandled = true;
throw;
}
}
}

145
src/CellularManagement.Infrastructure/Pipeline/WebSocketMessagePipeline.cs

@ -1,145 +0,0 @@
using System.Threading.Channels;
using CellularManagement.Application.Pipeline;
using Microsoft.Extensions.Logging;
using MediatR;
using CellularManagement.Domain.Events;
namespace CellularManagement.Infrastructure.Pipeline;
public class WebSocketMessagePipeline : IWebSocketMessagePipeline
{
private readonly Channel<WebSocketMessageContext> _messageChannel;
private readonly List<IWebSocketMessageHandler> _handlers = new();
private readonly ILogger<WebSocketMessagePipeline> _logger;
private readonly IMediator _mediator;
private readonly int _batchSize;
private readonly int _maxQueueSize;
public WebSocketMessagePipeline(
ILogger<WebSocketMessagePipeline> logger,
IMediator mediator,
int batchSize = 100,
int maxQueueSize = 10000)
{
_logger = logger;
_mediator = mediator;
_batchSize = batchSize;
_maxQueueSize = maxQueueSize;
var options = new BoundedChannelOptions(maxQueueSize)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = false,
SingleReader = false
};
_messageChannel = Channel.CreateBounded<WebSocketMessageContext>(options);
}
public async Task ProcessMessageAsync(WebSocketMessageContext context, CancellationToken cancellationToken)
{
try
{
await _messageChannel.Writer.WriteAsync(context, cancellationToken);
await _mediator.Publish(new WebSocketMessageReceivedEvent(
context.ConnectionId,
context.MessageType,
context.Payload), cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process message for connection: {ConnectionId}", context.ConnectionId);
throw;
}
}
public void RegisterHandler(IWebSocketMessageHandler handler)
{
_handlers.Add(handler);
_handlers.Sort((a, b) => b.Priority.CompareTo(a.Priority));
}
public void UnregisterHandler(string handlerName)
{
_handlers.RemoveAll(h => h.HandlerName == handlerName);
}
public async Task StartProcessingAsync(CancellationToken cancellationToken)
{
var batch = new List<WebSocketMessageContext>(_batchSize);
try
{
while (!cancellationToken.IsCancellationRequested)
{
batch.Clear();
while (batch.Count < _batchSize &&
await _messageChannel.Reader.WaitToReadAsync(cancellationToken))
{
if (_messageChannel.Reader.TryRead(out var message))
{
batch.Add(message);
}
}
if (batch.Count > 0)
{
await ProcessBatchAsync(batch, cancellationToken);
}
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Message processing pipeline stopped");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in message processing pipeline");
}
}
private async Task ProcessBatchAsync(List<WebSocketMessageContext> batch, CancellationToken cancellationToken)
{
foreach (var message in batch)
{
try
{
await ProcessMessageWithHandlersAsync(message, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message: {MessageId}", message.ConnectionId);
message.Error = ex;
}
}
}
private async Task ProcessMessageWithHandlersAsync(WebSocketMessageContext context, CancellationToken cancellationToken)
{
foreach (var handler in _handlers)
{
if (handler.CanHandle(context.MessageType) && !context.IsHandled)
{
try
{
await handler.HandleAsync(context, cancellationToken);
if (context.IsHandled)
{
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Handler {HandlerName} failed to process message", handler.HandlerName);
throw;
}
}
}
if (!context.IsHandled)
{
_logger.LogWarning("No handler found for message type: {MessageType}", context.MessageType);
}
}
}

59
src/CellularManagement.Infrastructure/Pooling/WebSocketMessagePool.cs

@ -1,59 +0,0 @@
using System.Buffers;
using System.Collections.Concurrent;
using CellularManagement.Application.Pipeline;
namespace CellularManagement.Infrastructure.Pooling;
public class WebSocketMessagePool
{
private readonly ConcurrentBag<WebSocketMessageContext> _pool = new();
private readonly ArrayPool<byte> _arrayPool = ArrayPool<byte>.Shared;
private readonly int _maxPoolSize;
private readonly int _bufferSize;
public WebSocketMessagePool(int maxPoolSize = 1000, int bufferSize = 1024 * 4)
{
_maxPoolSize = maxPoolSize;
_bufferSize = bufferSize;
}
public WebSocketMessageContext Rent()
{
if (_pool.TryTake(out var context))
{
return context;
}
return new WebSocketMessageContext
{
Payload = _arrayPool.Rent(_bufferSize)
};
}
public void Return(WebSocketMessageContext context)
{
if (_pool.Count < _maxPoolSize)
{
context.ConnectionId = string.Empty;
context.MessageType = string.Empty;
context.Properties.Clear();
context.IsHandled = false;
context.Error = null;
if (context.Payload != null)
{
_arrayPool.Return(context.Payload);
context.Payload = _arrayPool.Rent(_bufferSize);
}
_pool.Add(context);
}
else
{
if (context.Payload != null)
{
_arrayPool.Return(context.Payload);
}
}
}
}

129
src/CellularManagement.Infrastructure/WebSocket/WebSocketConnectionManager.cs

@ -1,129 +0,0 @@
using System.Net.WebSockets;
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using CellularManagement.Application.Services;
namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket连接管理器
/// 负责管理WebSocket连接的创建、删除和查询
/// 使用线程安全的并发集合存储连接信息
/// </summary>
public class WebSocketConnectionManager
{
/// <summary>
/// 日志记录器,用于记录连接管理器的操作日志
/// </summary>
private readonly ILogger<WebSocketConnectionManager> _logger;
/// <summary>
/// WebSocket连接字典
/// 键为连接ID,值为WebSocket实例
/// 使用线程安全的ConcurrentDictionary
/// </summary>
private readonly ConcurrentDictionary<string, System.Net.WebSockets.WebSocket> _sockets = new();
/// <summary>
/// 用户连接关联字典
/// 键为连接ID,值为用户ID
/// 使用线程安全的ConcurrentDictionary
/// </summary>
private readonly ConcurrentDictionary<string, string> _userConnections = new();
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志记录器,用于记录连接管理器的操作日志</param>
public WebSocketConnectionManager(ILogger<WebSocketConnectionManager> logger)
{
_logger = logger;
}
/// <summary>
/// 添加新的WebSocket连接
/// 生成唯一的连接ID并存储连接信息
/// </summary>
/// <param name="socket">要添加的WebSocket实例</param>
/// <returns>新创建的连接ID</returns>
public string AddConnection(System.Net.WebSockets.WebSocket socket)
{
var connectionId = Guid.NewGuid().ToString();
_sockets.TryAdd(connectionId, socket);
_logger.LogInformation("WebSocket连接已添加: {ConnectionId}", connectionId);
return connectionId;
}
/// <summary>
/// 移除WebSocket连接
/// 清理连接相关的所有信息
/// </summary>
/// <param name="connectionId">要移除的连接ID</param>
/// <returns>是否成功移除连接</returns>
public bool RemoveConnection(string connectionId)
{
_sockets.TryRemove(connectionId, out _);
_userConnections.TryRemove(connectionId, out _);
_logger.LogInformation("WebSocket连接已移除: {ConnectionId}", connectionId);
return true;
}
/// <summary>
/// 获取指定ID的WebSocket连接
/// </summary>
/// <param name="connectionId">要查询的连接ID</param>
/// <returns>WebSocket实例,如果不存在则返回null</returns>
public System.Net.WebSockets.WebSocket? GetConnection(string connectionId)
{
_sockets.TryGetValue(connectionId, out var socket);
return socket;
}
/// <summary>
/// 获取所有WebSocket连接
/// </summary>
/// <returns>所有WebSocket实例的集合</returns>
public IEnumerable<System.Net.WebSockets.WebSocket> GetAllConnections()
{
return _sockets.Values;
}
/// <summary>
/// 将连接与用户关联
/// 建立连接ID和用户ID的映射关系
/// </summary>
/// <param name="connectionId">要关联的连接ID</param>
/// <param name="userId">要关联的用户ID</param>
public void AssociateUser(string connectionId, string userId)
{
_userConnections.TryAdd(connectionId, userId);
}
/// <summary>
/// 获取连接关联的用户ID
/// </summary>
/// <param name="connectionId">要查询的连接ID</param>
/// <returns>用户ID,如果未关联则返回null</returns>
public string? GetUserId(string connectionId)
{
_userConnections.TryGetValue(connectionId, out var userId);
return userId;
}
/// <summary>
/// 获取用户的所有WebSocket连接
/// 根据用户ID查找所有关联的连接
/// </summary>
/// <param name="userId">要查询的用户ID</param>
/// <returns>用户的所有WebSocket实例的集合</returns>
public IEnumerable<System.Net.WebSockets.WebSocket> GetUserConnections(string userId)
{
var connections = _userConnections
.Where(kvp => kvp.Value == userId)
.Select(kvp => kvp.Key)
.Select(connectionId => GetConnection(connectionId))
.Where(socket => socket != null);
return connections!;
}
}

199
src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePipeline.cs

@ -1,199 +0,0 @@
using System.Threading.Channels;
using System.Buffers;
using Microsoft.Extensions.Logging;
namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket消息处理管道
/// 负责批量处理和分发WebSocket消息
/// 使用Channel实现消息队列,支持批量处理提高性能
/// </summary>
public class WebSocketMessagePipeline
{
/// <summary>
/// 消息通道,用于存储待处理的消息
/// 使用有界通道限制内存使用
/// </summary>
private readonly Channel<WebSocketMessage> _messageChannel;
/// <summary>
/// 日志记录器,用于记录管道的操作日志
/// </summary>
private readonly ILogger<WebSocketMessagePipeline> _logger;
/// <summary>
/// 内存池,用于复用内存缓冲区
/// </summary>
private readonly MemoryPool<byte> _memoryPool;
/// <summary>
/// 批处理大小,每次处理的消息数量
/// </summary>
private readonly int _batchSize;
/// <summary>
/// 最大队列大小,限制通道中可存储的消息数量
/// </summary>
private readonly int _maxQueueSize;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志记录器,用于记录管道的操作日志</param>
/// <param name="batchSize">批处理大小,每次处理的消息数量,默认100</param>
/// <param name="maxQueueSize">最大队列大小,限制通道中可存储的消息数量,默认10000</param>
public WebSocketMessagePipeline(
ILogger<WebSocketMessagePipeline> logger,
int batchSize = 100,
int maxQueueSize = 10000)
{
_logger = logger;
_batchSize = batchSize;
_maxQueueSize = maxQueueSize;
_memoryPool = MemoryPool<byte>.Shared;
// 创建有界通道
var options = new BoundedChannelOptions(maxQueueSize)
{
FullMode = BoundedChannelFullMode.Wait, // 队列满时等待
SingleWriter = false, // 允许多个写入者
SingleReader = false // 允许多个读取者
};
_messageChannel = Channel.CreateBounded<WebSocketMessage>(options);
}
/// <summary>
/// 写入消息到管道
/// 将消息添加到通道中等待处理
/// </summary>
/// <param name="message">要处理的WebSocket消息</param>
/// <returns>是否成功写入消息</returns>
public async ValueTask<bool> WriteMessageAsync(WebSocketMessage message)
{
try
{
await _messageChannel.Writer.WriteAsync(message);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "写入消息到管道失败");
return false;
}
}
/// <summary>
/// 处理消息
/// 循环从通道中读取消息并批量处理
/// </summary>
/// <param name="cancellationToken">取消令牌,用于停止处理</param>
public async Task ProcessMessagesAsync(CancellationToken cancellationToken)
{
var batch = new List<WebSocketMessage>(_batchSize);
try
{
while (!cancellationToken.IsCancellationRequested)
{
batch.Clear();
// 批量读取消息
while (batch.Count < _batchSize &&
await _messageChannel.Reader.WaitToReadAsync(cancellationToken))
{
if (_messageChannel.Reader.TryRead(out var message))
{
batch.Add(message);
}
}
if (batch.Count > 0)
{
await ProcessBatchAsync(batch, cancellationToken);
}
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("消息处理管道已停止");
}
catch (Exception ex)
{
_logger.LogError(ex, "消息处理管道出错");
}
}
/// <summary>
/// 批量处理消息
/// 处理一批消息,逐个处理并记录错误
/// </summary>
/// <param name="batch">要处理的消息批次</param>
/// <param name="cancellationToken">取消令牌,用于停止处理</param>
private async Task ProcessBatchAsync(List<WebSocketMessage> batch, CancellationToken cancellationToken)
{
// 这里实现具体的消息处理逻辑
foreach (var message in batch)
{
try
{
// 处理消息
await ProcessMessageAsync(message, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时出错: {MessageId}", message.MessageId);
}
}
}
/// <summary>
/// 处理单个消息
/// 实现具体的消息处理逻辑
/// </summary>
/// <param name="message">要处理的WebSocket消息</param>
/// <param name="cancellationToken">取消令牌,用于停止处理</param>
private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken)
{
// 实现具体的消息处理逻辑
// 这里可以添加消息验证、转换、业务处理等步骤
}
}
/// <summary>
/// WebSocket消息实体
/// 表示一个WebSocket消息的基本信息
/// </summary>
public class WebSocketMessage
{
/// <summary>
/// 消息ID
/// 唯一标识一条消息
/// </summary>
public string MessageId { get; set; } = Guid.NewGuid().ToString();
/// <summary>
/// 连接ID
/// 标识消息来自哪个WebSocket连接
/// </summary>
public string ConnectionId { get; set; } = string.Empty;
/// <summary>
/// 消息类型
/// 用于标识消息的类型,可用于路由和处理
/// </summary>
public string MessageType { get; set; } = string.Empty;
/// <summary>
/// 消息内容
/// 消息的实际数据
/// </summary>
public byte[] Payload { get; set; } = Array.Empty<byte>();
/// <summary>
/// 时间戳
/// 消息创建的时间
/// </summary>
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}

72
src/CellularManagement.Infrastructure/WebSocket/WebSocketMessagePool.cs

@ -1,72 +0,0 @@
using System.Collections.Concurrent;
using CellularManagement.Application.Pipeline;
namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket消息池
/// 负责管理WebSocket消息上下文的复用,减少内存分配
/// 使用对象池模式提高性能
/// </summary>
public class WebSocketMessagePool
{
/// <summary>
/// 消息上下文池
/// 使用线程安全的ConcurrentBag存储可复用的消息上下文
/// </summary>
private readonly ConcurrentBag<WebSocketMessageContext> _pool = new();
/// <summary>
/// 最大池大小
/// 限制池中可存储的消息上下文数量
/// </summary>
private readonly int _maxPoolSize;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="maxPoolSize">最大池大小,限制池中可存储的消息上下文数量,默认1000</param>
public WebSocketMessagePool(int maxPoolSize = 1000)
{
_maxPoolSize = maxPoolSize;
}
/// <summary>
/// 从池中获取消息上下文
/// 如果池中有可用的上下文则复用,否则创建新的
/// </summary>
/// <returns>可用的消息上下文</returns>
public WebSocketMessageContext Rent()
{
// 尝试从池中获取可用的上下文
if (_pool.TryTake(out var context))
{
return context;
}
// 如果池中没有可用的上下文,创建新的
return new WebSocketMessageContext();
}
/// <summary>
/// 将消息上下文返回到池中
/// 重置上下文状态并放回池中,如果池已满则丢弃
/// </summary>
/// <param name="context">要返回的消息上下文</param>
public void Return(WebSocketMessageContext context)
{
// 如果池未满,重置上下文并放回池中
if (_pool.Count < _maxPoolSize)
{
// 重置上下文状态
context.ConnectionId = string.Empty;
context.MessageType = string.Empty;
context.Payload = Array.Empty<byte>();
context.Properties.Clear();
context.IsHandled = false;
context.Error = null;
_pool.Add(context);
}
}
}

134
src/CellularManagement.Infrastructure/WebSocket/WebSocketMiddleware.cs

@ -1,134 +0,0 @@
using System.Net.WebSockets;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using CellularManagement.Application.Services;
namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket中间件
/// 负责处理WebSocket请求的生命周期和消息处理
/// 作为ASP.NET Core中间件,拦截WebSocket请求并管理连接
/// </summary>
public class WebSocketMiddleware
{
/// <summary>
/// 请求委托,用于继续处理管道中的下一个中间件
/// </summary>
private readonly RequestDelegate _next;
/// <summary>
/// 日志记录器,用于记录中间件的操作日志
/// </summary>
private readonly ILogger<WebSocketMiddleware> _logger;
/// <summary>
/// 服务提供者,用于创建服务作用域和获取所需服务
/// </summary>
private readonly IServiceProvider _serviceProvider;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="next">请求委托,用于继续处理管道中的下一个中间件</param>
/// <param name="logger">日志记录器,用于记录中间件的操作日志</param>
/// <param name="serviceProvider">服务提供者,用于创建服务作用域和获取所需服务</param>
public WebSocketMiddleware(
RequestDelegate next,
ILogger<WebSocketMiddleware> logger,
IServiceProvider serviceProvider)
{
_next = next;
_logger = logger;
_serviceProvider = serviceProvider;
}
/// <summary>
/// 处理HTTP请求
/// 检查是否为WebSocket请求,如果是则处理WebSocket连接
/// </summary>
/// <param name="context">HTTP上下文,包含请求和响应信息</param>
public async Task InvokeAsync(HttpContext context)
{
// 检查是否为WebSocket请求
if (context.WebSockets.IsWebSocketRequest)
{
// 创建服务作用域,确保服务实例的生命周期正确
using var scope = _serviceProvider.CreateScope();
var webSocketService = scope.ServiceProvider.GetRequiredService<IWebSocketService>();
// 接受WebSocket连接
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var connectionId = await webSocketService.AcceptConnectionAsync(webSocket);
try
{
// 处理WebSocket连接
await HandleWebSocketConnection(webSocket, connectionId, webSocketService);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理WebSocket连接时出错: {ConnectionId}", connectionId);
}
finally
{
// 确保连接被正确关闭
await webSocketService.CloseConnectionAsync(connectionId);
}
}
else
{
// 如果不是WebSocket请求,继续处理管道
await _next(context);
}
}
/// <summary>
/// 处理WebSocket连接
/// 循环接收消息并处理,直到连接关闭
/// </summary>
/// <param name="webSocket">WebSocket实例,用于接收和发送消息</param>
/// <param name="connectionId">连接ID,用于标识连接</param>
/// <param name="webSocketService">WebSocket服务,用于处理消息</param>
private async Task HandleWebSocketConnection(
System.Net.WebSockets.WebSocket webSocket,
string connectionId,
IWebSocketService webSocketService)
{
// 创建接收缓冲区,大小为4KB
var buffer = new byte[1024 * 4];
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
// 循环接收消息,直到收到关闭消息
while (!receiveResult.CloseStatus.HasValue)
{
try
{
// 处理接收到的消息
var message = buffer.Take(receiveResult.Count).ToArray();
await webSocketService.SendMessageAsync(connectionId, message);
// 继续接收下一条消息
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理WebSocket消息时出错: {ConnectionId}", connectionId);
break;
}
}
// 如果收到关闭消息,则关闭连接
if (receiveResult.CloseStatus.HasValue)
{
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
}
}
}

420
src/CellularManagement.Infrastructure/WebSocket/WebSocketService.cs

@ -1,420 +0,0 @@
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using CellularManagement.Application.Services;
using CellularManagement.Domain.Entities;
using CellularManagement.Infrastructure.Monitoring;
using CellularManagement.Infrastructure.Pooling;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Caching.Memory;
namespace CellularManagement.Infrastructure.WebSocket;
/// <summary>
/// WebSocket服务实现类
/// 负责管理WebSocket连接的生命周期、消息发送和接收
/// 使用分布式缓存存储连接信息,本地缓存存储WebSocket实例
/// </summary>
public class WebSocketService : IWebSocketService
{
/// <summary>
/// 日志记录器,用于记录WebSocket服务的操作日志
/// </summary>
private readonly ILogger<WebSocketService> _logger;
/// <summary>
/// WebSocket指标监控,用于记录连接、消息等指标
/// </summary>
private readonly WebSocketMetrics _metrics;
/// <summary>
/// WebSocket消息池,用于复用消息上下文对象
/// </summary>
private readonly WebSocketMessagePool _messagePool;
/// <summary>
/// 分布式缓存,用于存储连接信息
/// </summary>
private readonly IDistributedCache _distributedCache;
/// <summary>
/// 本地缓存服务,用于存储WebSocket实例
/// </summary>
private readonly ICacheService _cacheService;
/// <summary>
/// 当前节点ID,用于标识服务实例
/// </summary>
private readonly string _nodeId;
/// <summary>
/// WebSocket连接信息在缓存中的键前缀
/// </summary>
private const string CONNECTION_PREFIX = "ws_connection_";
/// <summary>
/// WebSocket实例在缓存中的键前缀
/// </summary>
private const string WEBSOCKET_PREFIX = "ws_socket_";
/// <summary>
/// 用户关联信息在缓存中的键前缀
/// </summary>
private const string USER_PREFIX = "ws_user_";
/// <summary>
/// 节点信息在缓存中的键前缀
/// </summary>
private const string NODE_PREFIX = "ws_node_";
/// <summary>
/// 构造函数
/// </summary>
/// <param name="logger">日志记录器,用于记录服务操作日志</param>
/// <param name="metrics">WebSocket指标监控,用于记录连接和消息指标</param>
/// <param name="messagePool">消息池,用于复用消息上下文对象</param>
/// <param name="distributedCache">分布式缓存,用于存储连接信息</param>
/// <param name="cacheService">本地缓存服务,用于存储WebSocket实例</param>
public WebSocketService(
ILogger<WebSocketService> logger,
WebSocketMetrics metrics,
WebSocketMessagePool messagePool,
IDistributedCache distributedCache,
ICacheService cacheService)
{
_logger = logger;
_metrics = metrics;
_messagePool = messagePool;
_distributedCache = distributedCache;
_cacheService = cacheService;
_nodeId = Guid.NewGuid().ToString(); // 生成唯一的节点ID
}
/// <summary>
/// 接受新的WebSocket连接
/// 创建连接信息并存储到缓存中
/// </summary>
/// <param name="webSocket">WebSocket实例</param>
/// <returns>新创建的连接ID</returns>
public async Task<string> AcceptConnectionAsync(System.Net.WebSockets.WebSocket webSocket)
{
var connectionId = Guid.NewGuid().ToString();
var connection = WebSocketConnection.Create(connectionId);
// 生成缓存键
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}";
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
// 将连接信息存储到分布式缓存
await _distributedCache.SetStringAsync(
connectionKey,
JsonSerializer.Serialize(connection),
new DistributedCacheEntryOptions
{
SlidingExpiration = TimeSpan.FromMinutes(30) // 设置30分钟滑动过期
});
// 将WebSocket实例存储到本地缓存
_cacheService.Set(webSocketKey, webSocket, new MemoryCacheEntryOptions
{
SlidingExpiration = TimeSpan.FromMinutes(30)
});
// 将连接添加到当前节点
await AddConnectionToNodeAsync(connectionId);
_metrics.ConnectionEstablished(); // 记录连接建立指标
_logger.LogInformation("WebSocket连接已建立: {ConnectionId} 在节点 {NodeId}",
connectionId, _nodeId);
return connectionId;
}
/// <summary>
/// 关闭WebSocket连接
/// 清理连接相关的所有缓存信息
/// </summary>
/// <param name="connectionId">要关闭的连接ID</param>
/// <returns>是否成功关闭连接</returns>
public async Task<bool> CloseConnectionAsync(string connectionId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}";
var userKey = $"{USER_PREFIX}{connectionId}";
// 获取连接信息
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket))
{
try
{
if (connectionJson != null)
{
var connection = JsonSerializer.Deserialize<WebSocketConnection>(connectionJson);
if (connection != null && webSocket != null)
{
try
{
// 如果WebSocket处于打开状态,则正常关闭
if (webSocket.State == WebSocketState.Open || webSocket.State == WebSocketState.CloseReceived)
{
await webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"服务器关闭连接",
CancellationToken.None);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "关闭WebSocket连接时出错: {ConnectionId}", connectionId);
}
connection.Close();
await _distributedCache.SetStringAsync(connectionKey, JsonSerializer.Serialize(connection));
}
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "反序列化WebSocketConnection失败: {ConnectionId}, 继续清理", connectionId);
}
}
// 清理连接相关的所有缓存
await RemoveConnectionFromNodeAsync(connectionId);
await _distributedCache.RemoveAsync(connectionKey);
_cacheService.Remove(webSocketKey);
await _distributedCache.RemoveAsync(userKey);
_metrics.ConnectionClosed(); // 记录连接关闭指标
_logger.LogInformation("WebSocket连接已关闭: {ConnectionId}", connectionId);
return true;
}
/// <summary>
/// 向指定连接发送消息
/// </summary>
/// <param name="connectionId">目标连接ID</param>
/// <param name="message">要发送的消息内容</param>
/// <returns>是否成功发送消息</returns>
public async Task<bool> SendMessageAsync(string connectionId, byte[] message)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var webSocketKey = $"{WEBSOCKET_PREFIX}{connectionId}";
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
if (_cacheService.TryGetValue(webSocketKey, out System.Net.WebSockets.WebSocket? webSocket))
{
try
{
if (connectionJson != null)
{
var connection = JsonSerializer.Deserialize<WebSocketConnection>(connectionJson);
if (connection?.State == WebSocketState.Open && webSocket != null)
{
try
{
await webSocket.SendAsync(
new ArraySegment<byte>(message),
WebSocketMessageType.Text,
true,
CancellationToken.None);
_metrics.MessageProcessed(TimeSpan.Zero); // 记录消息处理指标
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "发送消息到连接时出错: {ConnectionId}", connectionId);
_metrics.ErrorOccurred("SendMessage"); // 记录错误指标
}
}
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "反序列化WebSocketConnection失败: {ConnectionId}, 跳过消息发送", connectionId);
}
}
return false;
}
/// <summary>
/// 广播消息到所有连接
/// 遍历所有节点上的所有连接并发送消息
/// </summary>
/// <param name="message">要广播的消息内容</param>
/// <returns>是否所有消息都发送成功</returns>
public async Task<bool> BroadcastMessageAsync(byte[] message)
{
var nodes = await GetAllNodesAsync();
var success = true;
// 遍历所有节点上的所有连接
foreach (var node in nodes)
{
var nodeKey = $"{NODE_PREFIX}{node}";
var connectionsJson = await _distributedCache.GetStringAsync(nodeKey);
if (connectionsJson != null)
{
var connections = JsonSerializer.Deserialize<List<string>>(connectionsJson);
foreach (var connectionId in connections)
{
if (!await SendMessageAsync(connectionId, message))
{
success = false;
}
}
}
}
return success;
}
/// <summary>
/// 向指定用户的所有连接发送消息
/// </summary>
/// <param name="userId">目标用户ID</param>
/// <param name="message">要发送的消息内容</param>
/// <returns>是否所有消息都发送成功</returns>
public async Task<bool> SendMessageToUserAsync(string userId, byte[] message)
{
var userConnections = await GetUserConnectionsAsync(userId);
var success = true;
// 向用户的所有连接发送消息
foreach (var connection in userConnections)
{
if (!await SendMessageAsync(connection.ConnectionId, message))
{
success = false;
}
}
return success;
}
/// <summary>
/// 将连接与用户关联
/// 更新连接信息和用户关联缓存
/// </summary>
/// <param name="connectionId">连接ID</param>
/// <param name="userId">用户ID</param>
public async Task AssociateUserAsync(string connectionId, string userId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var userKey = $"{USER_PREFIX}{connectionId}";
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
if (connectionJson != null)
{
var connection = JsonSerializer.Deserialize<WebSocketConnection>(connectionJson);
if (connection != null)
{
connection.AssociateUser(userId);
await _distributedCache.SetStringAsync(connectionKey, JsonSerializer.Serialize(connection));
await _distributedCache.SetStringAsync(userKey, userId);
}
}
}
/// <summary>
/// 获取指定连接的信息
/// </summary>
/// <param name="connectionId">连接ID</param>
/// <returns>连接信息,如果不存在则返回null</returns>
public async Task<WebSocketConnection?> GetConnectionAsync(string connectionId)
{
var connectionKey = $"{CONNECTION_PREFIX}{connectionId}";
var connectionJson = await _distributedCache.GetStringAsync(connectionKey);
return connectionJson != null
? JsonSerializer.Deserialize<WebSocketConnection>(connectionJson)
: null;
}
/// <summary>
/// 获取指定用户的所有连接
/// </summary>
/// <param name="userId">用户ID</param>
/// <returns>用户的所有连接信息</returns>
public async Task<IEnumerable<WebSocketConnection>> GetUserConnectionsAsync(string userId)
{
var connections = new List<WebSocketConnection>();
var nodes = await GetAllNodesAsync();
// 遍历所有节点,查找用户的连接
foreach (var node in nodes)
{
var nodeKey = $"{NODE_PREFIX}{node}";
var connectionsJson = await _distributedCache.GetStringAsync(nodeKey);
if (connectionsJson != null)
{
var connectionIds = JsonSerializer.Deserialize<List<string>>(connectionsJson);
foreach (var connectionId in connectionIds)
{
var connection = await GetConnectionAsync(connectionId);
if (connection?.UserId == userId)
{
connections.Add(connection);
}
}
}
}
return connections;
}
/// <summary>
/// 将连接添加到当前节点
/// </summary>
/// <param name="connectionId">要添加的连接ID</param>
private async Task AddConnectionToNodeAsync(string connectionId)
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
var connections = await GetNodeConnectionsAsync();
connections.Add(connectionId);
await _distributedCache.SetStringAsync(
nodeKey,
JsonSerializer.Serialize(connections));
}
/// <summary>
/// 从当前节点移除连接
/// </summary>
/// <param name="connectionId">要移除的连接ID</param>
private async Task RemoveConnectionFromNodeAsync(string connectionId)
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
var connections = await GetNodeConnectionsAsync();
connections.Remove(connectionId);
await _distributedCache.SetStringAsync(
nodeKey,
JsonSerializer.Serialize(connections));
}
/// <summary>
/// 获取当前节点的所有连接
/// </summary>
/// <returns>当前节点的所有连接ID列表</returns>
private async Task<List<string>> GetNodeConnectionsAsync()
{
var nodeKey = $"{NODE_PREFIX}{_nodeId}";
var connectionsJson = await _distributedCache.GetStringAsync(nodeKey);
return connectionsJson != null
? JsonSerializer.Deserialize<List<string>>(connectionsJson) ?? new List<string>()
: new List<string>();
}
/// <summary>
/// 获取所有节点ID
/// 目前仅返回当前节点ID,后续需要实现服务发现机制
/// </summary>
/// <returns>所有节点ID列表</returns>
private async Task<List<string>> GetAllNodesAsync()
{
// 这里需要实现服务发现机制
// 可以使用Redis的Pub/Sub或其他服务发现机制
return new List<string> { _nodeId };
}
}

13
src/CellularManagement.WebAPI/CellularManagement.WebAPI.http

@ -1,5 +1,4 @@
@CellularManagement.WebAPI_HostAddress = http://localhost:5202
@WebSocket_HostAddress = ws://localhost:5202
### 获取天气预测
GET {{CellularManagement.WebAPI_HostAddress}}/weatherforecast/
@ -19,18 +18,6 @@ Content-Type: application/json
GET {{CellularManagement.WebAPI_HostAddress}}/api/users/me
Authorization: Bearer {{login.response.body.token}}
### WebSocket连接测试
# 使用WebSocket客户端工具连接
# 连接地址: {{WebSocket_HostAddress}}/ws
### 发送WebSocket消息
# 使用WebSocket客户端工具发送消息
# 消息格式: {"type": "message", "content": "Hello, WebSocket!"}
### 获取系统状态
GET {{CellularManagement.WebAPI_HostAddress}}/api/system/status
Authorization: Bearer {{login.response.body.token}}
### 获取WebSocket连接统计
GET {{CellularManagement.WebAPI_HostAddress}}/api/websocket/stats
Authorization: Bearer {{login.response.body.token}}

26
src/CellularManagement.WebAPI/Program.cs

@ -6,10 +6,6 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using CellularManagement.Infrastructure.Configurations;
using CellularManagement.Application;
using CellularManagement.Infrastructure.WebSocket;
using CellularManagement.Infrastructure.Monitoring;
using CellularManagement.Infrastructure.Pooling;
using CellularManagement.Application.Services;
using Microsoft.Extensions.Options;
// 创建 Web 应用程序构建器
@ -40,7 +36,10 @@ builder.Services.AddSwaggerGen();
// 注册 MediatR 服务
// 从 WebAPI 程序集加载处理器,用于处理 Web 层特定的命令和查询
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
builder.Services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly);
});
// 添加 CORS 服务
// 配置跨域资源共享支持
@ -49,14 +48,6 @@ builder.Services.AddCors();
// 配置认证服务
// 从配置文件中读取认证相关配置
builder.Services.Configure<AuthConfiguration>(builder.Configuration.GetSection("Auth"));
builder.Services.Configure<WebSocketConfiguration>(builder.Configuration.GetSection(WebSocketConfiguration.SectionName));
// 注册WebSocket相关服务
builder.Services.AddSingleton<WebSocketMetrics>();
builder.Services.AddSingleton<CellularManagement.Infrastructure.Pooling.WebSocketMessagePool>();
builder.Services.AddSingleton<WebSocketConnectionManager>();
builder.Services.AddSingleton<WebSocketMessagePipeline>();
builder.Services.AddScoped<IWebSocketService, CellularManagement.Infrastructure.WebSocket.WebSocketService>();
// 添加静态文件服务
builder.Services.AddDirectoryBrowser();
@ -86,15 +77,6 @@ app.UseCors(x => x.AllowAnyHeader().SetIsOriginAllowed(p => true).AllowAnyMethod
// 处理用户认证和授权
app.UseAuthorization();
// 配置 WebSocket 中间件
var webSocketConfig = app.Services.GetRequiredService<IOptions<WebSocketConfiguration>>().Value;
app.UseWebSockets(new Microsoft.AspNetCore.Builder.WebSocketOptions
{
KeepAliveInterval = TimeSpan.FromMinutes(2)
});
app.UseMiddleware<WebSocketMiddleware>();
// 配置静态文件中间件
app.UseStaticFiles();
app.UseDirectoryBrowser();

5
src/CellularManagement.WebAPI/appsettings.README.md

@ -25,10 +25,6 @@
- `KeyRotationDays`: 密钥轮换周期(天)
- `MinKeyLength`: 最小密钥长度
## WebSocket 配置
- `Port`: WebSocket 服务端口
- `Path`: WebSocket 连接路径
## 认证配置 (Auth)
- `MaxLoginAttempts`: 最大登录尝试次数
- `LoginAttemptsWindowMinutes`: 登录尝试窗口时间(分钟)
@ -46,7 +42,6 @@
- `ASPNETCORE_ENVIRONMENT` - 设置运行环境(Development/Production)
- `DatabaseOptions__DefaultConnection` - 数据库连接字符串
- `JwtOptions__SecretKey` - JWT密钥
- `WebSocket__Port` - WebSocket端口
- `Auth__MaxLoginAttempts` - 最大登录尝试次数
## 配置优先级

4
src/CellularManagement.WebAPI/appsettings.json

@ -34,9 +34,5 @@
"DefaultUserRole": "User",
"AccessTokenExpirationMinutes": 60,
"RefreshTokenExpirationDays": 7
},
"WebSocket": {
"Port": 5202,
"Path": "/ws"
}
}

26
src/CellularManagement.WebAPI/wwwroot/websocket.html

@ -56,7 +56,6 @@
<div class="flex items-center space-x-2 mb-3">
<input type="text"
id="wsAddress"
value="ws://localhost:5202/ws"
placeholder="输入WebSocket地址"
class="flex-1 px-4 py-2 border border-gray-300 dark:border-gray-600
rounded-lg focus:outline-none focus:ring-2 focus:ring-indigo-500
@ -415,8 +414,29 @@
messageCountElement.textContent = '0';
});
// 初始化Three.js
initThreeJS();
// 获取当前页面的协议
function getWebSocketProtocol() {
return window.location.protocol === 'https:' ? 'wss:' : 'ws:';
}
// 获取默认的 WebSocket 地址
function getDefaultWebSocketAddress() {
const protocol = getWebSocketProtocol();
const host = window.location.hostname;
const port = window.location.protocol === 'https:' ? '7268' : '5202';
return `${protocol}//${host}:${port}/ws`;
}
// 初始化 WebSocket 地址输入框
function initWebSocketAddress() {
wsAddressInput.value = getDefaultWebSocketAddress();
}
// 在页面加载时初始化
document.addEventListener('DOMContentLoaded', function() {
initWebSocketAddress();
initThreeJS();
});
</script>
</body>
</html>
Loading…
Cancel
Save