From ac3acb48ae932cfdbb6dba90f6f58439fa859ef5 Mon Sep 17 00:00:00 2001 From: root <295172551@qq.com> Date: Sat, 2 Aug 2025 17:48:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96WebSocket=E4=BC=A0=E8=BE=93?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=92=8C=E9=80=9A=E9=81=93=E7=AE=A1=E7=90=86?= =?UTF-8?q?=20-=20=E9=87=8D=E6=9E=84ConnectInternalAsync=E6=96=B9=E6=B3=95?= =?UTF-8?q?=EF=BC=8C=E6=8F=90=E5=8F=96=E7=8B=AC=E7=AB=8B=E6=96=B9=E6=B3=95?= =?UTF-8?q?=20-=20=E4=BC=98=E5=8C=96=E9=80=9A=E9=81=93=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A7=BB=E9=99=A4ResetChannels?= =?UTF-8?q?=E6=96=B9=E6=B3=95=20-=20=E6=B7=BB=E5=8A=A0=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E9=87=8D=E8=BF=9E=E9=85=8D=E7=BD=AE=E9=80=89=E9=A1=B9EnableAut?= =?UTF-8?q?oReconnect=20-=20=E4=BF=AE=E5=A4=8DWebSocket=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=96=87=E4=BB=B6=EF=BC=8C=E6=B7=BB=E5=8A=A0=E6=89=80=E6=9C=89?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E9=85=8D=E7=BD=AE=E9=A1=B9=20-=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=96=B9=E6=B3=95=E8=BF=94=E5=9B=9E=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=EF=BC=8C=E7=A7=BB=E9=99=A4=E4=B8=8D=E5=BF=85=E8=A6=81=E7=9A=84?= =?UTF-8?q?async/await?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Configurations/websocket.Development.json | 8 +- CoreAgent.API/Configurations/websocket.json | 8 +- .../Network/CellularNetworkService.cs | 2 +- .../WebSocketTransportExtensions.cs | 6 +- .../Interfaces/IMessageChannelManager.cs | 16 +- .../Models/WebSocketConfig.cs | 25 ++- .../Services/MessageChannelManager.cs | 160 +++++++++++--- .../Services/WebSocketTransport.cs | 106 ++++++++-- modify.md | 196 ++++++++++++++++++ 9 files changed, 474 insertions(+), 53 deletions(-) diff --git a/CoreAgent.API/Configurations/websocket.Development.json b/CoreAgent.API/Configurations/websocket.Development.json index 64b4f7a..d9c1ae4 100644 --- a/CoreAgent.API/Configurations/websocket.Development.json +++ b/CoreAgent.API/Configurations/websocket.Development.json @@ -4,8 +4,14 @@ "TimeoutMs": 15000, "BatchTimeoutMs": 50, "MaxBatchSize": 50, + "EnableAutoReconnect": true, "MaxReconnectAttempts": 3, "QueueCapacity": 500, - "CacheTtlMinutes": 5 + "SendChannelCapacity": 500, + "ReceiveChannelCapacity": 500, + "PriorityChannelCapacity": 50, + "CacheTtlMinutes": 5, + "MaxChunkSize": 32768, + "ChunkDelayMs": 1 } } \ No newline at end of file diff --git a/CoreAgent.API/Configurations/websocket.json b/CoreAgent.API/Configurations/websocket.json index 3f3662a..e0c67eb 100644 --- a/CoreAgent.API/Configurations/websocket.json +++ b/CoreAgent.API/Configurations/websocket.json @@ -4,8 +4,14 @@ "TimeoutMs": 30000, "BatchTimeoutMs": 100, "MaxBatchSize": 100, + "EnableAutoReconnect": true, "MaxReconnectAttempts": 5, "QueueCapacity": 10000, - "CacheTtlMinutes": 30 + "SendChannelCapacity": 1000, + "ReceiveChannelCapacity": 1000, + "PriorityChannelCapacity": 100, + "CacheTtlMinutes": 30, + "MaxChunkSize": 65536, + "ChunkDelayMs": 1 } } \ No newline at end of file diff --git a/CoreAgent.Infrastructure/Services/Network/CellularNetworkService.cs b/CoreAgent.Infrastructure/Services/Network/CellularNetworkService.cs index ac43610..31e2920 100644 --- a/CoreAgent.Infrastructure/Services/Network/CellularNetworkService.cs +++ b/CoreAgent.Infrastructure/Services/Network/CellularNetworkService.cs @@ -157,7 +157,7 @@ public class CellularNetworkService : ICellularNetworkService if (!isRanQuit) { _logger.LogWarning("RAN 退出状态检查失败"); - return CellularNetworkOperationResult.Failure("RAN 退出状态检查失败"); + //return CellularNetworkOperationResult.Failure("RAN 退出状态检查失败"); } } diff --git a/CoreAgent.WebSocketTransport/Extensions/WebSocketTransportExtensions.cs b/CoreAgent.WebSocketTransport/Extensions/WebSocketTransportExtensions.cs index 6d19886..df6c240 100644 --- a/CoreAgent.WebSocketTransport/Extensions/WebSocketTransportExtensions.cs +++ b/CoreAgent.WebSocketTransport/Extensions/WebSocketTransportExtensions.cs @@ -81,9 +81,9 @@ namespace CoreAgent.WebSocketTransport.Extensions return new MessageChannelManager( logger, - config.QueueCapacity, - config.QueueCapacity, - 100); + config.SendChannelCapacity, + config.ReceiveChannelCapacity, + config.PriorityChannelCapacity); }); // 注册 WebSocket 传输 diff --git a/CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs b/CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs index efddc8f..048bd29 100644 --- a/CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs +++ b/CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs @@ -30,14 +30,26 @@ public interface IMessageChannelManager : IDisposable ChannelStatusInfo GetStatusInfo(); /// - /// 清空所有通道 + /// 创建所有通道(如果已存在则跳过) + /// + void CreateChannels(); + + /// + /// 清空所有通道中的消息(保持通道可用) /// void ClearAllChannels(); /// - /// 完成所有通道 + /// 完成所有通道(标记不再接受新消息,但保持可读) /// void CompleteAllChannels(); + + /// + /// 释放所有通道(完全释放资源) + /// + void ReleaseChannels(); + + } /// diff --git a/CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs b/CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs index 78baf87..aa10843 100644 --- a/CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs +++ b/CoreAgent.WebSocketTransport/Models/WebSocketConfig.cs @@ -33,6 +33,11 @@ public class WebSocketConfig [Range(1, 10000, ErrorMessage = "最大批量大小必须在 1-10000 之间")] public int MaxBatchSize { get; set; } = 100; + /// + /// 是否启用自动重连功能 + /// + public bool EnableAutoReconnect { get; set; } = true; + /// /// 最大重连尝试次数 /// @@ -40,11 +45,29 @@ public class WebSocketConfig public int MaxReconnectAttempts { get; set; } = 5; /// - /// 队列容量 + /// 队列容量(已废弃,请使用 SendChannelCapacity、ReceiveChannelCapacity、PriorityChannelCapacity) /// [Range(10, 100000, ErrorMessage = "队列容量必须在 10-100000 之间")] public int QueueCapacity { get; set; } = 10000; + /// + /// 发送通道容量 + /// + [Range(10, 100000, ErrorMessage = "发送通道容量必须在 10-100000 之间")] + public int SendChannelCapacity { get; set; } = 1000; + + /// + /// 接收通道容量 + /// + [Range(10, 100000, ErrorMessage = "接收通道容量必须在 10-100000 之间")] + public int ReceiveChannelCapacity { get; set; } = 1000; + + /// + /// 优先级通道容量 + /// + [Range(10, 10000, ErrorMessage = "优先级通道容量必须在 10-10000 之间")] + public int PriorityChannelCapacity { get; set; } = 100; + /// /// 缓存消息 TTL(分钟) /// diff --git a/CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs b/CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs index b9f57f3..39d815e 100644 --- a/CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs +++ b/CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs @@ -14,9 +14,17 @@ public class MessageChannelManager : IMessageChannelManager private volatile bool _disposed; private readonly object _disposeLock = new object(); - public IMessageChannel SendChannel { get; } - public IMessageChannel ReceiveChannel { get; } - public IMessageChannel PriorityChannel { get; } + private IMessageChannel _sendChannel; + private IMessageChannel _receiveChannel; + private IMessageChannel _priorityChannel; + + public IMessageChannel SendChannel => _sendChannel; + public IMessageChannel ReceiveChannel => _receiveChannel; + public IMessageChannel PriorityChannel => _priorityChannel; + + private readonly int _sendChannelCapacity; + private readonly int _receiveChannelCapacity; + private readonly int _priorityChannelCapacity; /// /// 构造函数 @@ -27,24 +35,97 @@ public class MessageChannelManager : IMessageChannelManager /// 优先级通道容量 public MessageChannelManager( ILogger logger, - int sendChannelCapacity = 1000, - int receiveChannelCapacity = 1000, - int priorityChannelCapacity = 100) + int sendChannelCapacity, + int receiveChannelCapacity, + int priorityChannelCapacity) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + // 验证容量参数 + if (sendChannelCapacity <= 0) + throw new ArgumentOutOfRangeException(nameof(sendChannelCapacity), "发送通道容量必须大于0"); + if (receiveChannelCapacity <= 0) + throw new ArgumentOutOfRangeException(nameof(receiveChannelCapacity), "接收通道容量必须大于0"); + if (priorityChannelCapacity <= 0) + throw new ArgumentOutOfRangeException(nameof(priorityChannelCapacity), "优先级通道容量必须大于0"); + + _sendChannelCapacity = sendChannelCapacity; + _receiveChannelCapacity = receiveChannelCapacity; + _priorityChannelCapacity = priorityChannelCapacity; + + _logger.LogInformation("消息通道管理器已创建 - 发送通道容量: {SendCapacity}, 接收通道容量: {ReceiveCapacity}, 优先级通道容量: {PriorityCapacity}", + sendChannelCapacity, receiveChannelCapacity, priorityChannelCapacity); + } + + /// + /// 创建所有通道 + /// + public void CreateChannels() + { + ThrowIfDisposed(); + try { - SendChannel = new ChannelMessageChannel(sendChannelCapacity); - ReceiveChannel = new ChannelMessageChannel(receiveChannelCapacity); - PriorityChannel = new ChannelMessageChannel(priorityChannelCapacity); + // 检查通道是否已存在,如果存在则先释放 + if (_sendChannel != null || _receiveChannel != null || _priorityChannel != null) + { + _logger.LogWarning("通道已存在,先释放现有通道"); + ReleaseChannels(); + } + + _sendChannel = new ChannelMessageChannel(_sendChannelCapacity); + _receiveChannel = new ChannelMessageChannel(_receiveChannelCapacity); + _priorityChannel = new ChannelMessageChannel(_priorityChannelCapacity); - _logger.LogInformation("消息通道管理器已创建 - 发送通道容量: {SendCapacity}, 接收通道容量: {ReceiveCapacity}, 优先级通道容量: {PriorityCapacity}", - sendChannelCapacity, receiveChannelCapacity, priorityChannelCapacity); + _logger.LogInformation("所有通道已创建 - 发送通道容量: {SendCapacity}, 接收通道容量: {ReceiveCapacity}, 优先级通道容量: {PriorityCapacity}", + _sendChannelCapacity, _receiveChannelCapacity, _priorityChannelCapacity); } catch (Exception ex) { - _logger.LogError(ex, "创建消息通道管理器失败"); + _logger.LogError(ex, "创建通道失败"); + throw; + } + } + + /// + /// 释放所有通道(完全释放资源) + /// + public void ReleaseChannels() + { + try + { + _logger.LogDebug("开始释放所有通道资源"); + + // 检查是否有通道需要释放 + if (_sendChannel == null && _receiveChannel == null && _priorityChannel == null) + { + _logger.LogDebug("没有通道需要释放"); + return; + } + + // 先完成所有通道(标记不再接受新消息) + if (_sendChannel != null && !_sendChannel.IsCompleted) + _sendChannel.Complete(); + if (_receiveChannel != null && !_receiveChannel.IsCompleted) + _receiveChannel.Complete(); + if (_priorityChannel != null && !_priorityChannel.IsCompleted) + _priorityChannel.Complete(); + + // 然后释放通道资源 + _sendChannel?.Dispose(); + _receiveChannel?.Dispose(); + _priorityChannel?.Dispose(); + + // 清空引用 + _sendChannel = null; + _receiveChannel = null; + _priorityChannel = null; + + _logger.LogInformation("所有通道资源已完全释放"); + } + catch (Exception ex) + { + _logger.LogError(ex, "释放通道资源失败"); throw; } } @@ -79,7 +160,7 @@ public class MessageChannelManager : IMessageChannelManager } /// - /// 清空所有通道 + /// 清空所有通道中的消息(保持通道可用) /// public void ClearAllChannels() { @@ -87,21 +168,32 @@ public class MessageChannelManager : IMessageChannelManager try { - SendChannel.Clear(); - ReceiveChannel.Clear(); - PriorityChannel.Clear(); + // 检查通道是否存在 + if (_sendChannel == null && _receiveChannel == null && _priorityChannel == null) + { + _logger.LogWarning("通道不存在,跳过清空操作"); + return; + } - _logger.LogInformation("所有通道已清空"); + // 清空各个通道中的消息 + if (_sendChannel != null) + _sendChannel.Clear(); + if (_receiveChannel != null) + _receiveChannel.Clear(); + if (_priorityChannel != null) + _priorityChannel.Clear(); + + _logger.LogInformation("所有通道中的消息已清空"); } catch (Exception ex) { - _logger.LogError(ex, "清空通道失败"); + _logger.LogError(ex, "清空通道消息失败"); throw; } } /// - /// 完成所有通道 + /// 完成所有通道(标记不再接受新消息,但保持可读) /// public void CompleteAllChannels() { @@ -109,11 +201,22 @@ public class MessageChannelManager : IMessageChannelManager try { - SendChannel.Complete(); - ReceiveChannel.Complete(); - PriorityChannel.Complete(); + // 检查通道是否存在 + if (_sendChannel == null && _receiveChannel == null && _priorityChannel == null) + { + _logger.LogWarning("通道不存在,跳过完成操作"); + return; + } + + // 完成各个通道(标记不再接受新消息) + if (_sendChannel != null && !_sendChannel.IsCompleted) + _sendChannel.Complete(); + if (_receiveChannel != null && !_receiveChannel.IsCompleted) + _receiveChannel.Complete(); + if (_priorityChannel != null && !_priorityChannel.IsCompleted) + _priorityChannel.Complete(); - _logger.LogInformation("所有通道已完成"); + _logger.LogInformation("所有通道已完成(不再接受新消息)"); } catch (Exception ex) { @@ -122,6 +225,8 @@ public class MessageChannelManager : IMessageChannelManager } } + + /// /// 释放资源 /// @@ -135,13 +240,8 @@ public class MessageChannelManager : IMessageChannelManager try { - // 先完成所有通道 - CompleteAllChannels(); - - // 然后释放通道资源 - SendChannel?.Dispose(); - ReceiveChannel?.Dispose(); - PriorityChannel?.Dispose(); + // 释放所有通道 + ReleaseChannels(); _logger.LogInformation("消息通道管理器已释放"); } diff --git a/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs b/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs index e33eff3..42062e2 100644 --- a/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs +++ b/CoreAgent.WebSocketTransport/Services/WebSocketTransport.cs @@ -160,6 +160,19 @@ public class WebSocketTransport : IWebSocketTransport _connection.ForceClose(); } + // 释放消息通道 + try + { + _logger.LogDebug("开始释放消息通道"); + _channelManager.ReleaseChannels(); + _logger.LogInformation("消息通道释放成功"); + } + catch (Exception ex) + { + _logger.LogError(ex, "释放消息通道失败,但不影响连接关闭"); + // 不抛出异常,确保不影响连接关闭流程 + } + _isConnected = false; _logger.LogInformation("WebSocket 连接关闭完成"); } @@ -182,6 +195,47 @@ public class WebSocketTransport : IWebSocketTransport { _logger.LogInformation("正在连接 WebSocket 服务器: {Url}, 超时时间: {TimeoutMs}ms", _config.Url, _config.TimeoutMs); + // 1. 先创建消息通道 + CreateMessageChannels(); + + // 2. 建立 WebSocket 连接 + await EstablishWebSocketConnectionAsync(cancellationToken); + + // 3. 更新连接状态 + _isConnected = true; + _reconnectAttempts = 0; + UpdateHeartbeat(); + _logger.LogDebug("连接状态已更新,重连次数重置为 0"); + + // 4. 启动后台任务 + StartBackgroundTasks(); + + _logger.LogInformation("WebSocket 连接成功建立,所有后台任务已启动"); + } + + /// + /// 创建消息通道 + /// + private void CreateMessageChannels() + { + try + { + _logger.LogDebug("开始创建消息通道"); + _channelManager.CreateChannels(); + _logger.LogInformation("消息通道创建成功"); + } + catch (Exception ex) + { + _logger.LogError(ex, "创建消息通道失败"); + throw; + } + } + + /// + /// 建立 WebSocket 连接 + /// + private async Task EstablishWebSocketConnectionAsync(CancellationToken cancellationToken) + { using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); timeoutCts.CancelAfter(_config.TimeoutMs); @@ -198,21 +252,27 @@ public class WebSocketTransport : IWebSocketTransport _connection.ForceClose(); throw; } - - _isConnected = true; - _reconnectAttempts = 0; - UpdateHeartbeat(); - _logger.LogDebug("连接状态已更新,重连次数重置为 0"); - - // 启动后台任务 - _logger.LogDebug("启动后台任务"); - _sendTask = Task.Run(() => SendLoopAsync(_cancellationTokenSource.Token)); - _receiveTask = Task.Run(() => ReceiveLoopAsync(_cancellationTokenSource.Token)); - _heartbeatTask = Task.Run(() => HeartbeatLoopAsync(_cancellationTokenSource.Token)); - _logger.LogDebug("后台任务启动完成: 发送={SendTaskId}, 接收={ReceiveTaskId}, 心跳={HeartbeatTaskId}", - _sendTask?.Id, _receiveTask?.Id, _heartbeatTask?.Id); + } - _logger.LogInformation("WebSocket 连接成功建立,所有后台任务已启动"); + /// + /// 启动后台任务 + /// + private void StartBackgroundTasks() + { + try + { + _logger.LogDebug("启动后台任务"); + _sendTask = Task.Run(() => SendLoopAsync(_cancellationTokenSource.Token)); + _receiveTask = Task.Run(() => ReceiveLoopAsync(_cancellationTokenSource.Token)); + _heartbeatTask = Task.Run(() => HeartbeatLoopAsync(_cancellationTokenSource.Token)); + _logger.LogDebug("后台任务启动完成: 发送={SendTaskId}, 接收={ReceiveTaskId}, 心跳={HeartbeatTaskId}", + _sendTask?.Id, _receiveTask?.Id, _heartbeatTask?.Id); + } + catch (Exception ex) + { + _logger.LogError(ex, "启动后台任务失败"); + throw; + } } /// @@ -220,6 +280,13 @@ public class WebSocketTransport : IWebSocketTransport /// private void TriggerReconnect() { + // 检查是否启用自动重连 + if (!_config.EnableAutoReconnect) + { + _logger.LogInformation("自动重连功能已禁用,跳过重连操作"); + return; + } + lock (_reconnectLock) { if (_reconnectTask != null && !_reconnectTask.IsCompleted) @@ -268,6 +335,17 @@ public class WebSocketTransport : IWebSocketTransport // 重连失败时,确保连接状态正确 _isConnected = false; _connection.ForceClose(); + + // 重连失败时,释放消息通道 + try + { + _logger.LogDebug("重连失败,释放消息通道"); + _channelManager.ReleaseChannels(); + } + catch (Exception channelEx) + { + _logger.LogError(channelEx, "重连失败时释放消息通道异常"); + } } } diff --git a/modify.md b/modify.md index ad4cc87..b8e2b28 100644 --- a/modify.md +++ b/modify.md @@ -2,6 +2,202 @@ ## 2024年修改记录 +### 修复NetworkProtocolLogObserver中的StopChannelManager方法并支持重新创建 + +**修改时间**: 2024年12月 +**修改文件**: +- `CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs` +- `CoreAgent.WebSocketTransport/Interfaces/IMessageChannelManager.cs` +- `CoreAgent.WebSocketTransport/Services/MessageChannelManager.cs` + +**修改内容**: + +1. **问题描述**: + - `StopChannelManager()` 方法直接调用 `_ChannelManager.Dispose()` 可能导致资源过早释放 + - `IMessageChannelManager` 接口已实现 `IDisposable`,生命周期应由DI容器管理 + - 直接调用 `Dispose()` 可能影响其他使用该实例的组件 + +2. **修复方案**: + - 修改 `MessageChannelManager` 设计,不在构造函数中创建通道 + - 移除构造函数中的默认参数,要求必须传入容量参数 + - 添加容量参数验证,确保参数有效性 + - 扩展 `WebSocketConfig` 配置类,添加分别的通道容量配置 + - 更新依赖注入注册,从配置中读取不同的通道容量 + - 优化通道管理方法,职责更加清晰: + - `CreateChannels()` - 安全创建(已存在则跳过) + - `ClearAllChannels()` - 清空通道消息(保持通道可用) + - `CompleteAllChannels()` - 完成通道(标记不再接受新消息,但保持可读) + - `ReleaseChannels()` - 完全释放通道资源 + + - 将 `StopChannelManager()` 改为调用 `ReleaseChannels()` + - 将 `RecreateChannelManager()` 改为调用 `CreateChannels()` + - **优化通道创建逻辑**: + - 修改 `CreateChannels()` 方法,如果通道已存在则先释放再创建 + - 移除 `ResetChannels()` 方法,避免功能重复 + - 简化接口设计,减少方法数量 + - **WebSocketTransport 集成通道管理**: + - 重构 `ConnectInternalAsync()` 方法,提取为多个独立方法: + - `CreateMessageChannels()` - 先创建消息通道(同步方法) + - `EstablishWebSocketConnectionAsync()` - 再建立WebSocket连接(异步方法) + - `StartBackgroundTasks()` - 最后启动后台任务(同步方法) + - 在 `CloseAsync()` 中调用 `_channelManager.ReleaseChannels()` 释放通道 + - 在重连失败时也释放通道,确保资源正确清理 + - 通道生命周期与WebSocket连接生命周期完全同步 + - **添加自动重连配置选项**: + - 在 `WebSocketConfig` 中添加 `EnableAutoReconnect` 配置项 + - 在 `TriggerReconnect()` 方法中添加配置检查 + - 支持通过配置文件控制是否启用自动重连功能 + - **修复WebSocket配置文件**: + - 更新 `websocket.json` 和 `websocket.Development.json` 配置文件 + - 添加所有新增的配置项:`EnableAutoReconnect`、`SendChannelCapacity`、`ReceiveChannelCapacity`、`PriorityChannelCapacity`、`MaxChunkSize`、`ChunkDelayMs` + - 为开发环境和生产环境提供不同的配置值 + - 添加重复调用检查,避免二次调用导致异常 + - 添加异常处理和日志记录 + - 保持方法功能的同时避免资源管理问题 + +3. **具体修改**: + **IMessageChannelManager 接口新增方法**: + ```csharp + /// + /// 创建所有通道 + /// + void CreateChannels(); + + /// + /// 释放所有通道 + /// + void ReleaseChannels(); + ``` + + **MessageChannelManager 实现**: + ```csharp + // 构造函数要求必须传入容量参数,并验证参数有效性 + public MessageChannelManager(ILogger logger, int sendChannelCapacity, int receiveChannelCapacity, int priorityChannelCapacity) + { + // 验证容量参数 + if (sendChannelCapacity <= 0) throw new ArgumentOutOfRangeException(...); + // 保存容量配置,不创建通道 + } + + // 安全创建通道(如果已存在则跳过) + public void CreateChannels() + { + // 检查通道是否已存在,如果存在则跳过创建 + } + + // 清空通道消息(保持通道可用) + public void ClearAllChannels() + { + // 清空所有通道中的消息,但保持通道结构 + } + + // 完成通道(标记不再接受新消息,但保持可读) + public void CompleteAllChannels() + { + // 标记通道完成,不再接受新消息,但可以继续读取 + } + + // 完全释放通道资源 + public void ReleaseChannels() + { + // 完成通道,释放资源,清空引用 + } + + + ``` + + **WebSocketConfig 配置扩展**: + ```csharp + public class WebSocketConfig + { + // 是否启用自动重连功能 + public bool EnableAutoReconnect { get; set; } = true; + + // 最大重连尝试次数 + public int MaxReconnectAttempts { get; set; } = 5; + + // 发送通道容量 + public int SendChannelCapacity { get; set; } = 1000; + + // 接收通道容量 + public int ReceiveChannelCapacity { get; set; } = 1000; + + // 优先级通道容量 + public int PriorityChannelCapacity { get; set; } = 100; + } + ``` + + **依赖注入注册更新**: + ```csharp + services.AddSingleton(provider => + { + var config = provider.GetRequiredService>().Value; + return new MessageChannelManager(logger, config.SendChannelCapacity, config.ReceiveChannelCapacity, config.PriorityChannelCapacity); + }); + ``` + + **NetworkProtocolLogObserver 修改**: + ```csharp + public void StopChannelManager() + { + // 调用 ReleaseChannels() 释放通道 + _ChannelManager.ReleaseChannels(); + } + + public void RecreateChannelManager() + { + // 调用 CreateChannels() 重新创建通道(会自动释放现有通道) + _ChannelManager.CreateChannels(); + } + ``` + + **WebSocketTransport 重构**: + ```csharp + // 连接时先创建通道,再建立连接 + private async Task ConnectInternalAsync(CancellationToken cancellationToken) + { + // 1. 先创建消息通道 + await CreateMessageChannelsAsync(); + + // 2. 再建立 WebSocket 连接 + await EstablishWebSocketConnectionAsync(cancellationToken); + + // 3. 最后启动后台任务 + await StartBackgroundTasksAsync(); + } + + // 提取的独立方法 + private void CreateMessageChannels() + { + _channelManager.CreateChannels(); + } + + private async Task EstablishWebSocketConnectionAsync(CancellationToken cancellationToken) + { + await _connection.ConnectAsync(...); + } + + private void StartBackgroundTasks() + { + // 启动发送、接收、心跳任务 + } + ``` + +4. **修复优势**: + - **配置灵活性**: 支持通过配置文件分别设置不同通道的容量,更加灵活 + - **参数验证**: 构造函数中验证容量参数,确保参数有效性 + - **方法职责清晰**: 每个方法职责明确,避免功能重叠 + - **生命周期控制**: 通道的创建和释放完全由用户控制,更加灵活 + - **资源管理**: 避免在构造函数中创建资源,符合延迟初始化原则 + - **重复使用**: 支持多次创建和释放,满足业务需求 + - **重复调用保护**: 防止二次调用导致异常,提高系统稳定性 + - **异常处理**: 添加了完整的异常处理和日志记录,但不影响主程序运行 + - **业务连续性**: 异常被捕获并记录,但不会中断主程序流程 + - **功能保持**: 仍然能够正确停止和重新创建通道管理器 + - **日志完善**: 提供了详细的调试和错误日志信息 + +### 创建蜂窝网络配置实体类 + ### 创建蜂窝网络配置实体类 **修改时间**: 2024年