Browse Source

feat: 添加WebSocket功能,包括消息处理管道、中间件和WebSocket服务

ws
hyh 3 months ago
parent
commit
efcb2882a1
  1. 13
      CellularManagement.sln
  2. 1
      src/CellularManagement.Presentation/DependencyInjectionExtensions.cs
  3. 3
      src/CellularManagement.WebAPI/CellularManagement.WebAPI.csproj
  4. 67
      src/CellularManagement.WebAPI/Program.cs
  5. 21
      src/CellularManagement.WebSocket/CellularManagement.WebSocket.csproj
  6. 168
      src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs
  7. 36
      src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs
  8. 109
      src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs
  9. 19
      src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs
  10. 73
      src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs
  11. 86
      src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs
  12. 84
      src/CellularManagement.WebSocket/Pipeline/Steps/MessageValidationStep.cs
  13. 108
      src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

13
CellularManagement.sln

@ -15,14 +15,13 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CellularManagement.Applicat
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CellularManagement.Infrastructure", "src\CellularManagement.Infrastructure\CellularManagement.Infrastructure.csproj", "{6CF192C7-060B-4328-B9F4-B05BD7401232}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CellularManagement.WebSocket", "src\CellularManagement.WebSocket\CellularManagement.WebSocket.csproj", "{155D6C93-9A46-45CD-B21F-0F60719515D0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{0E228A93-4B80-450E-BDBB-BAF576E6FC73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0E228A93-4B80-450E-BDBB-BAF576E6FC73}.Debug|Any CPU.Build.0 = Debug|Any CPU
@ -44,6 +43,13 @@ Global
{6CF192C7-060B-4328-B9F4-B05BD7401232}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6CF192C7-060B-4328-B9F4-B05BD7401232}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6CF192C7-060B-4328-B9F4-B05BD7401232}.Release|Any CPU.Build.0 = Release|Any CPU
{155D6C93-9A46-45CD-B21F-0F60719515D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{155D6C93-9A46-45CD-B21F-0F60719515D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{155D6C93-9A46-45CD-B21F-0F60719515D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{155D6C93-9A46-45CD-B21F-0F60719515D0}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{0E228A93-4B80-450E-BDBB-BAF576E6FC73} = {C0426B52-8F01-41DE-966C-ADC8DD078638}
@ -51,5 +57,6 @@ Global
{95F5B3F8-8293-43D0-98CF-88A9A224D20E} = {C0426B52-8F01-41DE-966C-ADC8DD078638}
{FE169440-2554-4810-97D6-CC44808AAA56} = {C0426B52-8F01-41DE-966C-ADC8DD078638}
{6CF192C7-060B-4328-B9F4-B05BD7401232} = {C0426B52-8F01-41DE-966C-ADC8DD078638}
{155D6C93-9A46-45CD-B21F-0F60719515D0} = {C0426B52-8F01-41DE-966C-ADC8DD078638}
EndGlobalSection
EndGlobal

1
src/CellularManagement.Presentation/DependencyInjectionExtensions.cs

@ -1 +0,0 @@

3
src/CellularManagement.WebAPI/CellularManagement.WebAPI.csproj

@ -5,6 +5,7 @@
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<InvariantGlobalization>true</InvariantGlobalization>
</PropertyGroup>
<ItemGroup>
@ -18,8 +19,10 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CellularManagement.Application\CellularManagement.Application.csproj" />
<ProjectReference Include="..\CellularManagement.Infrastructure\CellularManagement.Infrastructure.csproj" />
<ProjectReference Include="..\CellularManagement.Presentation\CellularManagement.Presentation.csproj" />
<ProjectReference Include="..\CellularManagement.WebSocket\CellularManagement.WebSocket.csproj" />
</ItemGroup>
<ItemGroup>

67
src/CellularManagement.WebAPI/Program.cs

@ -7,6 +7,10 @@ using Microsoft.Extensions.Hosting;
using CellularManagement.Infrastructure.Configurations;
using CellularManagement.Application;
using Microsoft.Extensions.Options;
using CellularManagement.WebSocket.Extensions;
using CellularManagement.WebSocket.Services;
using CellularManagement.WebSocket.Connection;
using System.Text.Json;
// 创建 Web 应用程序构建器
var builder = WebApplication.CreateBuilder(args);
@ -23,6 +27,16 @@ builder.Services.AddApplication();
// 包括控制器、视图、中间件等表现层相关服务
builder.Services.AddPresentation();
// 注册 WebSocket 服务
builder.Services.AddWebSocketServices(options =>
{
// 配置 WebSocket 选项
options.MaxConcurrentConnections = 2000; // 最大并发连接数
options.MaxMessageSize = 8192; // 最大消息大小(字节)
options.ConnectionTimeout = TimeSpan.FromMinutes(2); // 连接超时时间
options.HeartbeatInterval = TimeSpan.FromSeconds(15); // 心跳检测间隔
});
// 注册控制器
// 将 Presentation 层的控制器程序集添加到应用程序中
builder.Services.AddControllers()
@ -73,6 +87,13 @@ app.UseHttpsRedirection();
// 允许所有来源、所有请求头、所有 HTTP 方法和凭证
app.UseCors(x => x.AllowAnyHeader().SetIsOriginAllowed(p => true).AllowAnyMethod().AllowCredentials());
// 启用 WebSocket 中间件
app.UseWebSockets(new Microsoft.AspNetCore.Builder.WebSocketOptions
{
KeepAliveInterval = TimeSpan.FromSeconds(120)
});
app.UseWebSocketMiddleware();
// 启用授权中间件
// 处理用户认证和授权
app.UseAuthorization();
@ -84,5 +105,51 @@ app.UseDirectoryBrowser();
// 映射控制器路由
app.MapControllers();
// 获取 WebSocket 消息服务实例
var messageService = app.Services.GetRequiredService<WebSocketMessageService>();
// 注册消息处理器示例
messageService.RegisterMessageHandler("chat", async (message) =>
{
// 处理聊天消息
await Task.Delay(100); // 模拟异步处理
var response = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "chat",
payload = new
{
message = "收到聊天消息",
originalData = System.Text.Encoding.UTF8.GetString(message.Data)
}
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
return response;
});
messageService.RegisterMessageHandler("notification", async (message) =>
{
// 处理通知消息
await Task.Delay(100); // 模拟异步处理
var response = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "notification",
payload = new
{
message = "收到通知消息",
originalData = System.Text.Encoding.UTF8.GetString(message.Data)
}
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
return response;
});
// 运行应用程序
app.Run();

21
src/CellularManagement.WebSocket/CellularManagement.WebSocket.csproj

@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>CellularManagement.WebSocket</RootNamespace>
<AssemblyName>CellularManagement.WebSocket</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.2.1" />
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.2.2" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
</Project>

168
src/CellularManagement.WebSocket/Connection/WebSocketConnectionManager.cs

@ -0,0 +1,168 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Connection;
public class WebSocketConnectionManager : IDisposable
{
private readonly ConcurrentDictionary<string, WebSocketConnection> _connections = new();
private readonly Channel<WebSocketMessage> _incomingMessages;
private readonly Channel<WebSocketMessage> _outgoingMessages;
private readonly ILogger<WebSocketConnectionManager> _logger;
private readonly Timer _heartbeatTimer;
private readonly TimeSpan _heartbeatInterval = TimeSpan.FromSeconds(30);
public WebSocketConnectionManager(ILogger<WebSocketConnectionManager> logger)
{
_logger = logger;
_incomingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
});
_outgoingMessages = Channel.CreateBounded<WebSocketMessage>(new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
});
_heartbeatTimer = new Timer(CheckConnections, null, _heartbeatInterval, _heartbeatInterval);
}
public string AddConnection(System.Net.WebSockets.WebSocket socket)
{
var connectionId = Guid.NewGuid().ToString();
var connection = new WebSocketConnection
{
Socket = socket,
LastActivityTime = DateTime.UtcNow,
Status = ConnectionStatus.Connected
};
_connections.TryAdd(connectionId, connection);
_logger.LogInformation("New connection added: {ConnectionId}", connectionId);
return connectionId;
}
public bool RemoveConnection(string connectionId)
{
if (_connections.TryRemove(connectionId, out var connection))
{
_logger.LogInformation("Connection removed: {ConnectionId}", connectionId);
return true;
}
return false;
}
public WebSocketConnection? GetConnection(string connectionId)
{
_connections.TryGetValue(connectionId, out var connection);
return connection;
}
public void UpdateConnectionActivity(string connectionId)
{
if (_connections.TryGetValue(connectionId, out var connection))
{
connection.LastActivityTime = DateTime.UtcNow;
}
}
public async ValueTask QueueIncomingMessage(WebSocketMessage message)
{
await _incomingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId);
}
public async ValueTask QueueOutgoingMessage(WebSocketMessage message)
{
await _outgoingMessages.Writer.WriteAsync(message);
UpdateConnectionActivity(message.ConnectionId);
}
public IAsyncEnumerable<WebSocketMessage> ReadIncomingMessagesAsync(CancellationToken cancellationToken)
{
return _incomingMessages.Reader.ReadAllAsync(cancellationToken);
}
public IAsyncEnumerable<WebSocketMessage> ReadOutgoingMessagesAsync(CancellationToken cancellationToken)
{
return _outgoingMessages.Reader.ReadAllAsync(cancellationToken);
}
public IEnumerable<WebSocketConnection> GetAllConnections()
{
return _connections.Values;
}
private void CheckConnections(object? state)
{
var now = DateTime.UtcNow;
var inactiveThreshold = TimeSpan.FromMinutes(1);
foreach (var (connectionId, connection) in _connections)
{
if (connection.Status == ConnectionStatus.Connected &&
now - connection.LastActivityTime > inactiveThreshold)
{
_logger.LogWarning("Connection {ConnectionId} is inactive, closing", connectionId);
_ = CloseConnectionAsync(connectionId);
}
}
}
private async Task CloseConnectionAsync(string connectionId)
{
if (_connections.TryGetValue(connectionId, out var connection))
{
try
{
await connection.Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Connection timeout",
CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error closing connection {ConnectionId}", connectionId);
}
finally
{
RemoveConnection(connectionId);
}
}
}
public void Dispose()
{
_heartbeatTimer.Dispose();
}
}
public class WebSocketConnection
{
public System.Net.WebSockets.WebSocket Socket { get; set; } = null!;
public DateTime LastActivityTime { get; set; }
public ConnectionStatus Status { get; set; }
}
public enum ConnectionStatus
{
Connected,
Disconnected,
Error
}
public record WebSocketMessage
{
public string ConnectionId { get; init; } = string.Empty;
public byte[] Data { get; init; } = Array.Empty<byte>();
public WebSocketMessageType MessageType { get; init; }
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
public int Priority { get; init; } = 0;
}

36
src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs

@ -0,0 +1,36 @@
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Middleware;
using CellularManagement.WebSocket.Services;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
namespace CellularManagement.WebSocket.Extensions;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddWebSocketServices(this IServiceCollection services, Action<WebSocketOptions>? configureOptions = null)
{
services.AddSingleton<WebSocketConnectionManager>();
services.AddSingleton<WebSocketMessageService>();
if (configureOptions != null)
{
services.Configure(configureOptions);
}
return services;
}
public static IApplicationBuilder UseWebSocketMiddleware(this IApplicationBuilder app)
{
return app.UseMiddleware<WebSocketMiddleware>();
}
}
public class WebSocketOptions
{
public int MaxConcurrentConnections { get; set; } = 1000;
public int MaxMessageSize { get; set; } = 1024 * 4;
public TimeSpan ConnectionTimeout { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(30);
}

109
src/CellularManagement.WebSocket/Middleware/WebSocketMiddleware.cs

@ -0,0 +1,109 @@
using System.Net.WebSockets;
using CellularManagement.WebSocket.Connection;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Middleware;
public class WebSocketMiddleware
{
private readonly RequestDelegate _next;
private readonly WebSocketConnectionManager _connectionManager;
private readonly ILogger<WebSocketMiddleware> _logger;
private readonly int _bufferSize = 1024 * 4;
public WebSocketMiddleware(
RequestDelegate next,
WebSocketConnectionManager connectionManager,
ILogger<WebSocketMiddleware> logger)
{
_next = next;
_connectionManager = connectionManager;
_logger = logger;
}
public async Task InvokeAsync(HttpContext context)
{
if (context.WebSockets.IsWebSocketRequest)
{
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
var connectionId = _connectionManager.AddConnection(webSocket);
try
{
await HandleWebSocketConnection(webSocket, connectionId);
}
catch (WebSocketException ex)
{
_logger.LogError(ex, "WebSocket error for connection {ConnectionId}", connectionId);
await HandleWebSocketError(webSocket, ex);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling WebSocket connection {ConnectionId}", connectionId);
await HandleWebSocketError(webSocket, ex);
}
finally
{
_connectionManager.RemoveConnection(connectionId);
}
}
else
{
await _next(context);
}
}
private async Task HandleWebSocketConnection(System.Net.WebSockets.WebSocket webSocket, string connectionId)
{
var buffer = new byte[_bufferSize];
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
while (!receiveResult.CloseStatus.HasValue)
{
try
{
var message = new WebSocketMessage
{
ConnectionId = connectionId,
Data = buffer.Take(receiveResult.Count).ToArray(),
MessageType = receiveResult.MessageType
};
await _connectionManager.QueueIncomingMessage(message);
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message for connection {ConnectionId}", connectionId);
throw;
}
}
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
}
private async Task HandleWebSocketError(System.Net.WebSockets.WebSocket webSocket, Exception exception)
{
try
{
if (webSocket.State == WebSocketState.Open)
{
await webSocket.CloseAsync(
WebSocketCloseStatus.InternalServerError,
"Internal server error",
CancellationToken.None);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error closing WebSocket after error");
}
}
}

19
src/CellularManagement.WebSocket/Pipeline/IPipelineStep.cs

@ -0,0 +1,19 @@
namespace CellularManagement.WebSocket.Pipeline;
public interface IPipelineStep<TInput, TOutput>
{
Task<TOutput> ProcessAsync(TInput input, CancellationToken cancellationToken);
Task<TOutput> ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken);
}
public interface IPipelineStep<TInput>
{
Task ProcessAsync(TInput input, CancellationToken cancellationToken);
Task ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken);
}
public class PipelineException : Exception
{
public PipelineException(string message) : base(message) { }
public PipelineException(string message, Exception innerException) : base(message, innerException) { }
}

73
src/CellularManagement.WebSocket/Pipeline/PipelineBuilder.cs

@ -0,0 +1,73 @@
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline;
public class PipelineBuilder<TInput, TOutput>
{
private readonly List<IPipelineStep<TInput, TOutput>> _steps = new();
private readonly ILoggerFactory _loggerFactory;
public PipelineBuilder(ILoggerFactory loggerFactory)
{
_loggerFactory = loggerFactory;
}
public PipelineBuilder<TInput, TOutput> AddStep(IPipelineStep<TInput, TOutput> step)
{
if (step == null)
{
throw new ArgumentNullException(nameof(step));
}
_steps.Add(step);
return this;
}
public IPipelineStep<TInput, TOutput> Build()
{
if (_steps.Count == 0)
{
throw new InvalidOperationException("Pipeline must have at least one step");
}
return new Pipeline<TInput, TOutput>(_steps, _loggerFactory.CreateLogger<Pipeline<TInput, TOutput>>());
}
}
public class Pipeline<TInput, TOutput> : IPipelineStep<TInput, TOutput>
{
private readonly IReadOnlyList<IPipelineStep<TInput, TOutput>> _steps;
private readonly ILogger<Pipeline<TInput, TOutput>> _logger;
public Pipeline(IReadOnlyList<IPipelineStep<TInput, TOutput>> steps, ILogger<Pipeline<TInput, TOutput>> logger)
{
_steps = steps;
_logger = logger;
}
public async Task<TOutput> ProcessAsync(TInput input, CancellationToken cancellationToken)
{
var current = input;
TOutput output = default!;
foreach (var step in _steps)
{
output = await step.ProcessAsync(current, cancellationToken);
current = (TInput)(object)output!;
}
return output;
}
public async Task<TOutput> ProcessWithErrorHandlingAsync(TInput input, CancellationToken cancellationToken)
{
try
{
return await ProcessAsync(input, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in pipeline processing");
throw new PipelineException("Pipeline processing failed", ex);
}
}
}

86
src/CellularManagement.WebSocket/Pipeline/Steps/MessageRoutingStep.cs

@ -0,0 +1,86 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline.Steps;
public class MessageRoutingStep : IPipelineStep<WebSocketMessage, WebSocketMessage>
{
private readonly ILogger<MessageRoutingStep> _logger;
private readonly Dictionary<string, List<Func<WebSocketMessage, Task<WebSocketMessage>>>> _handlers;
private readonly JsonSerializerOptions _jsonOptions;
private int _currentHandlerIndex = 0;
public MessageRoutingStep(ILogger<MessageRoutingStep> logger)
{
_logger = logger;
_handlers = new Dictionary<string, List<Func<WebSocketMessage, Task<WebSocketMessage>>>>();
_jsonOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true
};
}
public void RegisterHandler(string messageType, Func<WebSocketMessage, Task<WebSocketMessage>> handler)
{
if (!_handlers.ContainsKey(messageType))
{
_handlers[messageType] = new List<Func<WebSocketMessage, Task<WebSocketMessage>>>();
}
_handlers[messageType].Add(handler);
}
public async Task<WebSocketMessage> ProcessAsync(WebSocketMessage input, CancellationToken cancellationToken)
{
try
{
var messageText = System.Text.Encoding.UTF8.GetString(input.Data);
var message = JsonSerializer.Deserialize<Dictionary<string, object>>(messageText, _jsonOptions);
if (message == null || !message.TryGetValue("type", out var typeObj))
{
throw new InvalidOperationException("Message type not found");
}
var messageType = typeObj.ToString();
if (string.IsNullOrEmpty(messageType))
{
throw new InvalidOperationException("Invalid message type");
}
if (!_handlers.TryGetValue(messageType, out var handlers) || handlers.Count == 0)
{
throw new InvalidOperationException($"No handler registered for message type: {messageType}");
}
// 使用轮询策略选择处理器
var handler = GetNextHandler(handlers);
return await handler(input);
}
catch (Exception ex)
{
_logger.LogError(ex, "Message routing failed for connection {ConnectionId}", input.ConnectionId);
throw;
}
}
public async Task<WebSocketMessage> ProcessWithErrorHandlingAsync(WebSocketMessage input, CancellationToken cancellationToken)
{
try
{
return await ProcessAsync(input, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in message routing for connection {ConnectionId}", input.ConnectionId);
throw new PipelineException("Message routing failed", ex);
}
}
private Func<WebSocketMessage, Task<WebSocketMessage>> GetNextHandler(List<Func<WebSocketMessage, Task<WebSocketMessage>>> handlers)
{
// 使用轮询策略选择处理器
var index = Interlocked.Increment(ref _currentHandlerIndex) % handlers.Count;
return handlers[index];
}
}

84
src/CellularManagement.WebSocket/Pipeline/Steps/MessageValidationStep.cs

@ -0,0 +1,84 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Pipeline.Steps;
public class MessageValidationStep : IPipelineStep<WebSocketMessage, WebSocketMessage>
{
private readonly ILogger<MessageValidationStep> _logger;
private readonly JsonSerializerOptions _jsonOptions;
public MessageValidationStep(ILogger<MessageValidationStep> logger)
{
_logger = logger;
_jsonOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true
};
}
public Task<WebSocketMessage> ProcessAsync(WebSocketMessage input, CancellationToken cancellationToken)
{
try
{
ValidateMessage(input);
return Task.FromResult(input);
}
catch (Exception ex)
{
_logger.LogError(ex, "Message validation failed for connection {ConnectionId}", input.ConnectionId);
throw;
}
}
public async Task<WebSocketMessage> ProcessWithErrorHandlingAsync(WebSocketMessage input, CancellationToken cancellationToken)
{
try
{
return await ProcessAsync(input, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in message validation for connection {ConnectionId}", input.ConnectionId);
throw new PipelineException("Message validation failed", ex);
}
}
private void ValidateMessage(WebSocketMessage message)
{
if (message.Data.Length == 0)
{
throw new InvalidOperationException("Message data is empty");
}
if (message.MessageType != System.Net.WebSockets.WebSocketMessageType.Text)
{
throw new InvalidOperationException("Only text messages are supported");
}
var messageText = System.Text.Encoding.UTF8.GetString(message.Data);
var messageObject = JsonSerializer.Deserialize<Dictionary<string, object>>(messageText, _jsonOptions);
if (messageObject == null)
{
throw new InvalidOperationException("Invalid JSON format");
}
if (!messageObject.ContainsKey("type"))
{
throw new InvalidOperationException("Message type is required");
}
if (!messageObject.ContainsKey("payload"))
{
throw new InvalidOperationException("Message payload is required");
}
var type = messageObject["type"].ToString();
if (string.IsNullOrWhiteSpace(type))
{
throw new InvalidOperationException("Message type cannot be empty");
}
}
}

108
src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

@ -0,0 +1,108 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Pipeline;
using CellularManagement.WebSocket.Pipeline.Steps;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services;
public class WebSocketMessageService : BackgroundService
{
private readonly WebSocketConnectionManager _connectionManager;
private readonly ILogger<WebSocketMessageService> _logger;
private readonly IPipelineStep<WebSocketMessage, WebSocketMessage> _pipeline;
private readonly MessageRoutingStep _routingStep;
private readonly SemaphoreSlim _processingSemaphore;
private readonly int _maxConcurrentProcesses = 10;
public WebSocketMessageService(
WebSocketConnectionManager connectionManager,
ILogger<WebSocketMessageService> logger,
ILoggerFactory loggerFactory)
{
_connectionManager = connectionManager;
_logger = logger;
_routingStep = new MessageRoutingStep(loggerFactory.CreateLogger<MessageRoutingStep>());
_processingSemaphore = new SemaphoreSlim(_maxConcurrentProcesses);
// 构建处理管道
var pipelineBuilder = new PipelineBuilder<WebSocketMessage, WebSocketMessage>(loggerFactory);
_pipeline = pipelineBuilder
.AddStep(new MessageValidationStep(loggerFactory.CreateLogger<MessageValidationStep>()))
.AddStep(_routingStep)
.Build();
}
public void RegisterMessageHandler(string messageType, Func<WebSocketMessage, Task<WebSocketMessage>> handler)
{
_routingStep.RegisterHandler(messageType, handler);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
await foreach (var message in _connectionManager.ReadIncomingMessagesAsync(stoppingToken))
{
await ProcessMessageAsync(message, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Message processing service is stopping");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in message processing service");
}
}
private async Task ProcessMessageAsync(WebSocketMessage message, CancellationToken cancellationToken)
{
await _processingSemaphore.WaitAsync(cancellationToken);
try
{
var processedMessage = await _pipeline.ProcessWithErrorHandlingAsync(message, cancellationToken);
await _connectionManager.QueueOutgoingMessage(processedMessage);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message for connection {ConnectionId}", message.ConnectionId);
await HandleProcessingError(message, ex);
}
finally
{
_processingSemaphore.Release();
}
}
private async Task HandleProcessingError(WebSocketMessage message, Exception exception)
{
try
{
var errorResponse = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "error",
error = exception.Message
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
await _connectionManager.QueueOutgoingMessage(errorResponse);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error sending error response for connection {ConnectionId}", message.ConnectionId);
}
}
public override void Dispose()
{
_processingSemaphore.Dispose();
base.Dispose();
}
}
Loading…
Cancel
Save