Browse Source

重构 WebSocket 处理程序结构,优化消息处理流程

norm
hyh 2 months ago
parent
commit
0e321f828b
  1. 1
      src/CellularManagement.WebAPI/Program.cs
  2. 10
      src/CellularManagement.WebSocket/CellularManagement.WebSocket.csproj
  3. 2
      src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs
  4. 2
      src/CellularManagement.WebSocket/Handlers/ChatMessageHandler.cs
  5. 2
      src/CellularManagement.WebSocket/Handlers/HandlerRegistrar.cs
  6. 2
      src/CellularManagement.WebSocket/Handlers/IWebSocketMessageHandler.cs
  7. 2
      src/CellularManagement.WebSocket/Handlers/NotificationMessageHandler.cs
  8. 23
      src/CellularManagement.WebSocket/Models/MessageStats.cs
  9. 113
      src/CellularManagement.WebSocket/Services/OutgoingMessageProcessor.cs

1
src/CellularManagement.WebAPI/Program.cs

@ -10,7 +10,6 @@ using Microsoft.Extensions.Options;
using CellularManagement.WebSocket.Extensions;
using CellularManagement.WebSocket.Services;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Services.Handlers;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.OpenApi.Models;

10
src/CellularManagement.WebSocket/CellularManagement.WebSocket.csproj

@ -8,6 +8,12 @@
<AssemblyName>CellularManagement.WebSocket</AssemblyName>
</PropertyGroup>
<ItemGroup>
<Compile Remove="Message\**" />
<EmbeddedResource Remove="Message\**" />
<None Remove="Message\**" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.2.1" />
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
@ -18,8 +24,4 @@
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
<Folder Include="Message\" />
</ItemGroup>
</Project>

2
src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs

@ -1,7 +1,7 @@
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Handlers;
using CellularManagement.WebSocket.Middleware;
using CellularManagement.WebSocket.Services;
using CellularManagement.WebSocket.Services.Handlers;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

2
src/CellularManagement.WebSocket/Services/Handlers/ChatMessageHandler.cs → src/CellularManagement.WebSocket/Handlers/ChatMessageHandler.cs

@ -3,7 +3,7 @@ using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services.Handlers;
namespace CellularManagement.WebSocket.Handlers;
public class ChatMessageHandler : IWebSocketMessageHandler
{

2
src/CellularManagement.WebSocket/Services/HandlerRegistrar.cs → src/CellularManagement.WebSocket/Handlers/HandlerRegistrar.cs

@ -3,7 +3,7 @@ using System.Threading.Tasks;
using CellularManagement.WebSocket.Handlers;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services;
namespace CellularManagement.WebSocket.Handlers;
/// <summary>
/// 负责注册 WebSocket 消息处理器到 HandlerManager

2
src/CellularManagement.WebSocket/Services/IWebSocketMessageHandler.cs → src/CellularManagement.WebSocket/Handlers/IWebSocketMessageHandler.cs

@ -1,7 +1,7 @@
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
namespace CellularManagement.WebSocket.Services;
namespace CellularManagement.WebSocket.Handlers;
/// <summary>
/// WebSocket 消息处理器接口

2
src/CellularManagement.WebSocket/Services/Handlers/NotificationMessageHandler.cs → src/CellularManagement.WebSocket/Handlers/NotificationMessageHandler.cs

@ -3,7 +3,7 @@ using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services.Handlers;
namespace CellularManagement.WebSocket.Handlers;
public class NotificationMessageHandler : IWebSocketMessageHandler
{

23
src/CellularManagement.WebSocket/Models/MessageStats.cs

@ -6,14 +6,31 @@ using System.Threading.Tasks;
namespace CellularManagement.WebSocket.Models
{
/// <summary>
/// 消息统计信息
/// 记录消息处理的统计数据
/// </summary>
public class MessageStats
{
public long TotalMessages { get; set; }
public long SuccessfulMessages { get; set; }
public long FailedMessages { get; set; }
/// <summary>
/// 总消息数
/// </summary>
public int TotalMessages { get; set; }
/// <summary>
/// 成功发送的消息数
/// </summary>
public int SuccessfulMessages { get; set; }
/// <summary>
/// 发送失败的消息数
/// </summary>
public int FailedMessages { get; set; }
/// <summary>
/// 平均发送时间(毫秒)
/// </summary>
public double AverageSendTime { get; set; }
}
}

113
src/CellularManagement.WebSocket/Services/OutgoingMessageProcessor.cs

@ -4,6 +4,7 @@ using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net.WebSockets;
namespace CellularManagement.WebSocket.Services;
@ -18,71 +19,17 @@ namespace CellularManagement.WebSocket.Services;
/// </summary>
public class OutgoingMessageProcessor : IDisposable
{
/// <summary>
/// 连接管理器
/// 负责管理 WebSocket 连接
/// </summary>
private readonly IWebSocketConnectionManager _connectionManager;
/// <summary>
/// 消息队列管理器
/// 负责消息的入队和出队操作
/// </summary>
private readonly IWebSocketMessageQueueManager _messageQueueManager;
/// <summary>
/// 日志记录器
/// </summary>
private readonly ILogger<OutgoingMessageProcessor> _logger;
/// <summary>
/// 消息统计信息
/// 用于记录每个连接的消息处理情况
/// </summary>
private readonly ConcurrentDictionary<string, MessageStats> _messageStats;
/// <summary>
/// 背压控制信号量
/// 用于限制同时发送的消息数量
/// </summary>
private readonly SemaphoreSlim _backpressureSemaphore;
/// <summary>
/// 最大并发发送数
/// </summary>
private readonly int _maxConcurrentSends;
/// <summary>
/// 最大重试次数
/// </summary>
private readonly int _maxRetryAttempts;
/// <summary>
/// 发送超时时间
/// </summary>
private readonly TimeSpan _sendTimeout;
/// <summary>
/// 重试延迟时间
/// </summary>
private readonly TimeSpan _retryDelay;
/// <summary>
/// 停止令牌源
/// 用于控制处理器的停止
/// </summary>
private readonly CancellationTokenSource _stoppingCts;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="connectionManager">连接管理器</param>
/// <param name="messageQueueManager">消息队列管理器</param>
/// <param name="logger">日志记录器</param>
/// <param name="maxConcurrentSends">最大并发发送数,默认100</param>
/// <param name="maxRetryAttempts">最大重试次数,默认3</param>
/// <param name="sendTimeoutSeconds">发送超时时间(秒),默认30</param>
/// <param name="retryDelaySeconds">重试延迟时间(秒),默认5</param>
public OutgoingMessageProcessor(
IWebSocketConnectionManager connectionManager,
IWebSocketMessageQueueManager messageQueueManager,
@ -177,21 +124,44 @@ public class OutgoingMessageProcessor : IDisposable
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
cts.CancelAfter(_sendTimeout);
// 检查连接是否存在
var connection = _connectionManager.GetConnection(message.ConnectionId);
if (connection == null)
{
_logger.LogError("连接不存在,连接ID:{ConnectionId}", message.ConnectionId);
return false;
}
// 发送消息
await _connectionManager.SendMessageAsync(message.ConnectionId, message.Data, message.MessageType, cts.Token);
await connection.Socket.SendAsync(
new ArraySegment<byte>(message.Data),
message.MessageType,
true,
cts.Token);
return true;
}
catch (OperationCanceledException)
{
_logger.LogWarning("发送消息超时,连接ID:{ConnectionId}", message.ConnectionId);
return false;
}
catch (WebSocketException ex)
{
attempts++;
_logger.LogWarning(ex, "WebSocket发送失败,连接ID:{ConnectionId},重试次数:{Attempts}/{MaxRetries}",
message.ConnectionId, attempts, _maxRetryAttempts);
}
catch (Exception ex)
{
attempts++;
_logger.LogWarning(ex, "发送消息失败,连接ID:{ConnectionId},重试次数:{Attempts}/{MaxRetries}",
message.ConnectionId, attempts, _maxRetryAttempts);
}
if (attempts < _maxRetryAttempts)
{
await Task.Delay(_retryDelay, stoppingToken);
}
if (attempts < _maxRetryAttempts)
{
await Task.Delay(_retryDelay, stoppingToken);
}
}
@ -218,30 +188,5 @@ public class OutgoingMessageProcessor : IDisposable
}
}
/// <summary>
/// 消息统计信息
/// 记录消息处理的统计数据
/// </summary>
public class MessageStats
{
/// <summary>
/// 总消息数
/// </summary>
public int TotalMessages { get; set; }
/// <summary>
/// 成功发送的消息数
/// </summary>
public int SuccessfulMessages { get; set; }
/// <summary>
/// 发送失败的消息数
/// </summary>
public int FailedMessages { get; set; }
/// <summary>
/// 平均发送时间(毫秒)
/// </summary>
public double AverageSendTime { get; set; }
}

Loading…
Cancel
Save