Browse Source

feat: 优化NetworkProtocolLogObserver并发处理和ProtocolWsClientManager资源管理

- 修复NetworkProtocolLogObserver多客户端并发安全问题
  * 在OnProtocolLogsReceived方法中添加lock保护整个处理过程
  * 确保多个ProtocolWsClient实例可以安全并发访问
  * 简化设计,移除不必要的SemaphoreSlim和配置选项
  * 保留完整的日志跟踪功能

- 优化ProtocolWsClientManager资源管理
  * 在StopAllClients方法中先调用client.Stop()再调用client.Dispose()
  * 确保客户端优雅停止,避免资源泄漏
  * 简化Dispose方法,移除重复的客户端释放逻辑
  * 支持多次启动/停止循环,每次都能安全处理多客户端并发

- 技术改进
  * 使用单一lock对象保护共享状态,简化并发控制
  * 保持NetworkProtocolLogObserver单例模式,支持重复使用
  * 完善异常处理和日志记录
  * 提高代码可维护性和稳定性
feature/protocol-log-Perfect
root 4 months ago
parent
commit
b5ef9e9473
  1. 13
      CoreAgent.Infrastructure/Contexts/CellularNetworkContext.cs
  2. 3
      CoreAgent.Infrastructure/Services/Network/GeneralCellularNetworkService.cs
  3. 103
      CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs
  4. 24
      CoreAgent.Infrastructure/Services/Network/ProtocolWsClientManager.cs
  5. 11
      CoreAgent.ProtocolClient/ProtocolEngineCore/ProtocolLogProcessor.cs
  6. 2908
      modify.md

13
CoreAgent.Infrastructure/Contexts/CellularNetworkContext.cs

@ -76,17 +76,14 @@ public class CellularNetworkContext : ICellularNetworkContext, IDisposable
{
throw new ObjectDisposedException(nameof(CellularNetworkContext));
}
//if (_isInitialized)
//{
// return;
//}
if (string.IsNullOrEmpty(neConfigKey))
{
throw new ArgumentNullException(nameof(neConfigKey));
}
_logger.LogInformation("开始初始化蜂窝网络上下文,配置键: {ConfigKey}", neConfigKey);
lock (_lock)
{
_networkIPEndPointManager.Clear();
@ -95,6 +92,8 @@ public class CellularNetworkContext : ICellularNetworkContext, IDisposable
_currentConfigType = NetworkConfigType.None;
_isInitialized = true;
}
_logger.LogInformation("蜂窝网络上下文初始化完成,配置键: {ConfigKey}", neConfigKey);
}
@ -115,7 +114,7 @@ public class CellularNetworkContext : ICellularNetworkContext, IDisposable
lock (_lock)
{
_deviceCode = deviceCode;
_logger.LogInformation($"设备代码已设置为: {deviceCode}");
_logger.LogInformation("设备代码已设置为: {DeviceCode}", deviceCode);
}
}
@ -137,7 +136,9 @@ public class CellularNetworkContext : ICellularNetworkContext, IDisposable
lock (_lock)
{
var previousConfigType = _currentConfigType;
_currentConfigType = configType;
_logger.LogInformation("网络配置类型已更新: {PreviousConfigType} -> {NewConfigType}", previousConfigType, configType);
}
}

3
CoreAgent.Infrastructure/Services/Network/GeneralCellularNetworkService.cs

@ -96,10 +96,11 @@ namespace CoreAgent.Infrastructure.Services.Network
return stateCheckResult;
}
}
_logger.LogWarning($"初始化网络上下文 运行编码 {key}");
_logger.LogInformation("开始初始化网络上下文,运行编码: {RuntimeCode}", key);
// 4. 初始化网络上下文
_context.Initialize(key);
_logger.LogInformation("设置设备代码: {DeviceCode}", cellular.DeviceCode);
_context.SetDeviceCode(cellular.DeviceCode);
// 5. 启动网络

103
CoreAgent.Infrastructure/Services/Network/NetworkProtocolLogObserver.cs

@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using CoreAgent.Domain.Interfaces.Network;
using CoreAgent.ProtocolClient.Models;
@ -12,43 +13,84 @@ using Microsoft.Extensions.Logging;
namespace CoreAgent.Infrastructure.Services.Network
{
public class NetworkProtocolLogObserver : IProtocolLogObserver
public class NetworkProtocolLogObserver : IProtocolLogObserver, IDisposable
{
private readonly ILogger<NetworkProtocolLogObserver> _logger;
private readonly IMessageChannelManager _ChannelManager;
private readonly ICellularNetworkContext _context;
public NetworkProtocolLogObserver(ILogger<NetworkProtocolLogObserver> logger, IMessageChannelManager channelManager, ICellularNetworkContext context)
private readonly object _lock = new object();
private bool _disposed = false;
public NetworkProtocolLogObserver(
ILogger<NetworkProtocolLogObserver> logger,
IMessageChannelManager channelManager,
ICellularNetworkContext context)
{
this._logger = logger;
this._ChannelManager = channelManager;
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
this._ChannelManager = channelManager ?? throw new ArgumentNullException(nameof(channelManager));
this._context = context ?? throw new ArgumentNullException(nameof(context));
_logger.LogInformation("NetworkProtocolLogObserver初始化完成");
}
/// <summary>
/// 处理接收到的协议日志
/// </summary>
/// <param name="logDetails">协议日志详情</param>
public void OnProtocolLogsReceived(IEnumerable<TransferProtocolLog> logDetails)
{
var startTime = DateTime.UtcNow;
// 空值检查
if (_disposed)
{
_logger.LogWarning("NetworkProtocolLogObserver已释放,跳过日志处理");
return;
}
if (logDetails == null)
{
_logger.LogWarning("接收到的协议日志为空");
return;
}
// 转换为列表以避免多次枚举
var logList = logDetails.ToList();
var logCount = logList.Count;
// 空集合检查
if (logCount == 0)
{
_logger.LogDebug("接收到的协议日志集合为空,跳过处理");
return;
}
// 使用锁保护整个处理过程,确保线程安全
lock (_lock)
{
try
{
ProcessLogsInternal(logList, logCount);
}
catch (Exception ex)
{
_logger.LogError(ex, "协议日志处理过程中发生异常,数量: {LogCount}", logCount);
}
}
}
/// <summary>
/// 内部日志处理方法
/// </summary>
private void ProcessLogsInternal(List<TransferProtocolLog> logList, int logCount)
{
var startTime = DateTime.UtcNow;
_logger.LogDebug("开始处理协议日志,数量: {LogCount}", logCount);
try
{
_logger.LogDebug("开始处理协议日志,数量: {LogCount}", logCount);
string RuntimeCode = _context.GetNeConfigKey();
// 获取运行时配置键(外层已有锁保护)
string runtimeCode = _context.GetNeConfigKey();
// 将 CoreAgent.ProtocolClient 的 TransferProtocolLog 转换为 WebSocket 传输层的 MessageTransferProtocolLog
var webSocketLogs = logList.Select(log => new MessageTransferProtocolLog
{
@ -65,10 +107,11 @@ namespace CoreAgent.Infrastructure.Services.Network
Info = log.Info,
Message = log.Message,
DeviceCode = _context.DeviceCode,
RuntimeCode = RuntimeCode,
RuntimeCode = runtimeCode,
});
ProtocolMessage message = new ProtocolMessage(webSocketLogs.ToArray());
// 尝试写入通道并跟踪结果
var writeSuccess = _ChannelManager.SendChannel.TryWrite(message);
var processingTime = DateTime.UtcNow - startTime;
@ -76,12 +119,12 @@ namespace CoreAgent.Infrastructure.Services.Network
if (writeSuccess)
{
_logger.LogDebug("运行编码{RuntimeCode} 协议日志处理成功,数量: {LogCount}, 处理时间: {ProcessingTime}ms",
RuntimeCode, logCount, processingTime.TotalMilliseconds);
runtimeCode, logCount, processingTime.TotalMilliseconds);
}
else
{
_logger.LogWarning("运行编码{RuntimeCode} 协议日志写入通道失败,数量: {LogCount}, 处理时间: {ProcessingTime}ms, 通道可能已满或已关闭",
RuntimeCode, logCount, processingTime.TotalMilliseconds);
runtimeCode, logCount, processingTime.TotalMilliseconds);
}
}
catch (Exception ex)
@ -89,7 +132,35 @@ namespace CoreAgent.Infrastructure.Services.Network
var processingTime = DateTime.UtcNow - startTime;
_logger.LogError(ex, "协议日志处理异常,数量: {LogCount}, 处理时间: {ProcessingTime}ms",
logCount, processingTime.TotalMilliseconds);
throw; // 重新抛出异常,让上层处理
}
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 释放资源
/// </summary>
/// <param name="disposing">是否正在释放托管资源</param>
protected virtual void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
_logger.LogInformation("NetworkProtocolLogObserver资源已释放");
}
_disposed = true;
}
}
}
}

24
CoreAgent.Infrastructure/Services/Network/ProtocolWsClientManager.cs

@ -212,6 +212,8 @@ namespace CoreAgent.Infrastructure.Services.Network
var client = kvp.Value;
var wasConnected = client.IsConnected;
// 先停止客户端,再释放资源
client.Stop();
client.Dispose();
stoppedCount++;
@ -287,27 +289,9 @@ namespace CoreAgent.Infrastructure.Services.Network
{
lock (_lock)
{
// StopAllClients() 已经处理了所有客户端的停止和释放
StopAllClients();
var disposedCount = 0;
var failedCount = 0;
foreach (var client in _clients.Values)
{
try
{
client.Dispose();
disposedCount++;
}
catch (Exception ex)
{
failedCount++;
_logger.LogError(ex, "释放客户端资源失败");
}
}
_clients.Clear();
_logger.LogInformation("协议客户端管理器释放完成 - 成功: {DisposedCount}, 失败: {FailedCount}", disposedCount, failedCount);
_logger.LogInformation("协议客户端管理器释放完成");
}
}

11
CoreAgent.ProtocolClient/ProtocolEngineCore/ProtocolLogProcessor.cs

@ -9,6 +9,7 @@ using CoreAgent.ProtocolClient.Context;
using CoreAgent.ProtocolClient.Managers.WebSocketMgr;
using CoreAgent.ProtocolClient.Models;
using Microsoft.Extensions.Logging;
using System.Threading;
namespace CoreAgent.ProtocolClient.ProtocolEngineCore
{
@ -145,6 +146,9 @@ namespace CoreAgent.ProtocolClient.ProtocolEngineCore
/// <param name="logDetails">日志详情列表</param>
private void ProcessLogDetails(List<TransferProtocolLog> logDetails)
{
var threadId = Thread.CurrentThread.ManagedThreadId;
_logger.LogDebug("ProcessLogDetails 开始处理,线程ID: {ThreadId}, 日志数量: {LogCount}", threadId, logDetails.Count);
foreach (var detail in logDetails)
{
try
@ -163,6 +167,9 @@ namespace CoreAgent.ProtocolClient.ProtocolEngineCore
// 通知协议日志观察者处理转换后的数据
try
{
// 添加打印输出 - 通知观察者前的日志
_logger.LogInformation("开始通知协议日志观察者处理转换后的数据,线程ID: {ThreadId}, 日志详情数量: {LogDetailsCount}", threadId, logDetails.Count);
// 过滤掉Info为空或无效的记录
var filteredLogDetails = logDetails.Where(detail =>
!string.IsNullOrWhiteSpace(detail.Info)
@ -170,7 +177,9 @@ namespace CoreAgent.ProtocolClient.ProtocolEngineCore
if (filteredLogDetails.Any())
{
_logger.LogInformation("过滤后有效日志记录数量: {FilteredCount},准备通知观察者", filteredLogDetails.Count);
_protocolLogObserver.OnProtocolLogsReceived(filteredLogDetails);
_logger.LogInformation("成功通知协议日志观察者,已处理 {ProcessedCount} 条日志记录", filteredLogDetails.Count);
}
else
{
@ -181,6 +190,8 @@ namespace CoreAgent.ProtocolClient.ProtocolEngineCore
{
_logger.LogError(ex, "通知协议日志观察者失败");
}
_logger.LogDebug("ProcessLogDetails 处理完成,线程ID: {ThreadId}", threadId);
}
#region 公共接口 - 委托给消息处理器

2908
modify.md

File diff suppressed because it is too large
Loading…
Cancel
Save