Browse Source

修复WebSocket依赖注入顺序问题 - 将IProtocolLogObserver注册移到ProtocolMessageHandler之前,确保依赖关系正确解析

feature/x1-web-request
root 3 days ago
parent
commit
d5a29552ee
  1. 376
      src/X1.Application/BackendServiceManager/DeviceManagementService.cs
  2. 159
      src/X1.Application/BackendServiceManager/ProtocolChannelManager.cs
  3. 8
      src/X1.Application/DependencyInjection.cs
  4. 130
      src/X1.Application/Features/DeviceRuntimes/Commands/StartDeviceRuntime/StartDeviceRuntimeCommandHandler.cs
  5. 48
      src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQuery.cs
  6. 134
      src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQueryHandler.cs
  7. 112
      src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceResponse.cs
  8. 1
      src/X1.Application/X1.Application.csproj
  9. 55
      src/X1.Domain/Entities/Logging/ProtocolLayer.cs
  10. 159
      src/X1.Domain/Entities/Logging/ProtocolLog.cs
  11. 44
      src/X1.Domain/Models/DeviceBasicInfo.cs
  12. 14
      src/X1.Domain/Models/MessageTransferProtocolLog.cs
  13. 16
      src/X1.Domain/Repositories/Base/IQueryRepository.cs
  14. 8
      src/X1.Domain/Repositories/Device/ICellularDeviceRepository.cs
  15. 5
      src/X1.Domain/Repositories/Device/ICellularDeviceRuntimeRepository.cs
  16. 75
      src/X1.Domain/Repositories/Logging/IProtocolLogRepository.cs
  17. 37
      src/X1.Domain/Transmission/IProtocolChannelManager.cs
  18. 14
      src/X1.Domain/Transmission/IProtocolLogObserver.cs
  19. 1
      src/X1.Domain/X1.Domain.csproj
  20. 94
      src/X1.Infrastructure/Configurations/Logging/ProtocolLogConfiguration.cs
  21. 5
      src/X1.Infrastructure/Context/AppDbContext.cs
  22. 4
      src/X1.Infrastructure/DependencyInjection.cs
  23. 11
      src/X1.Infrastructure/Repositories/CQRS/QueryRepository.cs
  24. 20
      src/X1.Infrastructure/Repositories/Device/CellularDeviceRepository.cs
  25. 9
      src/X1.Infrastructure/Repositories/Device/CellularDeviceRuntimeRepository.cs
  26. 226
      src/X1.Infrastructure/Repositories/Logging/ProtocolLogRepository.cs
  27. 6
      src/X1.Infrastructure/X1.Infrastructure.csproj
  28. 78
      src/X1.Presentation/Controllers/ProtocolLogsController.cs
  29. 7
      src/X1.WebSocket/Extensions/ServiceCollectionExtensions.cs
  30. 50
      src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs
  31. 1
      src/X1.WebSocket/Models/ProtocolMessage.cs
  32. 152
      src/X1.WebSocket/Transmission/NetworkProtocolLogObserver.cs
  33. 4
      src/X1.WebSocket/X1.WebSocket.csproj

376
src/X1.Application/BackendServiceManager/DeviceManagementService.cs

@ -0,0 +1,376 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using CellularManagement.Domain.Repositories.Device;
using CellularManagement.Domain.Repositories.Logging;
using CellularManagement.Domain.Entities.Logging;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using X1.DynamicClientCore.Interfaces;
using X1.DynamicClientCore.Models;
using CellularManagement.Domain.Models;
using X1.Domain.Transmission;
using CellularManagement.Domain.Repositories.Base;
using System.Data;
namespace X1.Application.BackendServiceManager
{
/// <summary>
/// 设备管理服务 - 负责管理设备端点的后台服务
/// </summary>
public class DeviceManagementService : BackgroundService
{
private readonly ILogger<DeviceManagementService> _logger;
private readonly IServiceEndpointManager _endpointManager;
private readonly ICellularDeviceRepository _deviceRepository;
private readonly IProtocolChannelManager _protocolChannelManager;
private readonly IProtocolLogRepository _repository;
private readonly IUnitOfWork _unitOfWork;
// 配置常量
private const string DEFAULT_PROTOCOL = "http";
private const string DEFAULT_BASE_PATH = "/api/v1";
private const int DEFAULT_TIMEOUT = 10;
public DeviceManagementService(
ICellularDeviceRepository deviceRepository,
IServiceEndpointManager endpointManager,
IProtocolChannelManager protocolChannelManager,
ILogger<DeviceManagementService> logger,
IProtocolLogRepository repository,
IUnitOfWork unitOfWork)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_endpointManager = endpointManager ?? throw new ArgumentNullException(nameof(endpointManager));
_deviceRepository = deviceRepository ?? throw new ArgumentNullException(nameof(deviceRepository));
_protocolChannelManager = protocolChannelManager ?? throw new ArgumentNullException(nameof(protocolChannelManager));
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
_unitOfWork = unitOfWork ?? throw new ArgumentNullException(nameof(unitOfWork));
}
/// <summary>
/// 执行后台服务的主要逻辑
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("DeviceManagementService started. Initializing device endpoints...");
try
{
await InitializeDeviceEndpointsAsync(stoppingToken);
_logger.LogInformation("DeviceManagementService completed initialization.");
}
catch (OperationCanceledException)
{
_logger.LogInformation("DeviceManagementService was cancelled during initialization.");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred during device endpoint initialization.");
}
// 启动协议日志处理循环
_ = Task.Run(() => ProcessProtocolLogsAsync(stoppingToken), stoppingToken);
// 服务初始化完成后,等待取消请求
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken); // 每秒检查一次取消请求
}
_logger.LogInformation("DeviceManagementService stopped.");
}
/// <summary>
/// 处理协议日志的循环
/// </summary>
private async Task ProcessProtocolLogsAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("开始协议日志处理循环");
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 从通道读取协议日志
var protocolLogs = await _protocolChannelManager.ReadFromChannelAsync(stoppingToken);
if (protocolLogs.Any())
{
await ProcessProtocolLogs(protocolLogs, stoppingToken);
}
else
{
// 没有日志时短暂等待,避免空转
await Task.Delay(100, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("协议日志处理循环被取消");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理协议日志时发生错误");
await Task.Delay(1000, stoppingToken); // 错误时等待1秒再继续
}
}
_logger.LogInformation("协议日志处理循环已停止");
}
/// <summary>
/// 处理协议日志
/// </summary>
private async Task ProcessProtocolLogs(IEnumerable<ProtocolLog> protocolLogs, CancellationToken cancellationToken)
{
// 参数验证
if (protocolLogs == null)
{
_logger.LogWarning("接收到空的协议日志集合");
return;
}
var logs = protocolLogs.ToList();
if (!logs.Any())
{
_logger.LogDebug("协议日志集合为空,跳过处理");
return;
}
_logger.LogDebug("开始处理协议日志,数量:{Count}", logs.Count);
// 数据验证
var validLogs = ValidateAndFilterLogs(logs);
if (!validLogs.Any())
{
_logger.LogWarning("没有有效的协议日志需要处理");
return;
}
// 使用事务处理
using var transaction = await _unitOfWork.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
try
{
// 批量插入到数据库
await _repository.AddRangeAsync(validLogs, cancellationToken);
// 保存更改
await _unitOfWork.SaveChangesAsync(cancellationToken);
// 提交事务
await _unitOfWork.CommitTransactionAsync(transaction, cancellationToken);
_logger.LogDebug("协议日志批量插入数据库成功,数量:{Count}", validLogs.Count());
}
catch (OperationCanceledException)
{
_logger.LogInformation("协议日志处理被取消");
await _unitOfWork.RollbackTransactionAsync(cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "批量插入协议日志到数据库失败,数量:{Count}", validLogs.Count());
await _unitOfWork.RollbackTransactionAsync(cancellationToken);
// 如果批量插入失败,尝试逐个插入
await ProcessProtocolLogsIndividually(validLogs, cancellationToken);
}
}
/// <summary>
/// 验证和过滤协议日志
/// </summary>
private IEnumerable<ProtocolLog> ValidateAndFilterLogs(IEnumerable<ProtocolLog> logs)
{
return logs.Where(log =>
{
if (log == null)
{
_logger.LogDebug("跳过null协议日志");
return false;
}
// 验证必需字段
if (string.IsNullOrWhiteSpace(log.DeviceCode))
{
_logger.LogDebug("跳过无效日志:设备代码为空,ID:{Id}", log.Id);
return false;
}
if (string.IsNullOrWhiteSpace(log.RuntimeCode))
{
_logger.LogDebug("跳过无效日志:运行时代码为空,ID:{Id}", log.Id);
return false;
}
if (log.Timestamp <= 0)
{
_logger.LogDebug("跳过无效日志:时间戳无效,ID:{Id}", log.Id);
return false;
}
if (log.MessageId <= 0)
{
_logger.LogDebug("跳过无效日志:消息ID无效,ID:{Id}", log.Id);
return false;
}
return true;
});
}
/// <summary>
/// 逐个处理协议日志(批量插入失败时的备用方案)
/// </summary>
private async Task ProcessProtocolLogsIndividually(IEnumerable<ProtocolLog> protocolLogs, CancellationToken cancellationToken)
{
var logs = protocolLogs.ToList();
var successCount = 0;
var errorCount = 0;
_logger.LogInformation("开始逐个插入协议日志,总数:{Count}", logs.Count);
// 分批处理,避免内存问题
const int batchSize = 50;
for (int i = 0; i < logs.Count; i += batchSize)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}
var batch = logs.Skip(i).Take(batchSize);
using var transaction = await _unitOfWork.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
try
{
foreach (var log in batch)
{
try
{
await _repository.AddAsync(log, cancellationToken);
successCount++;
}
catch (Exception ex)
{
errorCount++;
_logger.LogError(ex, "插入单个协议日志失败,ID:{LogId},设备:{DeviceCode}", log.Id, log.DeviceCode);
}
}
// 保存当前批次的更改
await _unitOfWork.SaveChangesAsync(cancellationToken);
await _unitOfWork.CommitTransactionAsync(transaction, cancellationToken);
_logger.LogDebug("批次处理完成,成功:{SuccessCount},失败:{ErrorCount},批次大小:{BatchSize}",
successCount, errorCount, batch.Count());
}
catch (Exception ex)
{
await _unitOfWork.RollbackTransactionAsync(cancellationToken);
_logger.LogError(ex, "批次处理失败,批次索引:{BatchIndex}", i / batchSize);
errorCount += batch.Count();
}
}
_logger.LogInformation("协议日志逐个插入完成,成功:{SuccessCount},失败:{ErrorCount},总数:{TotalCount}",
successCount, errorCount, logs.Count);
}
/// <summary>
/// 处理单个协议日志
/// </summary>
private async Task ProcessSingleProtocolLog(ProtocolLog log, CancellationToken cancellationToken)
{
// 这里可以添加具体的协议日志处理逻辑
// 例如:保存到数据库、发送通知、更新设备状态等
_logger.LogDebug("处理协议日志,ID:{Id},设备:{DeviceCode},运行时:{RuntimeCode},层类型:{LayerType}",
log.Id, log.DeviceCode, log.RuntimeCode, log.LayerType);
// 示例:根据设备代码查找对应的端点进行处理
var endpoint = _endpointManager.GetEndpoint(log.DeviceCode);
if (endpoint != null)
{
// 可以在这里调用设备端点的API进行相关操作
_logger.LogDebug("找到设备端点:{EndpointName},IP:{Ip},端口:{Port}",
endpoint.Name, endpoint.Ip, endpoint.Port);
}
await Task.CompletedTask; // 占位符,实际处理逻辑待实现
}
/// <summary>
/// 初始化设备端点信息
/// </summary>
private async Task InitializeDeviceEndpointsAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("Initializing device endpoints...");
var devices = await _deviceRepository.GetDeviceBasicInfoListAsync(cancellationToken);
if (devices == null || !devices.Any())
{
_logger.LogWarning("No devices found to initialize endpoints.");
return;
}
var successCount = 0;
var skipCount = 0;
foreach (var device in devices)
{
if (IsValidDeviceInfo(device))
{
var endpoint = CreateServiceEndpoint(device);
_endpointManager.AddOrUpdateEndpoint(endpoint);
successCount++;
_logger.LogDebug("Initialized endpoint for device: {DeviceCode} at {IpAddress}:{Port}",
device.DeviceCode, device.IpAddress, device.AgentPort);
}
else
{
_logger.LogWarning("Skipping invalid device: {DeviceCode}", device.DeviceCode);
skipCount++;
}
}
_logger.LogInformation("Device endpoint initialization completed. Success: {SuccessCount}, Skipped: {SkipCount}, Total: {TotalCount}",
successCount, skipCount, devices.Count);
}
/// <summary>
/// 验证设备信息是否有效
/// </summary>
private static bool IsValidDeviceInfo(DeviceBasicInfo device)
{
return device != null
&& !string.IsNullOrWhiteSpace(device.DeviceCode)
&& !string.IsNullOrWhiteSpace(device.IpAddress)
&& device.AgentPort > 0
&& device.AgentPort <= 65535;
}
/// <summary>
/// 根据设备信息创建服务端点
/// </summary>
private static ServiceEndpoint CreateServiceEndpoint(DeviceBasicInfo device)
{
return new ServiceEndpoint
{
Name = device.DeviceCode,
Ip = device.IpAddress,
Port = device.AgentPort,
Protocol = DEFAULT_PROTOCOL,
BasePath = DEFAULT_BASE_PATH,
Timeout = DEFAULT_TIMEOUT,
Enabled = true
};
}
}
}

159
src/X1.Application/BackendServiceManager/ProtocolChannelManager.cs

@ -0,0 +1,159 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using CellularManagement.Domain.Entities.Logging;
using Microsoft.Extensions.Logging;
using X1.Domain.Transmission;
namespace X1.Application.BackendServiceManager
{
/// <summary>
/// 协议通道管理器实现
/// 提供协议日志的读取、写入和清空功能
/// </summary>
public class ProtocolChannelManager : IProtocolChannelManager
{
private readonly ILogger<ProtocolChannelManager> _logger;
private readonly Channel<ProtocolLog[]> _channel;
public ProtocolChannelManager(ILogger<ProtocolChannelManager> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_channel = Channel.CreateUnbounded<ProtocolLog[]>(new UnboundedChannelOptions
{
SingleReader = false,
SingleWriter = false,
AllowSynchronousContinuations = false
});
}
/// <summary>
/// 从通道读取协议日志
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志集合</returns>
public async Task<IEnumerable<ProtocolLog>> ReadFromChannelAsync(CancellationToken cancellationToken = default)
{
try
{
// 等待有数据可读
if (await _channel.Reader.WaitToReadAsync(cancellationToken))
{
// 读取一个数组
if (_channel.Reader.TryRead(out var logsArray))
{
_logger.LogDebug("从通道读取协议日志数组,数量:{Count}", logsArray.Length);
return logsArray;
}
}
return Enumerable.Empty<ProtocolLog>();
}
catch (OperationCanceledException)
{
_logger.LogInformation("读取协议日志操作被取消");
return Enumerable.Empty<ProtocolLog>();
}
catch (Exception ex)
{
_logger.LogError(ex, "读取协议日志时发生错误");
return Enumerable.Empty<ProtocolLog>();
}
}
/// <summary>
/// 向通道写入协议日志
/// </summary>
/// <param name="protocolLogs">协议日志集合</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>写入结果</returns>
public async Task<bool> WriteToChannelAsync(IEnumerable<ProtocolLog> protocolLogs, CancellationToken cancellationToken = default)
{
if (protocolLogs == null)
{
_logger.LogWarning("尝试写入空的协议日志集合");
return false;
}
try
{
var logs = protocolLogs.ToList();
if (!logs.Any())
{
_logger.LogWarning("协议日志集合为空,跳过写入");
return false;
}
// 过滤掉null的日志
var validLogs = logs.Where(log => log != null).ToArray();
if (validLogs.Length == 0)
{
_logger.LogWarning("没有有效的协议日志,跳过写入");
return false;
}
// 批量写入数组
await _channel.Writer.WriteAsync(validLogs, cancellationToken);
_logger.LogDebug("写入协议日志数组到通道,数量:{Count}", validLogs.Length);
return true;
}
catch (OperationCanceledException)
{
_logger.LogInformation("写入协议日志操作被取消");
return false;
}
catch (Exception ex)
{
_logger.LogError(ex, "写入协议日志时发生错误");
return false;
}
}
/// <summary>
/// 清空协议通道
/// </summary>
/// <returns>清空结果</returns>
public async Task<bool> ClearChannelAsync()
{
try
{
var count = 0;
// 读取并丢弃所有数组直到通道为空
while (await _channel.Reader.WaitToReadAsync())
{
if (_channel.Reader.TryRead(out var logsArray))
{
count += logsArray.Length;
}
else
{
break;
}
}
_logger.LogInformation("清空协议通道,清空日志数量:{Count}", count);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "清空协议通道时发生错误");
return false;
}
}
/// <summary>
/// 获取当前通道中的日志数量(估算)
/// </summary>
/// <returns>日志数量</returns>
public int GetChannelCount()
{
return _channel.Reader.Count;
}
}
}

8
src/X1.Application/DependencyInjection.cs

@ -5,6 +5,8 @@ using CellularManagement.Application.Behaviours;
using CellularManagement.Domain.Services;
using Microsoft.Extensions.Configuration;
using CellularManagement.Domain.Options;
using X1.Application.BackendServiceManager;
using X1.Domain.Transmission;
namespace CellularManagement.Application;
@ -42,6 +44,12 @@ public static class DependencyInjection
// 注册验证器
services.AddScoped(typeof(IPipelineBehavior<,>), typeof(ValidationBehaviour<,>));
// 注册协议通道管理器(单例,因为需要在整个应用生命周期中保持状态)
services.AddSingleton<IProtocolChannelManager, ProtocolChannelManager>();
// 注册后台服务(依赖于上面的服务)
services.AddHostedService<DeviceManagementService>();
return services;
}
}

130
src/X1.Application/Features/DeviceRuntimes/Commands/StartDeviceRuntime/StartDeviceRuntimeCommandHandler.cs

@ -9,6 +9,7 @@ using X1.DynamicClientCore.Features;
using CellularManagement.Domain.Repositories.NetworkProfile;
using X1.DynamicClientCore.Models;
using X1.Domain.Models;
using System.Collections.Concurrent;
namespace CellularManagement.Application.Features.DeviceRuntimes.Commands.StartDeviceRuntime;
@ -110,42 +111,7 @@ public class StartDeviceRuntimeCommandHandler : IRequestHandler<StartDeviceRunti
}
// 并行启动网络并收集结果
var networkResults = new List<(bool Success, string DeviceCode, string ErrorMessage)>();
_logger.LogInformation("开始并行启动网络,请求数量: {RequestCount}", networkRequests.Count);
await Parallel.ForEachAsync(networkRequests, async (networkRequest, cts) =>
{
try
{
_logger.LogDebug("启动网络,设备代码: {DeviceCode}, 运行时代码: {RuntimeCode}",
networkRequest.DeviceCode, networkRequest.RuntimeCode);
var startResult = await _protocolClient.StartNetworkAsync(networkRequest);
_logger.LogDebug("网络启动结果,设备代码: {DeviceCode}, 启动成功: {StartResult}",
networkRequest.DeviceCode, startResult);
if (startResult)
{
_logger.LogDebug("网络启动成功,设备代码: {DeviceCode}", networkRequest.DeviceCode);
networkResults.Add((true, networkRequest.DeviceCode, string.Empty));
}
else
{
var errorMessage = "网络启动返回失败状态";
_logger.LogWarning("网络启动返回失败状态,设备代码: {DeviceCode}", networkRequest.DeviceCode);
networkResults.Add((false, networkRequest.DeviceCode, errorMessage));
}
}
catch (Exception ex)
{
var errorMessage = $"网络启动失败: {ex.Message}";
_logger.LogError(ex, "网络启动失败,设备代码: {DeviceCode}", networkRequest.DeviceCode);
networkResults.Add((false, networkRequest.DeviceCode, errorMessage));
}
});
// 检查网络启动结果
var successfulDevices = networkResults.Where(r => r.Success).Select(r => r.DeviceCode).ToHashSet();
var failedDevices = networkResults.Where(r => !r.Success).ToList();
var (successfulDevices, failedDevices) = await StartNetworksInParallelAsync(networkRequests, cancellationToken);
if (failedDevices.Any())
{
@ -393,8 +359,90 @@ public class StartDeviceRuntimeCommandHandler : IRequestHandler<StartDeviceRunti
_logger.LogDebug("构建网络配置请求,设备代码: {DeviceCode}, 网络堆栈代码: {NetworkStackCode}, 绑定关系数量: {BindingCount}",
deviceRequest.DeviceCode, deviceRequest.NetworkStackCode, coreNetworkImsConfigurations.Count);
return configuration;
})
.ToList();
}
}
return configuration;
})
.ToList();
}
/// <summary>
/// 并行启动网络并收集结果
/// </summary>
/// <param name="networkRequests">网络配置请求列表</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>成功和失败的设备信息元组</returns>
/// <remarks>
/// 该方法使用并行执行来同时启动多个网络,提高性能:
/// 1. 使用ConcurrentBag确保线程安全的结果收集
/// 2. 使用SemaphoreSlim控制并发数量,避免资源竞争(可选)
/// 3. 设置超时机制,防止任务无限等待
/// 4. 提供详细的错误处理和日志记录
/// 5. 确保资源正确释放
/// </remarks>
private async Task<(HashSet<string> SuccessfulDevices, List<(string DeviceCode, string ErrorMessage)> FailedDevices)>
StartNetworksInParallelAsync(List<CellularNetworkConfiguration> networkRequests, CancellationToken cancellationToken)
{
var networkResults = new ConcurrentBag<(bool Success, string DeviceCode, string ErrorMessage)>();
_logger.LogInformation("开始并行启动网络,请求数量: {RequestCount}", networkRequests.Count);
// 设置超时时间,防止任务无限等待
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); // 5分钟超时
using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);
// 完全并行执行,不受信号量限制
var tasks = networkRequests.Select(async networkRequest =>
{
try
{
_logger.LogDebug("启动网络,设备代码: {DeviceCode}, 运行时代码: {RuntimeCode}",
networkRequest.DeviceCode, networkRequest.RuntimeCode);
var startResult = await _protocolClient.StartNetworkAsync(networkRequest);
_logger.LogDebug("网络启动结果,设备代码: {DeviceCode}, 启动成功: {StartResult}",
networkRequest.DeviceCode, startResult);
if (startResult)
{
_logger.LogDebug("网络启动成功,设备代码: {DeviceCode}", networkRequest.DeviceCode);
networkResults.Add((true, networkRequest.DeviceCode, string.Empty));
}
else
{
var errorMessage = "网络启动返回失败状态";
_logger.LogWarning("网络启动返回失败状态,设备代码: {DeviceCode}", networkRequest.DeviceCode);
networkResults.Add((false, networkRequest.DeviceCode, errorMessage));
}
}
catch (OperationCanceledException) when (timeoutCts.Token.IsCancellationRequested)
{
var errorMessage = "网络启动超时";
_logger.LogWarning("网络启动超时,设备代码: {DeviceCode}", networkRequest.DeviceCode);
networkResults.Add((false, networkRequest.DeviceCode, errorMessage));
}
catch (Exception ex)
{
var errorMessage = $"网络启动失败: {ex.Message}";
_logger.LogError(ex, "网络启动失败,设备代码: {DeviceCode}", networkRequest.DeviceCode);
networkResults.Add((false, networkRequest.DeviceCode, errorMessage));
}
});
try
{
// 等待所有任务完成
await Task.WhenAll(tasks);
}
catch (OperationCanceledException) when (timeoutCts.Token.IsCancellationRequested)
{
_logger.LogWarning("网络启动操作超时,部分设备可能未完成启动");
}
// 检查网络启动结果
var successfulDevices = networkResults.Where(r => r.Success).Select(r => r.DeviceCode).ToHashSet();
var failedDevices = networkResults.Where(r => !r.Success).Select(r => (r.DeviceCode, r.ErrorMessage)).ToList();
_logger.LogInformation("网络启动完成,成功设备数: {SuccessCount}, 失败设备数: {FailureCount}",
successfulDevices.Count, failedDevices.Count);
return (successfulDevices, failedDevices);
}
}

48
src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQuery.cs

@ -0,0 +1,48 @@
using CellularManagement.Domain.Common;
using CellularManagement.Domain.Entities.Device;
using MediatR;
using System.ComponentModel.DataAnnotations;
namespace CellularManagement.Application.Features.ProtocolLogs.Queries.GetProtocolLogsByDevice;
/// <summary>
/// 根据设备代码获取协议日志查询
/// </summary>
public class GetProtocolLogsByDeviceQuery : IRequest<OperationResult<GetProtocolLogsByDeviceResponse>>
{
/// <summary>
/// 设备代码
/// </summary>
[Required(ErrorMessage = "设备代码不能为空")]
[MaxLength(50, ErrorMessage = "设备代码不能超过50个字符")]
public string DeviceCode { get; set; } = string.Empty;
/// <summary>
/// 开始时间戳
/// </summary>
public long? StartTimestamp { get; set; }
/// <summary>
/// 结束时间戳
/// </summary>
public long? EndTimestamp { get; set; }
/// <summary>
/// 协议层类型
/// </summary>
[MaxLength(50, ErrorMessage = "协议层类型不能超过50个字符")]
public string? LayerType { get; set; }
/// <summary>
/// 设备运行时状态
/// 当设置为 Running 时,只获取当前运行的数据
/// </summary>
public DeviceRuntimeStatus? DeviceRuntimeStatus { get; set; }
/// <summary>
/// 是否按时间戳降序排序
/// </summary>
public bool OrderByDescending { get; set; } = true;
}

134
src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQueryHandler.cs

@ -0,0 +1,134 @@
using MediatR;
using Microsoft.Extensions.Logging;
using CellularManagement.Domain.Common;
using CellularManagement.Domain.Entities.Logging;
using CellularManagement.Domain.Entities.Device;
using CellularManagement.Domain.Repositories.Logging;
using CellularManagement.Domain.Repositories.Device;
using System.ComponentModel.DataAnnotations;
using System.Linq;
namespace CellularManagement.Application.Features.ProtocolLogs.Queries.GetProtocolLogsByDevice;
/// <summary>
/// 根据设备代码获取协议日志查询处理器
/// </summary>
public class GetProtocolLogsByDeviceQueryHandler : IRequestHandler<GetProtocolLogsByDeviceQuery, OperationResult<GetProtocolLogsByDeviceResponse>>
{
private readonly IProtocolLogRepository _protocolLogRepository;
private readonly ICellularDeviceRuntimeRepository _deviceRuntimeRepository;
private readonly ILogger<GetProtocolLogsByDeviceQueryHandler> _logger;
/// <summary>
/// 初始化查询处理器
/// </summary>
public GetProtocolLogsByDeviceQueryHandler(
IProtocolLogRepository protocolLogRepository,
ICellularDeviceRuntimeRepository deviceRuntimeRepository,
ILogger<GetProtocolLogsByDeviceQueryHandler> logger)
{
_protocolLogRepository = protocolLogRepository;
_deviceRuntimeRepository = deviceRuntimeRepository;
_logger = logger;
}
/// <summary>
/// 处理根据设备代码获取协议日志查询
/// </summary>
public async Task<OperationResult<GetProtocolLogsByDeviceResponse>> Handle(GetProtocolLogsByDeviceQuery request, CancellationToken cancellationToken)
{
try
{
// 验证请求参数
var validationContext = new ValidationContext(request);
var validationResults = new List<ValidationResult>();
if (!Validator.TryValidateObject(request, validationContext, validationResults, true))
{
var errorMessages = validationResults.Select(r => r.ErrorMessage).ToList();
_logger.LogWarning("请求参数无效: {Errors}", string.Join(", ", errorMessages));
return OperationResult<GetProtocolLogsByDeviceResponse>.CreateFailure(errorMessages);
}
_logger.LogInformation("开始获取设备 {DeviceCode} 的协议日志,运行时状态: {DeviceRuntimeStatus}",
request.DeviceCode, request.DeviceRuntimeStatus);
// 获取设备运行时状态(仅在需要时查询)
IEnumerable<string>? runtimeCodes = null;
if (request.DeviceRuntimeStatus.HasValue)
{
var deviceRuntimes = await _deviceRuntimeRepository.GetRuntimesByDeviceCodeAsync(
request.DeviceCode,
cancellationToken);
if (!deviceRuntimes.Any())
{
_logger.LogWarning("设备 {DeviceCode} 的运行时状态不存在", request.DeviceCode);
return OperationResult<GetProtocolLogsByDeviceResponse>.CreateFailure($"设备 {request.DeviceCode} 的运行时状态不存在");
}
// 过滤出匹配状态的运行时
var matchingRuntimes = deviceRuntimes.Where(r => r.RuntimeStatus == request.DeviceRuntimeStatus.Value).ToList();
if (!matchingRuntimes.Any())
{
_logger.LogWarning("设备 {DeviceCode} 的运行时状态不匹配,期望: {ExpectedStatus}",
request.DeviceCode, request.DeviceRuntimeStatus.Value);
return OperationResult<GetProtocolLogsByDeviceResponse>.CreateFailure(
$"设备 {request.DeviceCode} 的运行时状态不匹配,期望: {request.DeviceRuntimeStatus.Value}");
}
// 获取运行时编码集合
runtimeCodes = matchingRuntimes.Select(r => r.RuntimeCode).ToList();
_logger.LogInformation("使用运行时编码集合 {RuntimeCodes} 过滤协议日志", string.Join(", ", runtimeCodes));
}
// 使用 JOIN 高性能查询方法
var protocolLogs = await _protocolLogRepository.GetByDeviceWithFiltersAsync(
request.DeviceCode,
runtimeCodes,
request.StartTimestamp,
request.EndTimestamp,
request.LayerType,
request.OrderByDescending,
cancellationToken);
// 转换为DTO
var protocolLogDtos = protocolLogs.Select(log => new ProtocolLogDto
{
Id = log.Id,
MessageId = log.MessageId,
LayerType = (int)log.LayerType,
MessageDetailJson = log.MessageDetailJson,
CellID = log.CellID,
IMSI = log.IMSI,
Direction = log.Direction,
UEID = log.UEID,
PLMN = log.PLMN,
TimeMs = log.TimeMs,
Timestamp = log.Timestamp,
Info = log.Info,
Message = log.Message,
DeviceCode = log.DeviceCode,
RuntimeCode = log.RuntimeCode,
MessageDetail = log.MessageDetail,
Time = log.Time
}).ToList();
// 构建响应
var response = new GetProtocolLogsByDeviceResponse
{
DeviceCode = request.DeviceCode,
Items = protocolLogDtos.ToList()
};
_logger.LogInformation("成功获取设备 {DeviceCode} 的协议日志,共 {Count} 条记录",
request.DeviceCode, protocolLogDtos.Count);
return OperationResult<GetProtocolLogsByDeviceResponse>.CreateSuccess(response);
}
catch (Exception ex)
{
_logger.LogError(ex, "获取设备 {DeviceCode} 的协议日志时发生错误", request.DeviceCode);
return OperationResult<GetProtocolLogsByDeviceResponse>.CreateFailure($"获取协议日志时发生错误: {ex.Message}");
}
}
}

112
src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceResponse.cs

@ -0,0 +1,112 @@
using System.Collections.Generic;
namespace CellularManagement.Application.Features.ProtocolLogs.Queries.GetProtocolLogsByDevice;
/// <summary>
/// 根据设备代码获取协议日志响应
/// </summary>
public class GetProtocolLogsByDeviceResponse
{
/// <summary>
/// 设备代码
/// </summary>
public string DeviceCode { get; set; } = string.Empty;
/// <summary>
/// 协议日志列表
/// </summary>
public List<ProtocolLogDto> Items { get; set; } = new();
}
/// <summary>
/// 协议日志数据传输对象
/// </summary>
public class ProtocolLogDto
{
/// <summary>
/// 主键ID
/// </summary>
public string Id { get; set; } = string.Empty;
/// <summary>
/// 消息ID
/// </summary>
public long MessageId { get; set; }
/// <summary>
/// 协议层类型
/// </summary>
public int LayerType { get; set; }
/// <summary>
/// 消息详情集合(JSON格式存储)
/// </summary>
public string? MessageDetailJson { get; set; }
/// <summary>
/// 小区ID
/// </summary>
public int? CellID { get; set; }
/// <summary>
/// 国际移动用户识别码
/// </summary>
public string? IMSI { get; set; }
/// <summary>
/// 日志方向类型
/// </summary>
public int Direction { get; set; }
/// <summary>
/// 用户设备ID
/// </summary>
public int? UEID { get; set; }
/// <summary>
/// 公共陆地移动网络标识
/// </summary>
public string? PLMN { get; set; }
/// <summary>
/// 时间间隔(毫秒)
/// </summary>
public long TimeMs { get; set; }
/// <summary>
/// 时间戳
/// </summary>
public long Timestamp { get; set; }
/// <summary>
/// 信息字段
/// </summary>
public string? Info { get; set; }
/// <summary>
/// 消息字段
/// </summary>
public string? Message { get; set; }
/// <summary>
/// 设备代码
/// </summary>
public string DeviceCode { get; set; } = string.Empty;
/// <summary>
/// 运行时代码
/// </summary>
public string RuntimeCode { get; set; } = string.Empty;
/// <summary>
/// 消息详情集合(用于业务逻辑)
/// </summary>
public IEnumerable<string>? MessageDetail { get; set; }
/// <summary>
/// 时间间隔(用于业务逻辑)
/// </summary>
public TimeSpan Time { get; set; }
}

1
src/X1.Application/X1.Application.csproj

@ -12,6 +12,7 @@
<PackageReference Include="MediatR" Version="12.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Swashbuckle.AspNetCore.Annotations" Version="6.5.0" />
<PackageReference Include="Swashbuckle.AspNetCore.Filters" Version="7.0.8" />
<PackageReference Include="UAParser" Version="3.0.0" />

55
src/X1.Domain/Entities/Logging/ProtocolLayer.cs

@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace X1.Domain.Entities.Logging
{
/// <summary>
/// 协议层类型枚举
/// 定义了各种协议层的类型标识
/// </summary>
public enum ProtocolLayer
{
NONE,
GTPU,
LPPa,
M2AP,
MAC,
NAS,
NGAP,
NRPPa,
PDCP,
PROD,
PHY,
RLC,
RRC,
S1AP,
TRX,
X2AP,
XnAP,
IP,
IMS,
CX,
RX,
S6,
S13,
SGsAP,
SBcAP,
LCSAP,
N12,
N8,
N17,
N50,
N13,
NL1,
HTTP2,
EPDG,
IKEV2,
IPSEC,
MEDIA,
MMS,
SIP,
}
}

159
src/X1.Domain/Entities/Logging/ProtocolLog.cs

@ -0,0 +1,159 @@
using System.ComponentModel.DataAnnotations;
using CellularManagement.Domain.Abstractions;
using CellularManagement.Domain.Entities.Common;
using X1.Domain.Entities.Logging;
namespace CellularManagement.Domain.Entities.Logging;
/// <summary>
/// 协议日志实体
/// 用于存储协议层的日志数据
/// </summary>
public class ProtocolLog : Entity
{
/// <summary>
/// 消息ID
/// </summary>
[Required]
public long MessageId { get; private set; }
/// <summary>
/// 协议层类型
/// </summary>
[Required]
[MaxLength(50)]
public ProtocolLayer LayerType { get; private set; }
/// <summary>
/// 消息详情集合(JSON格式存储)
/// </summary>
public string? MessageDetailJson { get; private set; }
/// <summary>
/// 小区ID
/// </summary>
public int? CellID { get; private set; }
/// <summary>
/// 国际移动用户识别码
/// </summary>
[MaxLength(50)]
public string? IMSI { get; private set; }
/// <summary>
/// 日志方向类型
/// </summary>
[Required]
public int Direction { get; private set; }
/// <summary>
/// 用户设备ID
/// </summary>
public int? UEID { get; private set; }
/// <summary>
/// 公共陆地移动网络标识
/// </summary>
[MaxLength(20)]
public string? PLMN { get; private set; }
/// <summary>
/// 时间间隔(毫秒)
/// </summary>
[Required]
public long TimeMs { get; private set; }
/// <summary>
/// 时间戳
/// </summary>
[Required]
public long Timestamp { get; private set; }
/// <summary>
/// 信息字段
/// </summary>
[MaxLength(500)]
public string? Info { get; private set; }
/// <summary>
/// 消息字段
/// </summary>
[MaxLength(1000)]
public string? Message { get; private set; }
/// <summary>
/// 设备代码
/// </summary>
[Required]
[MaxLength(50)]
public string DeviceCode { get; private set; } = null!;
/// <summary>
/// 运行时代码
/// </summary>
[Required]
[MaxLength(50)]
public string RuntimeCode { get; private set; } = null!;
/// <summary>
/// 私有构造函数
/// </summary>
private ProtocolLog() { }
/// <summary>
/// 创建协议日志
/// </summary>
public static ProtocolLog Create(
long messageId,
int layerType,
int direction,
long timeMs,
long timestamp,
string deviceCode,
string runtimeCode,
string? messageDetailJson = null,
int? cellID = null,
string? imsi = null,
int? ueid = null,
string? plmn = null,
string? info = null,
string? message = null)
{
return new ProtocolLog
{
Id = Guid.NewGuid().ToString(),
MessageId = messageId,
LayerType = (ProtocolLayer)layerType,
MessageDetailJson = messageDetailJson,
CellID = cellID,
IMSI = imsi,
Direction = direction,
UEID = ueid,
PLMN = plmn,
TimeMs = timeMs,
Timestamp = timestamp,
Info = info,
Message = message,
DeviceCode = deviceCode,
RuntimeCode = runtimeCode
};
}
/// <summary>
/// 消息详情集合(用于业务逻辑)
/// </summary>
public IEnumerable<string>? MessageDetail
{
get => !string.IsNullOrEmpty(MessageDetailJson)
? System.Text.Json.JsonSerializer.Deserialize<IEnumerable<string>>(MessageDetailJson)
: null;
}
/// <summary>
/// 时间间隔(用于业务逻辑)
/// </summary>
public TimeSpan Time
{
get => TimeSpan.FromMilliseconds(TimeMs);
}
}

44
src/X1.Domain/Models/DeviceBasicInfo.cs

@ -0,0 +1,44 @@
namespace CellularManagement.Domain.Models;
/// <summary>
/// 设备基本信息模型
/// </summary>
public class DeviceBasicInfo
{
/// <summary>
/// 设备编码
/// </summary>
public string DeviceCode { get; set; } = null!;
/// <summary>
/// IP地址
/// </summary>
public string IpAddress { get; set; } = null!;
/// <summary>
/// Agent端口
/// </summary>
public int AgentPort { get; set; }
/// <summary>
/// 默认构造函数
/// </summary>
public DeviceBasicInfo() { }
/// <summary>
/// 创建设备基本信息实例
/// </summary>
/// <param name="deviceCode">设备编码</param>
/// <param name="ipAddress">IP地址</param>
/// <param name="agentPort">Agent端口</param>
/// <returns>设备基本信息实例</returns>
public static DeviceBasicInfo Create(string deviceCode, string ipAddress, int agentPort)
{
return new DeviceBasicInfo
{
DeviceCode = deviceCode,
IpAddress = ipAddress,
AgentPort = agentPort
};
}
}

14
src/X1.WebSocket/Models/MessageTransferProtocolLog.cs → src/X1.Domain/Models/MessageTransferProtocolLog.cs

@ -4,7 +4,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace X1.WebSocket.Models
namespace X1.Domain.Models
{
/// <summary>
/// 消息传输协议日志模型
@ -21,7 +21,7 @@ namespace X1.WebSocket.Models
/// <summary>
/// 协议层类型
/// </summary>
public string LayerType { get; set; } = string.Empty;
public int LayerType { get; set; }
/// <summary>
/// 消息详情集合(JSON格式存储)
@ -73,6 +73,16 @@ namespace X1.WebSocket.Models
/// </summary>
public string? Message { get; set; }
/// <summary>
/// 设备代码
/// </summary>
public string? DeviceCode { get; set; }
/// <summary>
/// 运行时代码
/// </summary>
public string RuntimeCode { get; set; } = null!;
/// <summary>
/// 消息详情集合(用于业务逻辑)
/// </summary>

16
src/X1.Domain/Repositories/Base/IQueryRepository.cs

@ -56,6 +56,22 @@ public interface IQueryRepository<T> where T : class
/// </remarks>
Task<IEnumerable<T>> FindAsync(Expression<Func<T, bool>> predicate, Func<IQueryable<T>, IQueryable<T>>? include = null, CancellationToken cancellationToken = default);
/// <summary>
/// 根据条件查询实体并投影到指定类型
/// </summary>
/// <typeparam name="TResult">投影结果类型</typeparam>
/// <param name="predicate">查询条件表达式</param>
/// <param name="select">投影表达式</param>
/// <param name="include">查询条件表达式</param>
/// <param name="cancellationToken">取消令牌,用于取消异步操作</param>
/// <returns>投影后的结果集合</returns>
/// <remarks>
/// 这是一个异步操作,因为需要等待数据库的查询操作完成
/// 使用投影查询可以只获取需要的字段,提高性能
/// 投影查询会被转换为 SQL 语句在数据库端执行
/// </remarks>
Task<IEnumerable<TResult>> FindAsync<TResult>(Expression<Func<T, bool>> predicate, Expression<Func<T, TResult>> select, Func<IQueryable<T>, IQueryable<T>>? include = null, CancellationToken cancellationToken = default);
/// <summary>
/// 分页查询实体
/// </summary>

8
src/X1.Domain/Repositories/Device/ICellularDeviceRepository.cs

@ -1,6 +1,7 @@
using CellularManagement.Domain.Entities;
using CellularManagement.Domain.Entities.Device;
using CellularManagement.Domain.Repositories.Base;
using CellularManagement.Domain.Models;
namespace CellularManagement.Domain.Repositories.Device;
@ -89,4 +90,11 @@ public interface ICellularDeviceRepository : IBaseRepository<CellularDevice>
/// 获取设备总数
/// </summary>
Task<int> GetDeviceCountAsync(CancellationToken cancellationToken = default);
/// <summary>
/// 获取设备基本信息集合(DeviceCode、IpAddress、AgentPort)
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>设备基本信息集合</returns>
Task<IList<DeviceBasicInfo>> GetDeviceBasicInfoListAsync(CancellationToken cancellationToken = default);
}

5
src/X1.Domain/Repositories/Device/ICellularDeviceRuntimeRepository.cs

@ -39,6 +39,11 @@ public interface ICellularDeviceRuntimeRepository : IBaseRepository<CellularDevi
/// </summary>
Task<CellularDeviceRuntime?> GetRuntimeByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default);
/// <summary>
/// 根据设备编号获取所有运行时状态
/// </summary>
Task<IList<CellularDeviceRuntime>> GetRuntimesByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default);
/// <summary>
/// 根据设备编号获取运行时状态(包含设备信息)
/// </summary>

75
src/X1.Domain/Repositories/Logging/IProtocolLogRepository.cs

@ -0,0 +1,75 @@
using CellularManagement.Domain.Entities.Logging;
using CellularManagement.Domain.Repositories.Base;
using X1.Domain.Entities.Logging;
namespace CellularManagement.Domain.Repositories.Logging;
/// <summary>
/// 协议日志仓储接口
/// </summary>
public interface IProtocolLogRepository : IBaseRepository<ProtocolLog>
{
/// <summary>
/// 根据设备代码获取协议日志
/// </summary>
/// <param name="deviceCode">设备代码</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
Task<IEnumerable<ProtocolLog>> GetByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default);
/// <summary>
/// 根据运行时代码获取协议日志
/// </summary>
/// <param name="runtimeCode">运行时代码</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
Task<IEnumerable<ProtocolLog>> GetByRuntimeCodeAsync(string runtimeCode, CancellationToken cancellationToken = default);
/// <summary>
/// 根据设备代码和运行时代码获取协议日志
/// </summary>
/// <param name="deviceCode">设备代码</param>
/// <param name="runtimeCode">运行时代码</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
Task<IEnumerable<ProtocolLog>> GetByDeviceAndRuntimeCodeAsync(string deviceCode, string runtimeCode, CancellationToken cancellationToken = default);
/// <summary>
/// 根据时间范围获取协议日志
/// </summary>
/// <param name="startTimestamp">开始时间戳</param>
/// <param name="endTimestamp">结束时间戳</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
Task<IEnumerable<ProtocolLog>> GetByTimeRangeAsync(long startTimestamp, long endTimestamp, CancellationToken cancellationToken = default);
/// <summary>
/// 根据协议层类型获取协议日志
/// </summary>
/// <param name="layerType">协议层类型</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
Task<IEnumerable<ProtocolLog>> GetByLayerTypeAsync(ProtocolLayer layerType, CancellationToken cancellationToken = default);
/// <summary>
/// 根据设备代码和运行时状态获取协议日志(高性能查询)
/// </summary>
/// <param name="deviceCode">设备代码</param>
/// <param name="runtimeCodes">运行时代码集合</param>
/// <param name="startTimestamp">开始时间戳</param>
/// <param name="endTimestamp">结束时间戳</param>
/// <param name="layerType">协议层类型</param>
/// <param name="orderByDescending">是否按时间戳降序排序</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
Task<IEnumerable<ProtocolLog>> GetByDeviceWithFiltersAsync(
string deviceCode,
IEnumerable<string>? runtimeCodes = null,
long? startTimestamp = null,
long? endTimestamp = null,
string? layerType = null,
bool orderByDescending = true,
CancellationToken cancellationToken = default);
}

37
src/X1.Domain/Transmission/IProtocolChannelManager.cs

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using CellularManagement.Domain.Entities.Logging;
namespace X1.Domain.Transmission
{
/// <summary>
/// 协议通道管理器接口
/// 提供协议日志的读取、写入和清空功能
/// </summary>
public interface IProtocolChannelManager
{
/// <summary>
/// 从通道读取协议日志
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志集合</returns>
Task<IEnumerable<ProtocolLog>> ReadFromChannelAsync(CancellationToken cancellationToken = default);
/// <summary>
/// 向通道写入协议日志
/// </summary>
/// <param name="protocolLogs">协议日志集合</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>写入结果</returns>
Task<bool> WriteToChannelAsync(IEnumerable<ProtocolLog> protocolLogs, CancellationToken cancellationToken = default);
/// <summary>
/// 清空协议通道
/// </summary>
/// <returns>清空结果</returns>
Task<bool> ClearChannelAsync();
}
}

14
src/X1.Domain/Transmission/IProtocolLogObserver.cs

@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using X1.Domain.Models;
namespace X1.Domain.Transmission
{
public interface IProtocolLogObserver
{
public Task OnProtocolLogsReceived(IEnumerable<MessageTransferProtocolLog> logDetails);
}
}

1
src/X1.Domain/X1.Domain.csproj

@ -12,6 +12,7 @@
<PackageReference Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="7.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="7.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>
</Project>

94
src/X1.Infrastructure/Configurations/Logging/ProtocolLogConfiguration.cs

@ -0,0 +1,94 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using CellularManagement.Domain.Entities.Logging;
namespace CellularManagement.Infrastructure.Configurations.Logging;
/// <summary>
/// ProtocolLog 实体配置类
/// 用于配置协议日志实体在数据库中的映射关系
/// </summary>
public sealed class ProtocolLogConfiguration : IEntityTypeConfiguration<ProtocolLog>
{
/// <summary>
/// 配置 ProtocolLog 实体
/// </summary>
/// <param name="builder">实体类型构建器</param>
public void Configure(EntityTypeBuilder<ProtocolLog> builder)
{
// 配置表名
builder.ToTable("tb_ProtocolLog", t => t.HasComment("协议日志表"));
// 配置主键
builder.HasKey(p => p.Id);
// 配置索引
builder.HasIndex(p => p.MessageId).HasDatabaseName("IX_ProtocolLog_MessageId");
builder.HasIndex(p => p.DeviceCode).HasDatabaseName("IX_ProtocolLog_DeviceCode");
builder.HasIndex(p => p.RuntimeCode).HasDatabaseName("IX_ProtocolLog_RuntimeCode");
builder.HasIndex(p => p.Timestamp).HasDatabaseName("IX_ProtocolLog_Timestamp");
builder.HasIndex(p => p.LayerType).HasDatabaseName("IX_ProtocolLog_LayerType");
builder.HasIndex(p => new { p.DeviceCode, p.RuntimeCode }).HasDatabaseName("IX_ProtocolLog_DeviceCode_RuntimeCode");
builder.HasIndex(p => new { p.DeviceCode, p.Timestamp }).HasDatabaseName("IX_ProtocolLog_DeviceCode_Timestamp");
// 配置属性
builder.Property(p => p.Id)
.HasComment("主键ID");
builder.Property(p => p.MessageId)
.IsRequired()
.HasComment("消息ID");
builder.Property(p => p.LayerType)
.IsRequired()
.HasMaxLength(50)
.HasComment("协议层类型");
builder.Property(p => p.MessageDetailJson)
.HasComment("消息详情集合(JSON格式存储)");
builder.Property(p => p.CellID)
.HasComment("小区ID");
builder.Property(p => p.IMSI)
.HasMaxLength(50)
.HasComment("国际移动用户识别码");
builder.Property(p => p.Direction)
.IsRequired()
.HasComment("日志方向类型");
builder.Property(p => p.UEID)
.HasComment("用户设备ID");
builder.Property(p => p.PLMN)
.HasMaxLength(20)
.HasComment("公共陆地移动网络标识");
builder.Property(p => p.TimeMs)
.IsRequired()
.HasComment("时间间隔(毫秒)");
builder.Property(p => p.Timestamp)
.IsRequired()
.HasComment("时间戳");
builder.Property(p => p.Info)
.HasMaxLength(500)
.HasComment("信息字段");
builder.Property(p => p.Message)
.HasMaxLength(1000)
.HasComment("消息字段");
builder.Property(p => p.DeviceCode)
.IsRequired()
.HasMaxLength(50)
.HasComment("设备代码");
builder.Property(p => p.RuntimeCode)
.IsRequired()
.HasMaxLength(50)
.HasComment("运行时代码");
}
}

5
src/X1.Infrastructure/Context/AppDbContext.cs

@ -37,6 +37,11 @@ public class AppDbContext : IdentityDbContext<AppUser, AppRole, string>
/// </summary>
public DbSet<LoginLog> LoginLogs { get; set; } = null!;
/// <summary>
/// 协议日志集合
/// </summary>
public DbSet<ProtocolLog> ProtocolLogs { get; set; } = null!;
/// <summary>
/// 蜂窝设备集合
/// </summary>

4
src/X1.Infrastructure/DependencyInjection.cs

@ -25,9 +25,10 @@ using CellularManagement.Infrastructure.Services.Security;
using CellularManagement.Infrastructure.Services.UserManagement;
using CellularManagement.Domain.Repositories.Device;
using CellularManagement.Domain.Repositories.NetworkProfile;
using CellularManagement.Domain.Repositories.Logging;
using CellularManagement.Infrastructure.Repositories.Device;
using CellularManagement.Infrastructure.Repositories.NetworkProfile;
using CellularManagement.Infrastructure.Repositories.Logging;
namespace CellularManagement.Infrastructure;
/// <summary>
@ -172,6 +173,7 @@ public static class DependencyInjection
services.AddScoped<IPermissionRepository, PermissionRepository>();
services.AddScoped<ILoginLogRepository, LoginLogRepository>();
services.AddScoped<IUserRoleRepository, UserRoleRepository>();
services.AddScoped<IProtocolLogRepository, ProtocolLogRepository>();
// 注册设备相关仓储
services.AddScoped<ICellularDeviceRepository, CellularDeviceRepository>();

11
src/X1.Infrastructure/Repositories/CQRS/QueryRepository.cs

@ -74,6 +74,17 @@ public class QueryRepository<T> : IQueryRepository<T> where T : class
return await query.ToListAsync(cancellationToken);
}
/// <summary>
/// 根据条件查询实体并投影到指定类型
/// </summary>
public async Task<IEnumerable<TResult>> FindAsync<TResult>(Expression<Func<T, bool>> predicate, Expression<Func<T, TResult>> select, Func<IQueryable<T>, IQueryable<T>>? include = null, CancellationToken cancellationToken = default)
{
IQueryable<T> query = _dbSet.Where(predicate);
if (include != null)
query = include(query);
return await query.Select(select).ToListAsync(cancellationToken);
}
/// <summary>
/// 分页查询实体
/// </summary>

20
src/X1.Infrastructure/Repositories/Device/CellularDeviceRepository.cs

@ -12,6 +12,7 @@ using CellularManagement.Infrastructure.Repositories.Base;
using CellularManagement.Domain.Entities.Device;
using CellularManagement.Domain.Repositories.Base;
using CellularManagement.Domain.Repositories.Device;
using CellularManagement.Domain.Models;
namespace CellularManagement.Infrastructure.Repositories.Device;
@ -207,4 +208,23 @@ public class CellularDeviceRepository : BaseRepository<CellularDevice>, ICellula
{
return await QueryRepository.CountAsync(d => true, cancellationToken: cancellationToken);
}
/// <summary>
/// 获取设备基本信息集合(DeviceCode、IpAddress、AgentPort)
/// </summary>
public async Task<IList<DeviceBasicInfo>> GetDeviceBasicInfoListAsync(CancellationToken cancellationToken = default)
{
// 直接从数据库查询只需要的三个字段,只获取启用的设备
var deviceBasicInfos = await QueryRepository.FindAsync(
d => d.IsEnabled, // 只获取启用的设备
select: d => new DeviceBasicInfo
{
DeviceCode = d.DeviceCode,
IpAddress = d.IpAddress,
AgentPort = d.AgentPort
},
cancellationToken: cancellationToken);
return deviceBasicInfos.ToList();
}
}

9
src/X1.Infrastructure/Repositories/Device/CellularDeviceRuntimeRepository.cs

@ -86,6 +86,15 @@ public class CellularDeviceRuntimeRepository : BaseRepository<CellularDeviceRunt
return runtimes.OrderByDescending(r => r.CreatedAt).FirstOrDefault();
}
/// <summary>
/// 根据设备编号获取所有运行时状态
/// </summary>
public async Task<IList<CellularDeviceRuntime>> GetRuntimesByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default)
{
var runtimes = await QueryRepository.FindAsync(r => r.DeviceCode == deviceCode, cancellationToken: cancellationToken);
return runtimes.OrderByDescending(r => r.CreatedAt).ToList();
}
/// <summary>
/// 根据设备编号获取运行时状态(包含设备信息)
/// </summary>

226
src/X1.Infrastructure/Repositories/Logging/ProtocolLogRepository.cs

@ -0,0 +1,226 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using CellularManagement.Domain.Entities.Logging;
using CellularManagement.Domain.Entities.Device;
using CellularManagement.Domain.Repositories.Logging;
using CellularManagement.Domain.Repositories.Base;
using CellularManagement.Infrastructure.Repositories.Base;
using System.Linq;
using X1.Domain.Entities.Logging;
namespace CellularManagement.Infrastructure.Repositories.Logging;
/// <summary>
/// 协议日志仓储实现类
/// </summary>
public class ProtocolLogRepository : BaseRepository<ProtocolLog>, IProtocolLogRepository
{
private readonly ILogger<ProtocolLogRepository> _logger;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="commandRepository">命令仓储</param>
/// <param name="queryRepository">查询仓储</param>
/// <param name="logger">日志记录器</param>
public ProtocolLogRepository(
ICommandRepository<ProtocolLog> commandRepository,
IQueryRepository<ProtocolLog> queryRepository,
ILogger<ProtocolLogRepository> logger)
: base(commandRepository, queryRepository, logger)
{
_logger = logger;
}
/// <summary>
/// 根据设备代码获取协议日志
/// </summary>
/// <param name="deviceCode">设备代码</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
public async Task<IEnumerable<ProtocolLog>> GetByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default)
{
try
{
var logs = await QueryRepository.FindAsync(
p => p.DeviceCode == deviceCode,
cancellationToken: cancellationToken);
return logs.OrderByDescending(p => p.Timestamp);
}
catch (Exception ex)
{
_logger.LogError(ex, "获取设备代码 {DeviceCode} 的协议日志时发生错误", deviceCode);
throw;
}
}
/// <summary>
/// 根据运行时代码获取协议日志
/// </summary>
/// <param name="runtimeCode">运行时代码</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
public async Task<IEnumerable<ProtocolLog>> GetByRuntimeCodeAsync(string runtimeCode, CancellationToken cancellationToken = default)
{
try
{
var logs = await QueryRepository.FindAsync(
p => p.RuntimeCode == runtimeCode,
cancellationToken: cancellationToken);
return logs.OrderByDescending(p => p.Timestamp);
}
catch (Exception ex)
{
_logger.LogError(ex, "获取运行时代码 {RuntimeCode} 的协议日志时发生错误", runtimeCode);
throw;
}
}
/// <summary>
/// 根据设备代码和运行时代码获取协议日志
/// </summary>
/// <param name="deviceCode">设备代码</param>
/// <param name="runtimeCode">运行时代码</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
public async Task<IEnumerable<ProtocolLog>> GetByDeviceAndRuntimeCodeAsync(string deviceCode, string runtimeCode, CancellationToken cancellationToken = default)
{
try
{
var logs = await QueryRepository.FindAsync(
p => p.DeviceCode == deviceCode && p.RuntimeCode == runtimeCode,
cancellationToken: cancellationToken);
return logs.OrderByDescending(p => p.Timestamp);
}
catch (Exception ex)
{
_logger.LogError(ex, "获取设备代码 {DeviceCode} 和运行时代码 {RuntimeCode} 的协议日志时发生错误", deviceCode, runtimeCode);
throw;
}
}
/// <summary>
/// 根据时间范围获取协议日志
/// </summary>
/// <param name="startTimestamp">开始时间戳</param>
/// <param name="endTimestamp">结束时间戳</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
public async Task<IEnumerable<ProtocolLog>> GetByTimeRangeAsync(long startTimestamp, long endTimestamp, CancellationToken cancellationToken = default)
{
try
{
var logs = await QueryRepository.FindAsync(
p => p.Timestamp >= startTimestamp && p.Timestamp <= endTimestamp,
cancellationToken: cancellationToken);
return logs.OrderByDescending(p => p.Timestamp);
}
catch (Exception ex)
{
_logger.LogError(ex, "获取时间范围 {StartTimestamp} 到 {EndTimestamp} 的协议日志时发生错误", startTimestamp, endTimestamp);
throw;
}
}
/// <summary>
/// 根据协议层类型获取协议日志
/// </summary>
/// <param name="layerType">协议层类型</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
public async Task<IEnumerable<ProtocolLog>> GetByLayerTypeAsync(ProtocolLayer layerType, CancellationToken cancellationToken = default)
{
try
{
var logs = await QueryRepository.FindAsync(
p => p.LayerType == layerType,
cancellationToken: cancellationToken);
return logs.OrderByDescending(p => p.Timestamp);
}
catch (Exception ex)
{
_logger.LogError(ex, "获取协议层类型 {LayerType} 的协议日志时发生错误", layerType);
throw;
}
}
/// <summary>
/// 根据设备代码和运行时状态获取协议日志(高性能查询)
/// </summary>
/// <param name="deviceCode">设备代码</param>
/// <param name="runtimeCodes">运行时代码集合</param>
/// <param name="startTimestamp">开始时间戳</param>
/// <param name="endTimestamp">结束时间戳</param>
/// <param name="layerType">协议层类型</param>
/// <param name="orderByDescending">是否按时间戳降序排序</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>协议日志列表</returns>
public async Task<IEnumerable<ProtocolLog>> GetByDeviceWithFiltersAsync(
string deviceCode,
IEnumerable<string>? runtimeCodes = null,
long? startTimestamp = null,
long? endTimestamp = null,
string? layerType = null,
bool orderByDescending = true,
CancellationToken cancellationToken = default)
{
try
{
// 构建 SQL 查询
var sql = @"
SELECT pl.*
FROM ""ProtocolLogs"" pl
INNER JOIN ""CellularDeviceRuntimes"" cdr
ON pl.""DeviceCode"" = cdr.""DeviceCode""
AND pl.""RuntimeCode"" = cdr.""RuntimeCode""
WHERE pl.""DeviceCode"" = @deviceCode";
var parameters = new List<object> { deviceCode };
// 添加运行时编码过滤
if (runtimeCodes != null && runtimeCodes.Any())
{
var runtimeCodeList = string.Join(",", runtimeCodes.Select((_, i) => $"@runtimeCode{i}"));
sql += $" AND pl.\"RuntimeCode\" IN ({runtimeCodeList})";
parameters.AddRange(runtimeCodes);
}
// 添加时间范围过滤
if (startTimestamp.HasValue)
{
sql += " AND pl.\"Timestamp\" >= @startTimestamp";
parameters.Add(startTimestamp.Value);
}
if (endTimestamp.HasValue)
{
sql += " AND pl.\"Timestamp\" <= @endTimestamp";
parameters.Add(endTimestamp.Value);
}
// 添加协议层类型过滤
if (!string.IsNullOrEmpty(layerType))
{
sql += " AND pl.\"LayerType\" = @layerType";
parameters.Add(layerType);
}
// 添加排序
sql += orderByDescending ? " ORDER BY pl.\"Timestamp\" DESC" : " ORDER BY pl.\"Timestamp\" ASC";
// 执行 SQL 查询
var logs = await QueryRepository.ExecuteSqlQueryAsync<ProtocolLog>(sql, parameters.ToArray(), cancellationToken);
return logs;
}
catch (Exception ex)
{
_logger.LogError(ex, "获取设备代码 {DeviceCode} 的协议日志时发生错误", deviceCode);
throw;
}
}
}

6
src/X1.Infrastructure/X1.Infrastructure.csproj

@ -7,12 +7,6 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<Compile Remove="Repositories\Logging\**" />
<EmbeddedResource Remove="Repositories\Logging\**" />
<None Remove="Repositories\Logging\**" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\X1.Domain\X1.Domain.csproj" />
</ItemGroup>

78
src/X1.Presentation/Controllers/ProtocolLogsController.cs

@ -0,0 +1,78 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Logging;
using CellularManagement.Application.Features.ProtocolLogs.Queries.GetProtocolLogsByDevice;
using CellularManagement.Domain.Common;
using CellularManagement.Domain.Entities.Device;
using CellularManagement.Presentation.Abstractions;
using MediatR;
namespace CellularManagement.Presentation.Controllers;
/// <summary>
/// 协议日志控制器
/// </summary>
[ApiController]
[Route("api/[controller]")]
[Authorize]
public class ProtocolLogsController : ApiController
{
private readonly ILogger<ProtocolLogsController> _logger;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="mediator">中介者</param>
/// <param name="logger">日志记录器</param>
public ProtocolLogsController(IMediator mediator, ILogger<ProtocolLogsController> logger)
: base(mediator)
{
_logger = logger;
}
/// <summary>
/// 根据设备代码获取协议日志
/// </summary>
/// <param name="deviceCode">设备代码</param>
/// <param name="startTimestamp">开始时间戳</param>
/// <param name="endTimestamp">结束时间戳</param>
/// <param name="layerType">协议层类型</param>
/// <param name="deviceRuntimeStatus">设备运行时状态</param>
/// <param name="orderByDescending">是否按时间戳降序排序</param>
/// <returns>协议日志列表</returns>
[HttpGet("device/{deviceCode}")]
public async Task<OperationResult<GetProtocolLogsByDeviceResponse>> GetProtocolLogsByDevice(
string deviceCode,
[FromQuery] long? startTimestamp = null,
[FromQuery] long? endTimestamp = null,
[FromQuery] string? layerType = null,
[FromQuery] int? deviceRuntimeStatus = null,
[FromQuery] bool orderByDescending = true)
{
_logger.LogInformation("开始获取设备 {DeviceCode} 的协议日志,开始时间戳: {StartTimestamp}, 结束时间戳: {EndTimestamp}, 协议层类型: {LayerType}, 运行时状态: {DeviceRuntimeStatus}",
deviceCode, startTimestamp, endTimestamp, layerType, deviceRuntimeStatus);
var query = new GetProtocolLogsByDeviceQuery
{
DeviceCode = deviceCode,
StartTimestamp = startTimestamp,
EndTimestamp = endTimestamp,
LayerType = layerType,
DeviceRuntimeStatus = deviceRuntimeStatus.HasValue ? (DeviceRuntimeStatus)deviceRuntimeStatus.Value : null,
OrderByDescending = orderByDescending
};
var result = await mediator.Send(query);
if (!result.IsSuccess)
{
_logger.LogWarning("获取设备 {DeviceCode} 的协议日志失败: {Message}", deviceCode, result.ErrorMessages);
return result;
}
_logger.LogInformation("成功获取设备 {DeviceCode} 的协议日志,共 {Count} 条记录",
deviceCode, result.Data?.Items?.Count ?? 0);
return result;
}
}

7
src/X1.WebSocket/Extensions/ServiceCollectionExtensions.cs

@ -4,6 +4,8 @@ using CellularManagement.WebSocket.Middleware;
using CellularManagement.WebSocket.Services;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using X1.Domain.Transmission;
using X1.WebSocket.Transmission;
namespace CellularManagement.WebSocket.Extensions;
@ -26,11 +28,16 @@ public static class ServiceCollectionExtensions
// 注册连接管理协调器
services.AddSingleton<ConnectionManagerCoordinator>();
// 注册协议日志观察者(单例,因为需要在整个应用生命周期中保持状态)
// 必须在 ProtocolMessageHandler 之前注册,因为 ProtocolMessageHandler 依赖 IProtocolLogObserver
services.AddSingleton<IProtocolLogObserver, NetworkProtocolLogObserver>();
// 注册 WebSocket 消息处理器
services.AddSingleton<IWebSocketMessageHandler, ChatMessageHandler>();
services.AddSingleton<IWebSocketMessageHandler, HeartbeatHandlerManager>();
services.AddSingleton<IWebSocketMessageHandler, NotificationMessageHandler>();
services.AddSingleton<IWebSocketMessageHandler, ProtocolMessageHandler>();
// 注册后台服务
services.AddHostedService<WebSocketMessageService>();
services.AddHostedService<ConnectionHealthCheckService>();

50
src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs

@ -6,6 +6,8 @@ using System.Text.Json;
using System.Threading.Tasks;
using CellularManagement.WebSocket.Models;
using Microsoft.Extensions.Logging;
using X1.Domain.Models;
using X1.Domain.Transmission;
using X1.WebSocket.Models;
namespace CellularManagement.WebSocket.Handlers
@ -14,10 +16,11 @@ namespace CellularManagement.WebSocket.Handlers
{
private readonly ILogger<ChatMessageHandler> _logger;
private readonly JsonSerializerOptions _jsonOptions;
public ProtocolMessageHandler(ILogger<ChatMessageHandler> logger)
private readonly IProtocolLogObserver _logObserver;
public ProtocolMessageHandler(ILogger<ChatMessageHandler> logger, IProtocolLogObserver logObserver)
{
_logger = logger;
_logObserver = logObserver;
_jsonOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
@ -98,31 +101,40 @@ namespace CellularManagement.WebSocket.Handlers
/// <param name="protocolLogs">协议日志数组</param>
private void ProcessProtocolLogs(MessageTransferProtocolLog[] protocolLogs)
{
var processedCount = 0;
var errorCount = 0;
foreach (var log in protocolLogs)
try
{
try
_logObserver.OnProtocolLogsReceived(protocolLogs);
var processedCount = 0;
var errorCount = 0;
foreach (var log in protocolLogs)
{
ProcessSingleProtocolLog(log);
processedCount++;
try
{
ProcessSingleProtocolLog(log);
processedCount++;
}
catch (Exception ex)
{
errorCount++;
_logger.LogError(ex, "处理单个协议日志时发生错误,日志ID:{LogId}", log.Id);
}
}
catch (Exception ex)
if (errorCount > 0)
{
errorCount++;
_logger.LogError(ex, "处理单个协议日志时发生错误,日志ID:{LogId}", log.Id);
_logger.LogWarning("协议日志处理完成,成功:{SuccessCount},失败:{ErrorCount}",
processedCount, errorCount);
}
else
{
_logger.LogDebug("协议日志处理完成,成功处理 {ProcessedCount} 条日志", processedCount);
}
}
if (errorCount > 0)
{
_logger.LogWarning("协议日志处理完成,成功:{SuccessCount},失败:{ErrorCount}",
processedCount, errorCount);
}
else
catch (Exception ex)
{
_logger.LogDebug("协议日志处理完成,成功处理 {ProcessedCount} 条日志", processedCount);
_logger.LogDebug("转发日志数据异常{ex}", ex);
}
}

1
src/X1.WebSocket/Models/ProtocolMessage.cs

@ -4,6 +4,7 @@ using System.Linq;
using System.Text;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using X1.Domain.Models;
using X1.WebSocket.Models;
namespace CellularManagement.WebSocket.Models

152
src/X1.WebSocket/Transmission/NetworkProtocolLogObserver.cs

@ -0,0 +1,152 @@
using CellularManagement.Domain.Entities.Logging;
using Microsoft.Extensions.Logging;
using X1.Domain.Models;
using X1.Domain.Transmission;
namespace X1.WebSocket.Transmission
{
/// <summary>
/// 网络协议日志观察者
/// 负责接收协议日志并将其写入通道
/// </summary>
public class NetworkProtocolLogObserver : IProtocolLogObserver
{
private readonly ILogger<NetworkProtocolLogObserver> _logger;
private readonly IProtocolChannelManager _channelManager;
public NetworkProtocolLogObserver(ILogger<NetworkProtocolLogObserver> logger, IProtocolChannelManager channelManager)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_channelManager = channelManager ?? throw new ArgumentNullException(nameof(channelManager));
}
/// <summary>
/// 处理接收到的协议日志
/// </summary>
/// <param name="logDetails">协议日志集合</param>
/// <returns>处理任务</returns>
public async Task OnProtocolLogsReceived(IEnumerable<MessageTransferProtocolLog> logDetails)
{
// 参数验证
if (logDetails == null)
{
_logger.LogWarning("接收到空的协议日志集合");
return;
}
var logList = logDetails.ToList();
if (!logList.Any())
{
_logger.LogDebug("接收到空的协议日志集合");
return;
}
try
{
_logger.LogInformation("开始处理协议日志,数量:{Count}", logList.Count);
// 过滤和验证日志数据
var validLogs = FilterValidLogs(logList);
if (!validLogs.Any())
{
_logger.LogWarning("没有有效的协议日志需要处理");
return;
}
// 转换为ProtocolLog
var protocolLogs = ConvertToProtocolLogs(validLogs);
// 写入通道
var writeResult = await _channelManager.WriteToChannelAsync(protocolLogs, CancellationToken.None);
if (writeResult)
{
_logger.LogDebug("成功写入协议日志到通道,数量:{Count}", protocolLogs.Count());
}
else
{
_logger.LogWarning("写入协议日志到通道失败,数量:{Count}", protocolLogs.Count());
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("协议日志处理被取消");
}
catch (Exception ex)
{
_logger.LogError(ex, "处理协议日志时发生未预期的错误,日志数量:{Count}", logList.Count);
// 不重新抛出异常,避免影响其他操作
}
}
/// <summary>
/// 过滤有效的日志数据
/// </summary>
/// <param name="logs">原始日志集合</param>
/// <returns>有效的日志集合</returns>
private IEnumerable<MessageTransferProtocolLog> FilterValidLogs(IEnumerable<MessageTransferProtocolLog> logs)
{
return logs.Where(log =>
{
// 检查必需字段
if (string.IsNullOrWhiteSpace(log.RuntimeCode))
{
_logger.LogDebug("跳过无效日志:运行时代码为空,ID:{Id}", log.Id);
return false;
}
if (string.IsNullOrWhiteSpace(log.DeviceCode))
{
_logger.LogDebug("跳过无效日志:设备代码为空,ID:{Id}", log.Id);
return false;
}
// 检查时间戳
if (log.Timestamp <= 0)
{
_logger.LogDebug("跳过无效日志:时间戳无效,ID:{Id}", log.Id);
return false;
}
return true;
});
}
/// <summary>
/// 转换为ProtocolLog实体
/// </summary>
/// <param name="logs">MessageTransferProtocolLog集合</param>
/// <returns>ProtocolLog集合</returns>
private IEnumerable<ProtocolLog> ConvertToProtocolLogs(IEnumerable<MessageTransferProtocolLog> logs)
{
return logs.Select(log =>
{
try
{
return ProtocolLog.Create(
messageId: log.Id,
layerType: log.LayerType,
direction: log.Direction,
timeMs: log.TimeMs,
timestamp: log.Timestamp,
deviceCode: log.DeviceCode ?? string.Empty,
runtimeCode: log.RuntimeCode,
messageDetailJson: log.MessageDetailJson,
cellID: log.CellID,
imsi: log.IMSI,
ueid: log.UEID,
plmn: log.PLMN,
info: log.Info,
message: log.Message
);
}
catch (Exception ex)
{
_logger.LogError(ex, "转换协议日志失败,ID:{Id}", log.Id);
return null;
}
}).Where(log => log != null)!;
}
}
}

4
src/X1.WebSocket/X1.WebSocket.csproj

@ -25,4 +25,8 @@
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\X1.Domain\X1.Domain.csproj" />
</ItemGroup>
</Project>

Loading…
Cancel
Save