Browse Source

feat: 添加 WebSocket 消息处理相关功能

norm
hyh 2 months ago
parent
commit
67076199b0
  1. 4
      src/CellularManagement.WebSocket/CellularManagement.WebSocket.csproj
  2. 3
      src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs
  3. 159
      src/CellularManagement.WebSocket/Handlers/BaseMessageHandler.cs
  4. 227
      src/CellularManagement.WebSocket/Handlers/HandlerManager.cs
  5. 40
      src/CellularManagement.WebSocket/Handlers/WebSocketMessageHandlerAdapter.cs
  6. 100
      src/CellularManagement.WebSocket/Models/HandlerEvents.cs
  7. 122
      src/CellularManagement.WebSocket/Models/HandlerMetrics.cs
  8. 79
      src/CellularManagement.WebSocket/Models/IMessageHandler.cs
  9. 19
      src/CellularManagement.WebSocket/Models/MessageStats.cs
  10. 43
      src/CellularManagement.WebSocket/Models/WebSocketOptions.cs
  11. 106
      src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs
  12. 152
      src/CellularManagement.WebSocket/Services/OutgoingMessageProcessor.cs
  13. 98
      src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

4
src/CellularManagement.WebSocket/CellularManagement.WebSocket.csproj

@ -18,4 +18,8 @@
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
<Folder Include="Message\" />
</ItemGroup>
</Project>

3
src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs

@ -1,3 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Threading.Channels;
@ -51,7 +52,7 @@ public class WebSocketConnectionManager : IDisposable
_logger.LogInformation("创建消息队列完成,入站队列大小:{IncomingSize},出站队列大小:{OutgoingSize}",
_options.MessageQueueSize, _options.MessageQueueSize);
_heartbeatTimer = new Timer(CheckConnections, null, _options.HeartbeatInterval, _options.HeartbeatInterval);
//_heartbeatTimer = new Timer(CheckConnections, null, _options.HeartbeatInterval, _options.HeartbeatInterval);
_logger.LogInformation("心跳检测定时器已启动,间隔:{Interval}秒", _options.HeartbeatInterval.TotalSeconds);
}

159
src/CellularManagement.WebSocket/Handlers/BaseMessageHandler.cs

@ -0,0 +1,159 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Handlers
{
/// <summary>
/// 基础消息处理器
/// </summary>
public abstract class BaseMessageHandler : IMessageHandler
{
protected readonly ILogger _logger;
protected readonly HandlerMetrics _metrics;
protected readonly Stopwatch _stopwatch;
private readonly Timer _healthCheckTimer;
protected BaseMessageHandler(ILogger logger)
{
_logger = logger;
_metrics = new HandlerMetrics
{
HandlerName = GetType().Name,
HealthStatus = HandlerHealthStatus.Healthy,
Configuration = new HandlerConfiguration()
};
_stopwatch = new Stopwatch();
_healthCheckTimer = new Timer(PerformHealthCheck, null,
TimeSpan.Zero,
TimeSpan.FromSeconds(_metrics.Configuration.HealthCheckInterval));
}
public abstract int Priority { get; }
public abstract string Name { get; }
public async Task<WebSocketMessage> HandleAsync(WebSocketMessage message, CancellationToken cancellationToken)
{
if (!_metrics.Configuration.IsEnabled)
{
_logger.LogWarning("处理器已禁用,处理器:{HandlerName},连接ID:{ConnectionId}",
Name, message.ConnectionId);
throw new InvalidOperationException($"处理器 {Name} 已禁用");
}
_metrics.TotalMessages++;
_stopwatch.Restart();
try
{
var result = await ProcessMessageAsync(message, cancellationToken);
_metrics.SuccessMessages++;
return result;
}
catch (Exception ex)
{
_metrics.FailedMessages++;
_metrics.HealthCheckFailureCount++;
_logger.LogError(ex, "消息处理失败,处理器:{HandlerName},连接ID:{ConnectionId}",
Name, message.ConnectionId);
throw;
}
finally
{
_stopwatch.Stop();
_metrics.LastProcessedTime = DateTime.UtcNow;
UpdateAverageProcessingTime();
}
}
protected abstract Task<WebSocketMessage> ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken);
public HandlerStatistics GetStatistics()
{
return new HandlerStatistics
{
TotalMessages = _metrics.TotalMessages,
SuccessMessages = _metrics.SuccessMessages,
FailedMessages = _metrics.FailedMessages,
AverageProcessingTime = _metrics.AverageProcessingTime,
LastProcessedTime = _metrics.LastProcessedTime
};
}
/// <summary>
/// 获取处理器指标
/// </summary>
public HandlerMetrics GetMetrics()
{
return _metrics;
}
/// <summary>
/// 更新处理器配置
/// </summary>
public void UpdateConfiguration(HandlerConfiguration configuration)
{
_metrics.Configuration = configuration;
_healthCheckTimer.Change(
TimeSpan.Zero,
TimeSpan.FromSeconds(configuration.HealthCheckInterval));
_logger.LogInformation("更新处理器配置,处理器:{HandlerName},配置:{Configuration}",
Name, System.Text.Json.JsonSerializer.Serialize(configuration));
}
private void UpdateAverageProcessingTime()
{
if (_metrics.TotalMessages == 1)
{
_metrics.AverageProcessingTime = _stopwatch.ElapsedMilliseconds;
}
else
{
_metrics.AverageProcessingTime = (_metrics.AverageProcessingTime * (_metrics.TotalMessages - 1) +
_stopwatch.ElapsedMilliseconds) / _metrics.TotalMessages;
}
}
private void PerformHealthCheck(object state)
{
try
{
_metrics.LastHealthCheckTime = DateTime.UtcNow;
// 检查失败次数
if (_metrics.HealthCheckFailureCount >= _metrics.Configuration.MaxFailureThreshold)
{
_metrics.HealthStatus = HandlerHealthStatus.Unhealthy;
_logger.LogWarning("处理器健康检查失败,处理器:{HandlerName},失败次数:{FailureCount}",
Name, _metrics.HealthCheckFailureCount);
return;
}
// 检查平均处理时间
if (_metrics.AverageProcessingTime > _metrics.Configuration.AverageProcessingTimeWarningThreshold)
{
_metrics.HealthStatus = HandlerHealthStatus.Warning;
_logger.LogWarning("处理器性能警告,处理器:{HandlerName},平均处理时间:{AverageTime}ms",
Name, _metrics.AverageProcessingTime);
return;
}
_metrics.HealthStatus = HandlerHealthStatus.Healthy;
_metrics.HealthCheckFailureCount = 0;
_logger.LogDebug("处理器健康检查通过,处理器:{HandlerName}", Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理器健康检查异常,处理器:{HandlerName}", Name);
}
}
public void Dispose()
{
_healthCheckTimer?.Dispose();
}
}
}

227
src/CellularManagement.WebSocket/Handlers/HandlerManager.cs

@ -0,0 +1,227 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Handlers
{
/// <summary>
/// 处理器管理器
/// </summary>
public class HandlerManager : IDisposable
{
private readonly ILogger<HandlerManager> _logger;
private readonly ConcurrentDictionary<string, List<IMessageHandler>> _handlers;
private readonly ConcurrentDictionary<string, HandlerMetrics> _handlerMetrics;
private readonly ConcurrentDictionary<string, SemaphoreSlim> _handlerLocks;
private readonly ConcurrentDictionary<string, int> _handlerPoolSizes;
private readonly Timer _performanceSamplingTimer;
private readonly object _handlersLock = new object();
/// <summary>
/// 处理器状态变更事件
/// </summary>
public event EventHandler<HandlerStatusChangedEventArgs> HandlerStatusChanged;
/// <summary>
/// 处理器性能采样事件
/// </summary>
public event EventHandler<HandlerPerformanceSampleEventArgs> PerformanceSampled;
public HandlerManager(ILogger<HandlerManager> logger)
{
_logger = logger;
_handlers = new ConcurrentDictionary<string, List<IMessageHandler>>();
_handlerMetrics = new ConcurrentDictionary<string, HandlerMetrics>();
_handlerLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
_handlerPoolSizes = new ConcurrentDictionary<string, int>();
_performanceSamplingTimer = new Timer(CollectPerformanceSamples, null,
TimeSpan.Zero, TimeSpan.FromSeconds(5));
}
/// <summary>
/// 注册处理器
/// </summary>
public async Task RegisterHandlerAsync(string messageType, IMessageHandler handler, int poolSize = 1)
{
var handlerLock = _handlerLocks.GetOrAdd(handler.Name, _ => new SemaphoreSlim(1, 1));
await handlerLock.WaitAsync();
try
{
_logger.LogInformation("注册处理器,消息类型:{MessageType},处理器:{HandlerName},池大小:{PoolSize}",
messageType, handler.Name, poolSize);
if (!_handlers.ContainsKey(messageType))
{
_handlers[messageType] = new List<IMessageHandler>();
}
// 检查处理器是否已存在
if (_handlers[messageType].Any(h => h.Name == handler.Name))
{
throw new InvalidOperationException($"处理器 {handler.Name} 已注册");
}
_handlers[messageType].Add(handler);
_handlers[messageType].Sort((a, b) => a.Priority.CompareTo(b.Priority));
_handlerMetrics[handler.Name] = handler.GetMetrics();
_handlerPoolSizes[handler.Name] = poolSize;
_logger.LogDebug("处理器注册完成,当前处理器数量:{HandlerCount}", _handlers[messageType].Count);
}
finally
{
handlerLock.Release();
}
}
/// <summary>
/// 移除处理器
/// </summary>
public async Task RemoveHandlerAsync(string messageType, string handlerName)
{
var handlerLock = _handlerLocks.GetOrAdd(handlerName, _ => new SemaphoreSlim(1, 1));
await handlerLock.WaitAsync();
try
{
_logger.LogInformation("移除处理器,消息类型:{MessageType},处理器:{HandlerName}",
messageType, handlerName);
if (_handlers.TryGetValue(messageType, out var handlers))
{
var handler = handlers.FirstOrDefault(h => h.Name == handlerName);
if (handler != null)
{
handlers.Remove(handler);
_handlerMetrics.TryRemove(handlerName, out _);
_handlerPoolSizes.TryRemove(handlerName, out _);
handler.Dispose();
}
}
}
finally
{
handlerLock.Release();
}
}
/// <summary>
/// 获取处理器
/// </summary>
public async Task<IMessageHandler> GetHandlerAsync(string messageType, string handlerName)
{
var handlerLock = _handlerLocks.GetOrAdd(handlerName, _ => new SemaphoreSlim(1, 1));
await handlerLock.WaitAsync();
try
{
if (_handlers.TryGetValue(messageType, out var handlers))
{
return handlers.FirstOrDefault(h => h.Name == handlerName);
}
return null;
}
finally
{
handlerLock.Release();
}
}
/// <summary>
/// 获取所有处理器
/// </summary>
public async Task<List<IMessageHandler>> GetAllHandlersAsync(string messageType)
{
if (_handlers.TryGetValue(messageType, out var handlers))
{
return new List<IMessageHandler>(handlers);
}
return new List<IMessageHandler>();
}
/// <summary>
/// 更新处理器配置
/// </summary>
public async Task UpdateHandlerConfigurationAsync(string handlerName, HandlerConfiguration configuration)
{
var handlerLock = _handlerLocks.GetOrAdd(handlerName, _ => new SemaphoreSlim(1, 1));
await handlerLock.WaitAsync();
try
{
var handler = _handlers.Values
.SelectMany(h => h)
.FirstOrDefault(h => h.Name == handlerName);
if (handler == null)
{
throw new InvalidOperationException($"未找到处理器:{handlerName}");
}
handler.UpdateConfiguration(configuration);
_handlerMetrics[handlerName] = handler.GetMetrics();
_logger.LogInformation("更新处理器配置,处理器:{HandlerName},配置:{Configuration}",
handlerName, System.Text.Json.JsonSerializer.Serialize(configuration));
}
finally
{
handlerLock.Release();
}
}
/// <summary>
/// 获取处理器指标
/// </summary>
public Dictionary<string, HandlerMetrics> GetHandlerMetrics()
{
return _handlerMetrics.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
}
/// <summary>
/// 更新处理器状态
/// </summary>
public void UpdateHandlerStatus(string handlerName, HandlerHealthStatus newStatus, string reason)
{
if (_handlerMetrics.TryGetValue(handlerName, out var metrics))
{
var oldStatus = metrics.HealthStatus;
metrics.HealthStatus = newStatus;
HandlerStatusChanged?.Invoke(this, new HandlerStatusChangedEventArgs(
handlerName,
metrics.MessageType,
oldStatus,
newStatus,
reason));
}
}
private void CollectPerformanceSamples(object state)
{
foreach (var kvp in _handlerMetrics)
{
var metrics = kvp.Value;
PerformanceSampled?.Invoke(this, new HandlerPerformanceSampleEventArgs(
metrics.HandlerName,
metrics.MessageType,
metrics.AverageProcessingTime,
metrics.SuccessMessages > 0 && metrics.FailedMessages == 0));
}
}
public void Dispose()
{
_performanceSamplingTimer?.Dispose();
foreach (var handlerLock in _handlerLocks.Values)
{
handlerLock.Dispose();
}
}
}
}

40
src/CellularManagement.WebSocket/Handlers/WebSocketMessageHandlerAdapter.cs

@ -0,0 +1,40 @@
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using CellularManagement.WebSocket.Services;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Handlers
{
/// <summary>
/// WebSocket 消息处理器适配器
/// </summary>
public class WebSocketMessageHandlerAdapter : BaseMessageHandler
{
private readonly IWebSocketMessageHandler _handler;
public WebSocketMessageHandlerAdapter(
IWebSocketMessageHandler handler,
ILogger<WebSocketMessageService> logger)
: base(logger)
{
_handler = handler;
}
public override string Name => _handler.GetType().Name;
public override int Priority => 0;
protected override async Task<WebSocketMessage> ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken)
{
try
{
var result = await _handler.HandleAsync(message);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生错误,处理器:{HandlerName}", Name);
throw;
}
}
}
}

100
src/CellularManagement.WebSocket/Models/HandlerEvents.cs

@ -0,0 +1,100 @@
using System;
namespace CellularManagement.WebSocket.Models
{
/// <summary>
/// 处理器事件参数
/// </summary>
public class HandlerEventArgs : EventArgs
{
/// <summary>
/// 处理器名称
/// </summary>
public string HandlerName { get; }
/// <summary>
/// 消息类型
/// </summary>
public string MessageType { get; }
/// <summary>
/// 事件时间
/// </summary>
public DateTime EventTime { get; }
public HandlerEventArgs(string handlerName, string messageType)
{
HandlerName = handlerName;
MessageType = messageType;
EventTime = DateTime.UtcNow;
}
}
/// <summary>
/// 处理器状态变更事件参数
/// </summary>
public class HandlerStatusChangedEventArgs : HandlerEventArgs
{
/// <summary>
/// 旧状态
/// </summary>
public HandlerHealthStatus OldStatus { get; }
/// <summary>
/// 新状态
/// </summary>
public HandlerHealthStatus NewStatus { get; }
/// <summary>
/// 变更原因
/// </summary>
public string Reason { get; }
public HandlerStatusChangedEventArgs(
string handlerName,
string messageType,
HandlerHealthStatus oldStatus,
HandlerHealthStatus newStatus,
string reason)
: base(handlerName, messageType)
{
OldStatus = oldStatus;
NewStatus = newStatus;
Reason = reason;
}
}
/// <summary>
/// 处理器性能采样事件参数
/// </summary>
public class HandlerPerformanceSampleEventArgs : HandlerEventArgs
{
/// <summary>
/// 处理时间(毫秒)
/// </summary>
public double ProcessingTime { get; }
/// <summary>
/// 是否成功
/// </summary>
public bool IsSuccess { get; }
/// <summary>
/// 错误信息
/// </summary>
public string ErrorMessage { get; }
public HandlerPerformanceSampleEventArgs(
string handlerName,
string messageType,
double processingTime,
bool isSuccess,
string errorMessage = null)
: base(handlerName, messageType)
{
ProcessingTime = processingTime;
IsSuccess = isSuccess;
ErrorMessage = errorMessage;
}
}
}

122
src/CellularManagement.WebSocket/Models/HandlerMetrics.cs

@ -0,0 +1,122 @@
using System;
namespace CellularManagement.WebSocket.Models
{
/// <summary>
/// 处理器性能指标
/// </summary>
public class HandlerMetrics
{
/// <summary>
/// 处理器名称
/// </summary>
public string HandlerName { get; set; }
/// <summary>
/// 消息类型
/// </summary>
public string MessageType { get; set; }
/// <summary>
/// 处理消息总数
/// </summary>
public long TotalMessages { get; set; }
/// <summary>
/// 成功处理消息数
/// </summary>
public long SuccessMessages { get; set; }
/// <summary>
/// 失败消息数
/// </summary>
public long FailedMessages { get; set; }
/// <summary>
/// 平均处理时间(毫秒)
/// </summary>
public double AverageProcessingTime { get; set; }
/// <summary>
/// 最后处理时间
/// </summary>
public DateTime LastProcessedTime { get; set; }
/// <summary>
/// 处理器状态
/// </summary>
public HandlerHealthStatus HealthStatus { get; set; }
/// <summary>
/// 最后健康检查时间
/// </summary>
public DateTime LastHealthCheckTime { get; set; }
/// <summary>
/// 健康检查失败次数
/// </summary>
public int HealthCheckFailureCount { get; set; }
/// <summary>
/// 处理器配置
/// </summary>
public HandlerConfiguration Configuration { get; set; }
}
/// <summary>
/// 处理器健康状态
/// </summary>
public enum HandlerHealthStatus
{
/// <summary>
/// 健康
/// </summary>
Healthy,
/// <summary>
/// 警告
/// </summary>
Warning,
/// <summary>
/// 不健康
/// </summary>
Unhealthy
}
/// <summary>
/// 处理器配置
/// </summary>
public class HandlerConfiguration
{
/// <summary>
/// 是否启用
/// </summary>
public bool IsEnabled { get; set; } = true;
/// <summary>
/// 超时时间(毫秒)
/// </summary>
public int Timeout { get; set; } = 30000;
/// <summary>
/// 最大重试次数
/// </summary>
public int MaxRetries { get; set; } = 3;
/// <summary>
/// 健康检查间隔(秒)
/// </summary>
public int HealthCheckInterval { get; set; } = 60;
/// <summary>
/// 最大失败次数阈值
/// </summary>
public int MaxFailureThreshold { get; set; } = 10;
/// <summary>
/// 平均处理时间警告阈值(毫秒)
/// </summary>
public double AverageProcessingTimeWarningThreshold { get; set; } = 1000;
}
}

79
src/CellularManagement.WebSocket/Models/IMessageHandler.cs

@ -0,0 +1,79 @@
using System;
using System.Threading.Tasks;
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Models
{
/// <summary>
/// 消息处理器接口
/// </summary>
public interface IMessageHandler : IDisposable
{
/// <summary>
/// 处理器优先级(数字越小优先级越高)
/// </summary>
int Priority { get; }
/// <summary>
/// 处理器名称
/// </summary>
string Name { get; }
/// <summary>
/// 处理消息
/// </summary>
/// <param name="message">WebSocket消息</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>处理结果</returns>
Task<WebSocketMessage> HandleAsync(WebSocketMessage message, CancellationToken cancellationToken);
/// <summary>
/// 获取处理器统计信息
/// </summary>
/// <returns>统计信息</returns>
HandlerStatistics GetStatistics();
/// <summary>
/// 获取处理器指标
/// </summary>
/// <returns>处理器指标</returns>
HandlerMetrics GetMetrics();
/// <summary>
/// 更新处理器配置
/// </summary>
/// <param name="configuration">处理器配置</param>
void UpdateConfiguration(HandlerConfiguration configuration);
}
/// <summary>
/// 处理器统计信息
/// </summary>
public class HandlerStatistics
{
/// <summary>
/// 处理消息总数
/// </summary>
public long TotalMessages { get; set; }
/// <summary>
/// 成功处理消息数
/// </summary>
public long SuccessMessages { get; set; }
/// <summary>
/// 失败消息数
/// </summary>
public long FailedMessages { get; set; }
/// <summary>
/// 平均处理时间(毫秒)
/// </summary>
public double AverageProcessingTime { get; set; }
/// <summary>
/// 最后处理时间
/// </summary>
public DateTime LastProcessedTime { get; set; }
}
}

19
src/CellularManagement.WebSocket/Models/MessageStats.cs

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace CellularManagement.WebSocket.Models
{
/// <summary>
/// 消息统计信息
/// </summary>
public class MessageStats
{
public long TotalMessages { get; set; }
public long SuccessfulMessages { get; set; }
public long FailedMessages { get; set; }
public double AverageSendTime { get; set; }
}
}

43
src/CellularManagement.WebSocket/Models/WebSocketOptions.cs

@ -6,12 +6,49 @@ using System.Threading.Tasks;
namespace CellularManagement.WebSocket.Models
{
/// <summary>
/// WebSocket 配置选项
/// </summary>
public class WebSocketOptions
{
/// <summary>
/// 最大并发连接数
/// </summary>
public int MaxConcurrentConnections { get; set; } = 1000;
public int MaxMessageSize { get; set; } = 1024 * 4;
public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// 消息队列大小
/// </summary>
public int MessageQueueSize { get; set; } = 10000;
/// <summary>
/// 消息队列大小
/// </summary>
public int MaxMessageSize { get; set; } = 1024*8;
/// <summary>
/// 心跳检测间隔
/// </summary>
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// 连接超时时间
/// </summary>
public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromMinutes(5);
/// <summary>
/// 消息发送超时时间
/// </summary>
public TimeSpan MessageSendTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// 消息重试次数
/// </summary>
public int MessageRetryCount { get; set; } = 3;
/// <summary>
/// 消息重试间隔
/// </summary>
public TimeSpan MessageRetryInterval { get; set; } = TimeSpan.FromSeconds(1);
}
}

106
src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs

@ -1,7 +1,9 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using CellularManagement.WebSocket.Handlers;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
namespace CellularManagement.WebSocket.Pipeline.Steps;
@ -12,37 +14,27 @@ namespace CellularManagement.WebSocket.Pipeline.Steps;
public class MessageRoutingStep : IPipelineStep<WebSocketMessage, WebSocketMessage>
{
private readonly ILogger<MessageRoutingStep> _logger;
private readonly Dictionary<string, List<Func<WebSocketMessage, Task<WebSocketMessage>>>> _handlers;
private readonly HandlerManager _handlerManager;
private readonly JsonSerializerOptions _jsonOptions;
private int _currentHandlerIndex = 0;
private readonly TimeSpan _defaultTimeout;
private readonly int _maxRetries;
public MessageRoutingStep(ILogger<MessageRoutingStep> logger)
public MessageRoutingStep(
ILogger<MessageRoutingStep> logger,
HandlerManager handlerManager,
TimeSpan? defaultTimeout = null,
int maxRetries = 3)
{
_logger = logger;
_handlers = new Dictionary<string, List<Func<WebSocketMessage, Task<WebSocketMessage>>>>();
_handlerManager = handlerManager;
_jsonOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true
};
_logger.LogInformation("初始化消息路由步骤");
}
/// <summary>
/// 注册消息处理器
/// </summary>
/// <param name="messageType">消息类型</param>
/// <param name="handler">消息处理器</param>
public void RegisterHandler(string messageType, Func<WebSocketMessage, Task<WebSocketMessage>> handler)
{
_logger.LogInformation("注册消息处理器,消息类型:{MessageType},处理器类型:{HandlerType}",
messageType, handler.Method.DeclaringType?.Name);
if (!_handlers.ContainsKey(messageType))
{
_handlers[messageType] = new List<Func<WebSocketMessage, Task<WebSocketMessage>>>();
}
_handlers[messageType].Add(handler);
_logger.LogDebug("消息处理器注册完成,当前处理器数量:{HandlerCount}", _handlers[messageType].Count);
_defaultTimeout = defaultTimeout ?? TimeSpan.FromSeconds(30);
_maxRetries = maxRetries;
_logger.LogInformation("初始化消息路由步骤,默认超时时间:{Timeout},最大重试次数:{MaxRetries}",
_defaultTimeout, _maxRetries);
}
/// <summary>
@ -71,26 +63,54 @@ public class MessageRoutingStep : IPipelineStep<WebSocketMessage, WebSocketMessa
throw new InvalidOperationException("无效的消息类型");
}
if (!_handlers.ContainsKey(messageType))
var handlers = await _handlerManager.GetAllHandlersAsync(messageType);
if (handlers == null || handlers.Count == 0)
{
_logger.LogWarning("未找到消息类型的处理器,消息类型:{MessageType},连接ID:{ConnectionId}",
messageType, input.ConnectionId);
throw new InvalidOperationException($"未找到消息类型 {messageType} 的处理器");
}
// 轮询选择处理器
var handlers = _handlers[messageType];
var handler = handlers[_currentHandlerIndex % handlers.Count];
_currentHandlerIndex = (_currentHandlerIndex + 1) % handlers.Count;
Exception lastException = null;
_logger.LogDebug("选择消息处理器,消息类型:{MessageType},处理器索引:{HandlerIndex},连接ID:{ConnectionId}",
messageType, _currentHandlerIndex, input.ConnectionId);
for (int retry = 0; retry < _maxRetries; retry++)
{
foreach (var handler in handlers)
{
var metrics = handler.GetMetrics();
if (metrics.HealthStatus == HandlerHealthStatus.Unhealthy)
{
_logger.LogWarning("跳过不健康的处理器,处理器:{HandlerName},连接ID:{ConnectionId}",
handler.Name, input.ConnectionId);
continue;
}
var result = await handler(input);
_logger.LogDebug("消息处理完成,连接ID:{ConnectionId},消息类型:{MessageType}",
input.ConnectionId, messageType);
try
{
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(metrics.Configuration.Timeout));
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token);
return result;
var result = await handler.HandleAsync(input, linkedCts.Token);
_logger.LogDebug("消息处理完成,连接ID:{ConnectionId},消息类型:{MessageType},处理器:{HandlerName}",
input.ConnectionId, messageType, handler.Name);
return result;
}
catch (OperationCanceledException)
{
_logger.LogWarning("消息处理超时,连接ID:{ConnectionId},消息类型:{MessageType},处理器:{HandlerName},重试次数:{RetryCount}",
input.ConnectionId, messageType, handler.Name, retry);
lastException = new TimeoutException($"消息处理超时,处理器:{handler.Name}");
}
catch (Exception ex)
{
_logger.LogError(ex, "消息处理失败,连接ID:{ConnectionId},消息类型:{MessageType},处理器:{HandlerName},重试次数:{RetryCount}",
input.ConnectionId, messageType, handler.Name, retry);
lastException = ex;
}
}
}
throw lastException ?? new Exception("消息处理失败,所有重试都未成功");
}
catch (JsonException ex)
{
@ -116,7 +136,23 @@ public class MessageRoutingStep : IPipelineStep<WebSocketMessage, WebSocketMessa
{
_logger.LogError(ex, "消息路由失败,连接ID:{ConnectionId},错误信息:{ErrorMessage}",
input.ConnectionId, ex.Message);
throw;
throw new PipelineException("消息路由失败", ex);
}
}
/// <summary>
/// 获取处理器指标
/// </summary>
public Dictionary<string, HandlerMetrics> GetHandlerMetrics()
{
return _handlerManager.GetHandlerMetrics();
}
/// <summary>
/// 更新处理器配置
/// </summary>
public async Task UpdateHandlerConfigurationAsync(string handlerName, HandlerConfiguration configuration)
{
await _handlerManager.UpdateHandlerConfigurationAsync(handlerName, configuration);
}
}

152
src/CellularManagement.WebSocket/Services/OutgoingMessageProcessor.cs

@ -0,0 +1,152 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace CellularManagement.WebSocket.Services;
/// <summary>
/// 出站消息处理器
/// 负责处理消息的发送、重试、超时控制和监控
/// </summary>
public class OutgoingMessageProcessor : IDisposable
{
private readonly WebSocketConnectionManager _connectionManager;
private readonly ILogger<OutgoingMessageProcessor> _logger;
private readonly ConcurrentDictionary<string, MessageStats> _messageStats;
private readonly SemaphoreSlim _backpressureSemaphore;
private readonly int _maxConcurrentSends;
private readonly int _maxRetryAttempts;
private readonly TimeSpan _sendTimeout;
private readonly TimeSpan _retryDelay;
private readonly CancellationTokenSource _stoppingCts;
public OutgoingMessageProcessor(
WebSocketConnectionManager connectionManager,
ILogger<OutgoingMessageProcessor> logger,
int maxConcurrentSends = 100,
int maxRetryAttempts = 3,
int sendTimeoutSeconds = 30,
int retryDelaySeconds = 5)
{
_connectionManager = connectionManager;
_logger = logger;
_maxConcurrentSends = maxConcurrentSends;
_maxRetryAttempts = maxRetryAttempts;
_sendTimeout = TimeSpan.FromSeconds(sendTimeoutSeconds);
_retryDelay = TimeSpan.FromSeconds(retryDelaySeconds);
_messageStats = new ConcurrentDictionary<string, MessageStats>();
_backpressureSemaphore = new SemaphoreSlim(maxConcurrentSends);
_stoppingCts = new CancellationTokenSource();
}
/// <summary>
/// 处理出站消息
/// </summary>
public async Task ProcessOutgoingMessagesAsync(CancellationToken stoppingToken)
{
try
{
await foreach (var message in _connectionManager.ReadOutgoingMessagesAsync(stoppingToken))
{
await ProcessMessageAsync(message, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("出站消息处理已停止");
}
}
private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken stoppingToken)
{
await _backpressureSemaphore.WaitAsync(stoppingToken);
try
{
var stats = _messageStats.GetOrAdd(message.ConnectionId, _ => new MessageStats());
stats.TotalMessages++;
var stopwatch = Stopwatch.StartNew();
var success = await SendMessageWithRetryAsync(message, stoppingToken);
stopwatch.Stop();
if (success)
{
stats.SuccessfulMessages++;
stats.AverageSendTime = (stats.AverageSendTime * (stats.SuccessfulMessages - 1) + stopwatch.ElapsedMilliseconds) / stats.SuccessfulMessages;
}
else
{
stats.FailedMessages++;
}
}
finally
{
_backpressureSemaphore.Release();
}
}
private async Task<bool> SendMessageWithRetryAsync(WebSocketMessage message, CancellationToken stoppingToken)
{
for (int attempt = 1; attempt <= _maxRetryAttempts; attempt++)
{
try
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, _stoppingCts.Token);
cts.CancelAfter(_sendTimeout);
var connection = _connectionManager.GetConnection(message.ConnectionId);
if (connection?.Socket.State != System.Net.WebSockets.WebSocketState.Open)
{
_logger.LogWarning("连接不存在或已关闭,连接ID:{ConnectionId}", message.ConnectionId);
return false;
}
await connection.Socket.SendAsync(
new ArraySegment<byte>(message.Data),
message.MessageType,
true,
cts.Token);
_logger.LogDebug("消息发送成功,连接ID:{ConnectionId},尝试次数:{Attempt}",
message.ConnectionId, attempt);
return true;
}
catch (OperationCanceledException)
{
_logger.LogWarning("消息发送超时,连接ID:{ConnectionId},尝试次数:{Attempt}",
message.ConnectionId, attempt);
}
catch (Exception ex)
{
_logger.LogError(ex, "发送消息时发生错误,连接ID:{ConnectionId},尝试次数:{Attempt}",
message.ConnectionId, attempt);
}
if (attempt < _maxRetryAttempts)
{
await Task.Delay(_retryDelay, stoppingToken);
}
}
return false;
}
/// <summary>
/// 获取消息统计信息
/// </summary>
public IReadOnlyDictionary<string, MessageStats> GetMessageStats()
{
return _messageStats;
}
public void Dispose()
{
_stoppingCts.Cancel();
_backpressureSemaphore.Dispose();
_stoppingCts.Dispose();
}
}

98
src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

@ -3,8 +3,10 @@ using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using CellularManagement.WebSocket.Pipeline;
using CellularManagement.WebSocket.Pipeline.Steps;
using CellularManagement.WebSocket.Handlers;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading.Channels;
namespace CellularManagement.WebSocket.Services;
@ -18,9 +20,13 @@ public class WebSocketMessageService : BackgroundService
private readonly ILogger<WebSocketMessageService> _logger;
private readonly IPipelineStep<WebSocketMessage, WebSocketMessage> _pipeline;
private readonly MessageRoutingStep _routingStep;
private readonly HandlerManager _handlerManager;
private readonly SemaphoreSlim _processingSemaphore;
private readonly int _maxConcurrentProcesses = 10;
private readonly IEnumerable<IWebSocketMessageHandler> _messageHandlers;
private readonly Channel<WebSocketMessage> _messageChannel;
private readonly CancellationTokenSource _stoppingCts;
private readonly OutgoingMessageProcessor _outgoingMessageProcessor;
public WebSocketMessageService(
WebSocketConnectionManager connectionManager,
@ -33,8 +39,18 @@ public class WebSocketMessageService : BackgroundService
_messageHandlers = messageHandlers;
_logger.LogInformation("初始化 WebSocket 消息服务");
_routingStep = new MessageRoutingStep(loggerFactory.CreateLogger<MessageRoutingStep>());
_handlerManager = new HandlerManager(loggerFactory.CreateLogger<HandlerManager>());
_routingStep = new MessageRoutingStep(loggerFactory.CreateLogger<MessageRoutingStep>(), _handlerManager);
_processingSemaphore = new SemaphoreSlim(_maxConcurrentProcesses);
_messageChannel = Channel.CreateUnbounded<WebSocketMessage>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false
});
_stoppingCts = new CancellationTokenSource();
_outgoingMessageProcessor = new OutgoingMessageProcessor(
connectionManager,
loggerFactory.CreateLogger<OutgoingMessageProcessor>());
_logger.LogInformation("创建消息处理信号量,最大并发数:{MaxConcurrentProcesses}", _maxConcurrentProcesses);
// 构建处理管道
@ -49,22 +65,39 @@ public class WebSocketMessageService : BackgroundService
RegisterMessageHandlers();
// 订阅消息到达事件
_connectionManager.OnMessageReceived += ProcessMessageAsync;
_connectionManager.OnMessageReceived += OnMessageReceived;
_logger.LogInformation("已订阅消息到达事件");
}
private void RegisterMessageHandlers()
private Task OnMessageReceived(WebSocketMessage message)
{
try
{
if (!_messageChannel.Writer.TryWrite(message))
{
_logger.LogWarning("消息通道已满,无法处理新消息,连接ID:{ConnectionId}", message.ConnectionId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
}
return Task.CompletedTask;
}
private async void RegisterMessageHandlers()
{
foreach (var handler in _messageHandlers)
{
_logger.LogInformation("注册消息处理器,消息类型:{MessageType}", handler.MessageType);
_routingStep.RegisterHandler(handler.MessageType, handler.HandleAsync);
var adapter = new WebSocketMessageHandlerAdapter(handler, _logger);
await _handlerManager.RegisterHandlerAsync(handler.MessageType, adapter);
}
}
/// <summary>
/// 执行后台服务
/// 负责处理出站消息的发送
/// 负责处理消息的接收和处理
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
@ -72,34 +105,11 @@ public class WebSocketMessageService : BackgroundService
try
{
// 处理入站消息
await ProcessIncomingMessagesAsync(stoppingToken);
// 处理出站消息
await foreach (var message in _connectionManager.ReadOutgoingMessagesAsync(stoppingToken))
{
try
{
_logger.LogDebug("开始处理出站消息,连接ID:{ConnectionId},消息类型:{MessageType},数据大小:{DataSize}字节",
message.ConnectionId, message.MessageType, message.Data.Length);
var connection = _connectionManager.GetConnection(message.ConnectionId);
if (connection?.Socket.State == System.Net.WebSockets.WebSocketState.Open)
{
await connection.Socket.SendAsync(
new ArraySegment<byte>(message.Data),
message.MessageType,
true,
stoppingToken);
_logger.LogDebug("消息发送成功,连接ID:{ConnectionId}", message.ConnectionId);
}
else
{
_logger.LogWarning("消息发送失败,连接不存在或已关闭,连接ID:{ConnectionId}", message.ConnectionId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "发送消息时发生错误,连接ID:{ConnectionId}", message.ConnectionId);
}
}
await _outgoingMessageProcessor.ProcessOutgoingMessagesAsync(stoppingToken);
}
catch (OperationCanceledException)
{
@ -115,6 +125,21 @@ public class WebSocketMessageService : BackgroundService
}
}
private async Task ProcessIncomingMessagesAsync(CancellationToken stoppingToken)
{
try
{
await foreach (var message in _messageChannel.Reader.ReadAllAsync(stoppingToken))
{
await ProcessMessageAsync(message);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("入站消息处理已停止");
}
}
/// <summary>
/// 处理消息
/// </summary>
@ -126,7 +151,7 @@ public class WebSocketMessageService : BackgroundService
await _processingSemaphore.WaitAsync();
try
{
var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, CancellationToken.None);
var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, _stoppingCts.Token);
_logger.LogDebug("消息处理完成,连接ID:{ConnectionId},处理结果:{Processed}",
message.ConnectionId, processedMessage != null);
@ -182,8 +207,13 @@ public class WebSocketMessageService : BackgroundService
{
_logger.LogInformation("正在释放 WebSocket 消息服务资源");
// 取消订阅事件
_connectionManager.OnMessageReceived -= ProcessMessageAsync;
_connectionManager.OnMessageReceived -= OnMessageReceived;
// 停止所有正在进行的操作
_stoppingCts.Cancel();
// 释放资源
_processingSemaphore.Dispose();
_stoppingCts.Dispose();
_outgoingMessageProcessor.Dispose();
base.Dispose();
_logger.LogInformation("WebSocket 消息服务资源已释放");
}

Loading…
Cancel
Save