Browse Source

feat: 添加 WebSocket 消息处理功能

norm
hyh 2 months ago
parent
commit
5227ebfcf4
  1. 1
      src/CellularManagement.Application/Features/Auth/Commands/Logout/LogoutCommandHandler.cs
  2. 63
      src/CellularManagement.WebAPI/Program.cs
  3. 4
      src/CellularManagement.WebAPI/appsettings.Development.json
  4. 4
      src/CellularManagement.WebAPI/appsettings.json
  5. 7
      src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs
  6. 45
      src/CellularManagement.WebSocket/Services/Handlers/ChatMessageHandler.cs
  7. 45
      src/CellularManagement.WebSocket/Services/Handlers/NotificationMessageHandler.cs
  8. 21
      src/CellularManagement.WebSocket/Services/IWebSocketMessageHandler.cs
  9. 21
      src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

1
src/CellularManagement.Application/Features/Auth/Commands/Logout/LogoutCommandHandler.cs

@ -232,6 +232,7 @@ public class LogoutCommandHandler : IRequestHandler<LogoutCommand, OperationResu
sw.Stop();
_logger.LogDebug("[{CorrelationId}] {TokenType}处理完成,耗时: {ElapsedMs}ms",
correlationId, tokenType, sw.ElapsedMilliseconds);
await Task.CompletedTask;
}
catch (Exception ex)
{

63
src/CellularManagement.WebAPI/Program.cs

@ -10,6 +10,7 @@ using Microsoft.Extensions.Options;
using CellularManagement.WebSocket.Extensions;
using CellularManagement.WebSocket.Services;
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Services.Handlers;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.OpenApi.Models;
@ -58,6 +59,7 @@ builder.Services.AddWebSocketServices(options =>
options.HeartbeatInterval = TimeSpan.FromSeconds(15); // 心跳检测间隔
});
// 注册控制器
// 将 Presentation 层的控制器程序集添加到应用程序中
builder.Services.AddControllers()
@ -205,66 +207,5 @@ app.UseDirectoryBrowser();
// 映射控制器路由
app.MapControllers();
// 获取 WebSocket 消息服务实例
var messageService = app.Services.GetRequiredService<WebSocketMessageService>();
var _logger = app.Services.GetRequiredService<ILogger<Program>>();
// 注册消息处理器示例
messageService.RegisterMessageHandler("chat", async (message) =>
{
_logger.LogInformation("开始处理聊天消息,连接ID:{ConnectionId}", message.ConnectionId);
_logger.LogDebug("原始消息数据:{Data}", System.Text.Encoding.UTF8.GetString(message.Data));
await Task.Delay(100); // 模拟异步处理
var response = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "chat",
payload = new
{
message = "收到聊天消息",
originalData = System.Text.Encoding.UTF8.GetString(message.Data),
timestamp = DateTime.UtcNow
}
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
_logger.LogInformation("聊天消息处理完成,连接ID:{ConnectionId},响应数据大小:{DataSize}字节",
message.ConnectionId, response.Data.Length);
return response;
});
messageService.RegisterMessageHandler("notification", async (message) =>
{
_logger.LogInformation("开始处理通知消息,连接ID:{ConnectionId}", message.ConnectionId);
_logger.LogDebug("原始消息数据:{Data}", System.Text.Encoding.UTF8.GetString(message.Data));
await Task.Delay(100); // 模拟异步处理
var response = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "notification",
payload = new
{
message = "收到通知消息",
originalData = System.Text.Encoding.UTF8.GetString(message.Data),
timestamp = DateTime.UtcNow
}
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
_logger.LogInformation("通知消息处理完成,连接ID:{ConnectionId},响应数据大小:{DataSize}字节",
message.ConnectionId, response.Data.Length);
return response;
});
// 运行应用程序
app.Run();

4
src/CellularManagement.WebAPI/appsettings.Development.json

@ -1,8 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
"Default": "Trace",
"Microsoft.AspNetCore": "Information"
}
}
}

4
src/CellularManagement.WebAPI/appsettings.json

@ -1,8 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
"Default": "Trace", //Information
"Microsoft.AspNetCore": "Information" //Information Warning Debug Trace
}
},
"DatabaseOptions": {

7
src/CellularManagement.WebSocket/Extensions/ServiceCollectionExtensions.cs

@ -1,6 +1,7 @@
using CellularManagement.WebSocket.Connection;
using CellularManagement.WebSocket.Middleware;
using CellularManagement.WebSocket.Services;
using CellularManagement.WebSocket.Services.Handlers;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
@ -11,8 +12,10 @@ public static class ServiceCollectionExtensions
public static IServiceCollection AddWebSocketServices(this IServiceCollection services, Action<WebSocketOptions>? configureOptions = null)
{
services.AddSingleton<WebSocketConnectionManager>();
services.AddSingleton<WebSocketMessageService>();
// 注册 WebSocket 消息处理器为 Singleton
services.AddSingleton<IWebSocketMessageHandler, ChatMessageHandler>();
services.AddSingleton<IWebSocketMessageHandler, NotificationMessageHandler>();
services.AddHostedService<WebSocketMessageService>();
if (configureOptions != null)
{
services.Configure(configureOptions);

45
src/CellularManagement.WebSocket/Services/Handlers/ChatMessageHandler.cs

@ -0,0 +1,45 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services.Handlers;
public class ChatMessageHandler : IWebSocketMessageHandler
{
private readonly ILogger<ChatMessageHandler> _logger;
public ChatMessageHandler(ILogger<ChatMessageHandler> logger)
{
_logger = logger;
}
public string MessageType => "chat";
public async Task<WebSocketMessage> HandleAsync(WebSocketMessage message)
{
_logger.LogInformation("开始处理聊天消息,连接ID:{ConnectionId}", message.ConnectionId);
_logger.LogDebug("原始消息数据:{Data}", System.Text.Encoding.UTF8.GetString(message.Data));
await Task.Delay(100); // 模拟异步处理
var response = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "chat",
payload = new
{
message = "收到聊天消息",
originalData = System.Text.Encoding.UTF8.GetString(message.Data),
timestamp = DateTime.UtcNow
}
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
_logger.LogInformation("聊天消息处理完成,连接ID:{ConnectionId},响应数据大小:{DataSize}字节",
message.ConnectionId, response.Data.Length);
return response;
}
}

45
src/CellularManagement.WebSocket/Services/Handlers/NotificationMessageHandler.cs

@ -0,0 +1,45 @@
using System.Text.Json;
using CellularManagement.WebSocket.Connection;
using Microsoft.Extensions.Logging;
namespace CellularManagement.WebSocket.Services.Handlers;
public class NotificationMessageHandler : IWebSocketMessageHandler
{
private readonly ILogger<NotificationMessageHandler> _logger;
public NotificationMessageHandler(ILogger<NotificationMessageHandler> logger)
{
_logger = logger;
}
public string MessageType => "notification";
public async Task<WebSocketMessage> HandleAsync(WebSocketMessage message)
{
_logger.LogInformation("开始处理通知消息,连接ID:{ConnectionId}", message.ConnectionId);
_logger.LogDebug("原始消息数据:{Data}", System.Text.Encoding.UTF8.GetString(message.Data));
await Task.Delay(100); // 模拟异步处理
var response = new WebSocketMessage
{
ConnectionId = message.ConnectionId,
Data = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
type = "notification",
payload = new
{
message = "收到通知消息",
originalData = System.Text.Encoding.UTF8.GetString(message.Data),
timestamp = DateTime.UtcNow
}
})),
MessageType = System.Net.WebSockets.WebSocketMessageType.Text
};
_logger.LogInformation("通知消息处理完成,连接ID:{ConnectionId},响应数据大小:{DataSize}字节",
message.ConnectionId, response.Data.Length);
return response;
}
}

21
src/CellularManagement.WebSocket/Services/IWebSocketMessageHandler.cs

@ -0,0 +1,21 @@
using CellularManagement.WebSocket.Connection;
namespace CellularManagement.WebSocket.Services;
/// <summary>
/// WebSocket 消息处理器接口
/// </summary>
public interface IWebSocketMessageHandler
{
/// <summary>
/// 获取处理器支持的消息类型
/// </summary>
string MessageType { get; }
/// <summary>
/// 处理消息
/// </summary>
/// <param name="message">WebSocket 消息</param>
/// <returns>处理后的消息</returns>
Task<WebSocketMessage> HandleAsync(WebSocketMessage message);
}

21
src/CellularManagement.WebSocket/Services/WebSocketMessageService.cs

@ -19,14 +19,17 @@ public class WebSocketMessageService : BackgroundService
private readonly MessageRoutingStep _routingStep;
private readonly SemaphoreSlim _processingSemaphore;
private readonly int _maxConcurrentProcesses = 10;
private readonly IEnumerable<IWebSocketMessageHandler> _messageHandlers;
public WebSocketMessageService(
WebSocketConnectionManager connectionManager,
ILogger<WebSocketMessageService> logger,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
IEnumerable<IWebSocketMessageHandler> messageHandlers)
{
_connectionManager = connectionManager;
_logger = logger;
_messageHandlers = messageHandlers;
_logger.LogInformation("初始化 WebSocket 消息服务");
_routingStep = new MessageRoutingStep(loggerFactory.CreateLogger<MessageRoutingStep>());
@ -41,18 +44,21 @@ public class WebSocketMessageService : BackgroundService
.Build();
_logger.LogInformation("消息处理管道构建完成");
// 注册消息处理器
RegisterMessageHandlers();
// 订阅消息到达事件
_connectionManager.OnMessageReceived += ProcessMessageAsync;
_logger.LogInformation("已订阅消息到达事件");
}
/// <summary>
/// 注册消息处理器
/// </summary>
public void RegisterMessageHandler(string messageType, Func<WebSocketMessage, Task<WebSocketMessage>> handler)
private void RegisterMessageHandlers()
{
_logger.LogInformation("注册消息处理器,消息类型:{MessageType}", messageType);
_routingStep.RegisterHandler(messageType, handler);
foreach (var handler in _messageHandlers)
{
_logger.LogInformation("注册消息处理器,消息类型:{MessageType}", handler.MessageType);
_routingStep.RegisterHandler(handler.MessageType, handler.HandleAsync);
}
}
/// <summary>
@ -62,6 +68,7 @@ public class WebSocketMessageService : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("WebSocket 消息服务开始运行");
try
{
// 处理出站消息

Loading…
Cancel
Save