diff --git a/src/X1.Application/BackendServiceManager/DeviceManagementService.cs b/src/X1.Application/BackendServiceManager/DeviceManagementService.cs new file mode 100644 index 0000000..27b6175 --- /dev/null +++ b/src/X1.Application/BackendServiceManager/DeviceManagementService.cs @@ -0,0 +1,376 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using CellularManagement.Domain.Repositories.Device; +using CellularManagement.Domain.Repositories.Logging; +using CellularManagement.Domain.Entities.Logging; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using X1.DynamicClientCore.Interfaces; +using X1.DynamicClientCore.Models; +using CellularManagement.Domain.Models; +using X1.Domain.Transmission; +using CellularManagement.Domain.Repositories.Base; +using System.Data; + +namespace X1.Application.BackendServiceManager +{ + /// + /// 设备管理服务 - 负责管理设备端点的后台服务 + /// + public class DeviceManagementService : BackgroundService + { + private readonly ILogger _logger; + private readonly IServiceEndpointManager _endpointManager; + private readonly ICellularDeviceRepository _deviceRepository; + private readonly IProtocolChannelManager _protocolChannelManager; + private readonly IProtocolLogRepository _repository; + private readonly IUnitOfWork _unitOfWork; + + // 配置常量 + private const string DEFAULT_PROTOCOL = "http"; + private const string DEFAULT_BASE_PATH = "/api/v1"; + private const int DEFAULT_TIMEOUT = 10; + + public DeviceManagementService( + ICellularDeviceRepository deviceRepository, + IServiceEndpointManager endpointManager, + IProtocolChannelManager protocolChannelManager, + ILogger logger, + IProtocolLogRepository repository, + IUnitOfWork unitOfWork) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _endpointManager = endpointManager ?? throw new ArgumentNullException(nameof(endpointManager)); + _deviceRepository = deviceRepository ?? throw new ArgumentNullException(nameof(deviceRepository)); + _protocolChannelManager = protocolChannelManager ?? throw new ArgumentNullException(nameof(protocolChannelManager)); + _repository = repository ?? throw new ArgumentNullException(nameof(repository)); + _unitOfWork = unitOfWork ?? throw new ArgumentNullException(nameof(unitOfWork)); + } + + /// + /// 执行后台服务的主要逻辑 + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("DeviceManagementService started. Initializing device endpoints..."); + + try + { + await InitializeDeviceEndpointsAsync(stoppingToken); + _logger.LogInformation("DeviceManagementService completed initialization."); + } + catch (OperationCanceledException) + { + _logger.LogInformation("DeviceManagementService was cancelled during initialization."); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error occurred during device endpoint initialization."); + } + + // 启动协议日志处理循环 + _ = Task.Run(() => ProcessProtocolLogsAsync(stoppingToken), stoppingToken); + + // 服务初始化完成后,等待取消请求 + while (!stoppingToken.IsCancellationRequested) + { + await Task.Delay(1000, stoppingToken); // 每秒检查一次取消请求 + } + + _logger.LogInformation("DeviceManagementService stopped."); + } + + /// + /// 处理协议日志的循环 + /// + private async Task ProcessProtocolLogsAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("开始协议日志处理循环"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + // 从通道读取协议日志 + var protocolLogs = await _protocolChannelManager.ReadFromChannelAsync(stoppingToken); + + if (protocolLogs.Any()) + { + await ProcessProtocolLogs(protocolLogs, stoppingToken); + } + else + { + // 没有日志时短暂等待,避免空转 + await Task.Delay(100, stoppingToken); + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("协议日志处理循环被取消"); + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "处理协议日志时发生错误"); + await Task.Delay(1000, stoppingToken); // 错误时等待1秒再继续 + } + } + + _logger.LogInformation("协议日志处理循环已停止"); + } + + /// + /// 处理协议日志 + /// + private async Task ProcessProtocolLogs(IEnumerable protocolLogs, CancellationToken cancellationToken) + { + // 参数验证 + if (protocolLogs == null) + { + _logger.LogWarning("接收到空的协议日志集合"); + return; + } + + var logs = protocolLogs.ToList(); + if (!logs.Any()) + { + _logger.LogDebug("协议日志集合为空,跳过处理"); + return; + } + + _logger.LogDebug("开始处理协议日志,数量:{Count}", logs.Count); + + // 数据验证 + var validLogs = ValidateAndFilterLogs(logs); + if (!validLogs.Any()) + { + _logger.LogWarning("没有有效的协议日志需要处理"); + return; + } + + // 使用事务处理 + using var transaction = await _unitOfWork.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); + try + { + // 批量插入到数据库 + await _repository.AddRangeAsync(validLogs, cancellationToken); + + // 保存更改 + await _unitOfWork.SaveChangesAsync(cancellationToken); + + // 提交事务 + await _unitOfWork.CommitTransactionAsync(transaction, cancellationToken); + + _logger.LogDebug("协议日志批量插入数据库成功,数量:{Count}", validLogs.Count()); + } + catch (OperationCanceledException) + { + _logger.LogInformation("协议日志处理被取消"); + await _unitOfWork.RollbackTransactionAsync(cancellationToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "批量插入协议日志到数据库失败,数量:{Count}", validLogs.Count()); + await _unitOfWork.RollbackTransactionAsync(cancellationToken); + + // 如果批量插入失败,尝试逐个插入 + await ProcessProtocolLogsIndividually(validLogs, cancellationToken); + } + } + + /// + /// 验证和过滤协议日志 + /// + private IEnumerable ValidateAndFilterLogs(IEnumerable logs) + { + return logs.Where(log => + { + if (log == null) + { + _logger.LogDebug("跳过null协议日志"); + return false; + } + + // 验证必需字段 + if (string.IsNullOrWhiteSpace(log.DeviceCode)) + { + _logger.LogDebug("跳过无效日志:设备代码为空,ID:{Id}", log.Id); + return false; + } + + if (string.IsNullOrWhiteSpace(log.RuntimeCode)) + { + _logger.LogDebug("跳过无效日志:运行时代码为空,ID:{Id}", log.Id); + return false; + } + + if (log.Timestamp <= 0) + { + _logger.LogDebug("跳过无效日志:时间戳无效,ID:{Id}", log.Id); + return false; + } + + if (log.MessageId <= 0) + { + _logger.LogDebug("跳过无效日志:消息ID无效,ID:{Id}", log.Id); + return false; + } + + return true; + }); + } + + /// + /// 逐个处理协议日志(批量插入失败时的备用方案) + /// + private async Task ProcessProtocolLogsIndividually(IEnumerable protocolLogs, CancellationToken cancellationToken) + { + var logs = protocolLogs.ToList(); + var successCount = 0; + var errorCount = 0; + + _logger.LogInformation("开始逐个插入协议日志,总数:{Count}", logs.Count); + + // 分批处理,避免内存问题 + const int batchSize = 50; + for (int i = 0; i < logs.Count; i += batchSize) + { + if (cancellationToken.IsCancellationRequested) + { + break; + } + + var batch = logs.Skip(i).Take(batchSize); + using var transaction = await _unitOfWork.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken); + + try + { + foreach (var log in batch) + { + try + { + await _repository.AddAsync(log, cancellationToken); + successCount++; + } + catch (Exception ex) + { + errorCount++; + _logger.LogError(ex, "插入单个协议日志失败,ID:{LogId},设备:{DeviceCode}", log.Id, log.DeviceCode); + } + } + + // 保存当前批次的更改 + await _unitOfWork.SaveChangesAsync(cancellationToken); + await _unitOfWork.CommitTransactionAsync(transaction, cancellationToken); + + _logger.LogDebug("批次处理完成,成功:{SuccessCount},失败:{ErrorCount},批次大小:{BatchSize}", + successCount, errorCount, batch.Count()); + } + catch (Exception ex) + { + await _unitOfWork.RollbackTransactionAsync(cancellationToken); + _logger.LogError(ex, "批次处理失败,批次索引:{BatchIndex}", i / batchSize); + errorCount += batch.Count(); + } + } + + _logger.LogInformation("协议日志逐个插入完成,成功:{SuccessCount},失败:{ErrorCount},总数:{TotalCount}", + successCount, errorCount, logs.Count); + } + + /// + /// 处理单个协议日志 + /// + private async Task ProcessSingleProtocolLog(ProtocolLog log, CancellationToken cancellationToken) + { + // 这里可以添加具体的协议日志处理逻辑 + // 例如:保存到数据库、发送通知、更新设备状态等 + + _logger.LogDebug("处理协议日志,ID:{Id},设备:{DeviceCode},运行时:{RuntimeCode},层类型:{LayerType}", + log.Id, log.DeviceCode, log.RuntimeCode, log.LayerType); + + // 示例:根据设备代码查找对应的端点进行处理 + var endpoint = _endpointManager.GetEndpoint(log.DeviceCode); + if (endpoint != null) + { + // 可以在这里调用设备端点的API进行相关操作 + _logger.LogDebug("找到设备端点:{EndpointName},IP:{Ip},端口:{Port}", + endpoint.Name, endpoint.Ip, endpoint.Port); + } + + await Task.CompletedTask; // 占位符,实际处理逻辑待实现 + } + + /// + /// 初始化设备端点信息 + /// + private async Task InitializeDeviceEndpointsAsync(CancellationToken cancellationToken = default) + { + _logger.LogInformation("Initializing device endpoints..."); + + var devices = await _deviceRepository.GetDeviceBasicInfoListAsync(cancellationToken); + + if (devices == null || !devices.Any()) + { + _logger.LogWarning("No devices found to initialize endpoints."); + return; + } + + var successCount = 0; + var skipCount = 0; + + foreach (var device in devices) + { + if (IsValidDeviceInfo(device)) + { + var endpoint = CreateServiceEndpoint(device); + _endpointManager.AddOrUpdateEndpoint(endpoint); + successCount++; + + _logger.LogDebug("Initialized endpoint for device: {DeviceCode} at {IpAddress}:{Port}", + device.DeviceCode, device.IpAddress, device.AgentPort); + } + else + { + _logger.LogWarning("Skipping invalid device: {DeviceCode}", device.DeviceCode); + skipCount++; + } + } + + _logger.LogInformation("Device endpoint initialization completed. Success: {SuccessCount}, Skipped: {SkipCount}, Total: {TotalCount}", + successCount, skipCount, devices.Count); + } + + /// + /// 验证设备信息是否有效 + /// + private static bool IsValidDeviceInfo(DeviceBasicInfo device) + { + return device != null + && !string.IsNullOrWhiteSpace(device.DeviceCode) + && !string.IsNullOrWhiteSpace(device.IpAddress) + && device.AgentPort > 0 + && device.AgentPort <= 65535; + } + + /// + /// 根据设备信息创建服务端点 + /// + private static ServiceEndpoint CreateServiceEndpoint(DeviceBasicInfo device) + { + return new ServiceEndpoint + { + Name = device.DeviceCode, + Ip = device.IpAddress, + Port = device.AgentPort, + Protocol = DEFAULT_PROTOCOL, + BasePath = DEFAULT_BASE_PATH, + Timeout = DEFAULT_TIMEOUT, + Enabled = true + }; + } + } +} diff --git a/src/X1.Application/BackendServiceManager/ProtocolChannelManager.cs b/src/X1.Application/BackendServiceManager/ProtocolChannelManager.cs new file mode 100644 index 0000000..0bc2d47 --- /dev/null +++ b/src/X1.Application/BackendServiceManager/ProtocolChannelManager.cs @@ -0,0 +1,159 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Channels; +using CellularManagement.Domain.Entities.Logging; +using Microsoft.Extensions.Logging; +using X1.Domain.Transmission; + +namespace X1.Application.BackendServiceManager +{ + /// + /// 协议通道管理器实现 + /// 提供协议日志的读取、写入和清空功能 + /// + public class ProtocolChannelManager : IProtocolChannelManager + { + private readonly ILogger _logger; + private readonly Channel _channel; + + public ProtocolChannelManager(ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _channel = Channel.CreateUnbounded(new UnboundedChannelOptions + { + SingleReader = false, + SingleWriter = false, + AllowSynchronousContinuations = false + }); + } + + /// + /// 从通道读取协议日志 + /// + /// 取消令牌 + /// 协议日志集合 + public async Task> ReadFromChannelAsync(CancellationToken cancellationToken = default) + { + try + { + // 等待有数据可读 + if (await _channel.Reader.WaitToReadAsync(cancellationToken)) + { + // 读取一个数组 + if (_channel.Reader.TryRead(out var logsArray)) + { + _logger.LogDebug("从通道读取协议日志数组,数量:{Count}", logsArray.Length); + return logsArray; + } + } + + return Enumerable.Empty(); + } + catch (OperationCanceledException) + { + _logger.LogInformation("读取协议日志操作被取消"); + return Enumerable.Empty(); + } + catch (Exception ex) + { + _logger.LogError(ex, "读取协议日志时发生错误"); + return Enumerable.Empty(); + } + } + + /// + /// 向通道写入协议日志 + /// + /// 协议日志集合 + /// 取消令牌 + /// 写入结果 + public async Task WriteToChannelAsync(IEnumerable protocolLogs, CancellationToken cancellationToken = default) + { + if (protocolLogs == null) + { + _logger.LogWarning("尝试写入空的协议日志集合"); + return false; + } + + try + { + var logs = protocolLogs.ToList(); + if (!logs.Any()) + { + _logger.LogWarning("协议日志集合为空,跳过写入"); + return false; + } + + // 过滤掉null的日志 + var validLogs = logs.Where(log => log != null).ToArray(); + + if (validLogs.Length == 0) + { + _logger.LogWarning("没有有效的协议日志,跳过写入"); + return false; + } + + // 批量写入数组 + await _channel.Writer.WriteAsync(validLogs, cancellationToken); + + _logger.LogDebug("写入协议日志数组到通道,数量:{Count}", validLogs.Length); + return true; + } + catch (OperationCanceledException) + { + _logger.LogInformation("写入协议日志操作被取消"); + return false; + } + catch (Exception ex) + { + _logger.LogError(ex, "写入协议日志时发生错误"); + return false; + } + } + + /// + /// 清空协议通道 + /// + /// 清空结果 + public async Task ClearChannelAsync() + { + try + { + var count = 0; + + // 读取并丢弃所有数组直到通道为空 + while (await _channel.Reader.WaitToReadAsync()) + { + if (_channel.Reader.TryRead(out var logsArray)) + { + count += logsArray.Length; + } + else + { + break; + } + } + + _logger.LogInformation("清空协议通道,清空日志数量:{Count}", count); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, "清空协议通道时发生错误"); + return false; + } + } + + /// + /// 获取当前通道中的日志数量(估算) + /// + /// 日志数量 + public int GetChannelCount() + { + return _channel.Reader.Count; + } + } +} \ No newline at end of file diff --git a/src/X1.Application/DependencyInjection.cs b/src/X1.Application/DependencyInjection.cs index 0296146..57b9352 100644 --- a/src/X1.Application/DependencyInjection.cs +++ b/src/X1.Application/DependencyInjection.cs @@ -5,6 +5,8 @@ using CellularManagement.Application.Behaviours; using CellularManagement.Domain.Services; using Microsoft.Extensions.Configuration; using CellularManagement.Domain.Options; +using X1.Application.BackendServiceManager; +using X1.Domain.Transmission; namespace CellularManagement.Application; @@ -42,6 +44,12 @@ public static class DependencyInjection // 注册验证器 services.AddScoped(typeof(IPipelineBehavior<,>), typeof(ValidationBehaviour<,>)); + // 注册协议通道管理器(单例,因为需要在整个应用生命周期中保持状态) + services.AddSingleton(); + + // 注册后台服务(依赖于上面的服务) + services.AddHostedService(); + return services; } } \ No newline at end of file diff --git a/src/X1.Application/Features/DeviceRuntimes/Commands/StartDeviceRuntime/StartDeviceRuntimeCommandHandler.cs b/src/X1.Application/Features/DeviceRuntimes/Commands/StartDeviceRuntime/StartDeviceRuntimeCommandHandler.cs index bbc5679..eab4d00 100644 --- a/src/X1.Application/Features/DeviceRuntimes/Commands/StartDeviceRuntime/StartDeviceRuntimeCommandHandler.cs +++ b/src/X1.Application/Features/DeviceRuntimes/Commands/StartDeviceRuntime/StartDeviceRuntimeCommandHandler.cs @@ -9,6 +9,7 @@ using X1.DynamicClientCore.Features; using CellularManagement.Domain.Repositories.NetworkProfile; using X1.DynamicClientCore.Models; using X1.Domain.Models; +using System.Collections.Concurrent; namespace CellularManagement.Application.Features.DeviceRuntimes.Commands.StartDeviceRuntime; @@ -110,42 +111,7 @@ public class StartDeviceRuntimeCommandHandler : IRequestHandler(); - _logger.LogInformation("开始并行启动网络,请求数量: {RequestCount}", networkRequests.Count); - - await Parallel.ForEachAsync(networkRequests, async (networkRequest, cts) => - { - try - { - _logger.LogDebug("启动网络,设备代码: {DeviceCode}, 运行时代码: {RuntimeCode}", - networkRequest.DeviceCode, networkRequest.RuntimeCode); - var startResult = await _protocolClient.StartNetworkAsync(networkRequest); - _logger.LogDebug("网络启动结果,设备代码: {DeviceCode}, 启动成功: {StartResult}", - networkRequest.DeviceCode, startResult); - - if (startResult) - { - _logger.LogDebug("网络启动成功,设备代码: {DeviceCode}", networkRequest.DeviceCode); - networkResults.Add((true, networkRequest.DeviceCode, string.Empty)); - } - else - { - var errorMessage = "网络启动返回失败状态"; - _logger.LogWarning("网络启动返回失败状态,设备代码: {DeviceCode}", networkRequest.DeviceCode); - networkResults.Add((false, networkRequest.DeviceCode, errorMessage)); - } - } - catch (Exception ex) - { - var errorMessage = $"网络启动失败: {ex.Message}"; - _logger.LogError(ex, "网络启动失败,设备代码: {DeviceCode}", networkRequest.DeviceCode); - networkResults.Add((false, networkRequest.DeviceCode, errorMessage)); - } - }); - - // 检查网络启动结果 - var successfulDevices = networkResults.Where(r => r.Success).Select(r => r.DeviceCode).ToHashSet(); - var failedDevices = networkResults.Where(r => !r.Success).ToList(); + var (successfulDevices, failedDevices) = await StartNetworksInParallelAsync(networkRequests, cancellationToken); if (failedDevices.Any()) { @@ -393,8 +359,90 @@ public class StartDeviceRuntimeCommandHandler : IRequestHandler + /// 并行启动网络并收集结果 + /// + /// 网络配置请求列表 + /// 取消令牌 + /// 成功和失败的设备信息元组 + /// + /// 该方法使用并行执行来同时启动多个网络,提高性能: + /// 1. 使用ConcurrentBag确保线程安全的结果收集 + /// 2. 使用SemaphoreSlim控制并发数量,避免资源竞争(可选) + /// 3. 设置超时机制,防止任务无限等待 + /// 4. 提供详细的错误处理和日志记录 + /// 5. 确保资源正确释放 + /// + private async Task<(HashSet SuccessfulDevices, List<(string DeviceCode, string ErrorMessage)> FailedDevices)> + StartNetworksInParallelAsync(List networkRequests, CancellationToken cancellationToken) + { + var networkResults = new ConcurrentBag<(bool Success, string DeviceCode, string ErrorMessage)>(); + _logger.LogInformation("开始并行启动网络,请求数量: {RequestCount}", networkRequests.Count); + + // 设置超时时间,防止任务无限等待 + using var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); // 5分钟超时 + using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); + + // 完全并行执行,不受信号量限制 + var tasks = networkRequests.Select(async networkRequest => + { + try + { + _logger.LogDebug("启动网络,设备代码: {DeviceCode}, 运行时代码: {RuntimeCode}", + networkRequest.DeviceCode, networkRequest.RuntimeCode); + + var startResult = await _protocolClient.StartNetworkAsync(networkRequest); + _logger.LogDebug("网络启动结果,设备代码: {DeviceCode}, 启动成功: {StartResult}", + networkRequest.DeviceCode, startResult); + + if (startResult) + { + _logger.LogDebug("网络启动成功,设备代码: {DeviceCode}", networkRequest.DeviceCode); + networkResults.Add((true, networkRequest.DeviceCode, string.Empty)); + } + else + { + var errorMessage = "网络启动返回失败状态"; + _logger.LogWarning("网络启动返回失败状态,设备代码: {DeviceCode}", networkRequest.DeviceCode); + networkResults.Add((false, networkRequest.DeviceCode, errorMessage)); + } + } + catch (OperationCanceledException) when (timeoutCts.Token.IsCancellationRequested) + { + var errorMessage = "网络启动超时"; + _logger.LogWarning("网络启动超时,设备代码: {DeviceCode}", networkRequest.DeviceCode); + networkResults.Add((false, networkRequest.DeviceCode, errorMessage)); + } + catch (Exception ex) + { + var errorMessage = $"网络启动失败: {ex.Message}"; + _logger.LogError(ex, "网络启动失败,设备代码: {DeviceCode}", networkRequest.DeviceCode); + networkResults.Add((false, networkRequest.DeviceCode, errorMessage)); + } + }); + + try + { + // 等待所有任务完成 + await Task.WhenAll(tasks); + } + catch (OperationCanceledException) when (timeoutCts.Token.IsCancellationRequested) + { + _logger.LogWarning("网络启动操作超时,部分设备可能未完成启动"); + } + + // 检查网络启动结果 + var successfulDevices = networkResults.Where(r => r.Success).Select(r => r.DeviceCode).ToHashSet(); + var failedDevices = networkResults.Where(r => !r.Success).Select(r => (r.DeviceCode, r.ErrorMessage)).ToList(); + + _logger.LogInformation("网络启动完成,成功设备数: {SuccessCount}, 失败设备数: {FailureCount}", + successfulDevices.Count, failedDevices.Count); + + return (successfulDevices, failedDevices); + } + } \ No newline at end of file diff --git a/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQuery.cs b/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQuery.cs new file mode 100644 index 0000000..a21ff56 --- /dev/null +++ b/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQuery.cs @@ -0,0 +1,48 @@ +using CellularManagement.Domain.Common; +using CellularManagement.Domain.Entities.Device; +using MediatR; +using System.ComponentModel.DataAnnotations; + +namespace CellularManagement.Application.Features.ProtocolLogs.Queries.GetProtocolLogsByDevice; + +/// +/// 根据设备代码获取协议日志查询 +/// +public class GetProtocolLogsByDeviceQuery : IRequest> +{ + /// + /// 设备代码 + /// + [Required(ErrorMessage = "设备代码不能为空")] + [MaxLength(50, ErrorMessage = "设备代码不能超过50个字符")] + public string DeviceCode { get; set; } = string.Empty; + + /// + /// 开始时间戳 + /// + public long? StartTimestamp { get; set; } + + /// + /// 结束时间戳 + /// + public long? EndTimestamp { get; set; } + + /// + /// 协议层类型 + /// + [MaxLength(50, ErrorMessage = "协议层类型不能超过50个字符")] + public string? LayerType { get; set; } + + /// + /// 设备运行时状态 + /// 当设置为 Running 时,只获取当前运行的数据 + /// + public DeviceRuntimeStatus? DeviceRuntimeStatus { get; set; } + + /// + /// 是否按时间戳降序排序 + /// + public bool OrderByDescending { get; set; } = true; + + +} \ No newline at end of file diff --git a/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQueryHandler.cs b/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQueryHandler.cs new file mode 100644 index 0000000..9d6fd54 --- /dev/null +++ b/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceQueryHandler.cs @@ -0,0 +1,134 @@ +using MediatR; +using Microsoft.Extensions.Logging; +using CellularManagement.Domain.Common; +using CellularManagement.Domain.Entities.Logging; +using CellularManagement.Domain.Entities.Device; +using CellularManagement.Domain.Repositories.Logging; +using CellularManagement.Domain.Repositories.Device; +using System.ComponentModel.DataAnnotations; +using System.Linq; + +namespace CellularManagement.Application.Features.ProtocolLogs.Queries.GetProtocolLogsByDevice; + +/// +/// 根据设备代码获取协议日志查询处理器 +/// +public class GetProtocolLogsByDeviceQueryHandler : IRequestHandler> +{ + private readonly IProtocolLogRepository _protocolLogRepository; + private readonly ICellularDeviceRuntimeRepository _deviceRuntimeRepository; + private readonly ILogger _logger; + + /// + /// 初始化查询处理器 + /// + public GetProtocolLogsByDeviceQueryHandler( + IProtocolLogRepository protocolLogRepository, + ICellularDeviceRuntimeRepository deviceRuntimeRepository, + ILogger logger) + { + _protocolLogRepository = protocolLogRepository; + _deviceRuntimeRepository = deviceRuntimeRepository; + _logger = logger; + } + + /// + /// 处理根据设备代码获取协议日志查询 + /// + public async Task> Handle(GetProtocolLogsByDeviceQuery request, CancellationToken cancellationToken) + { + try + { + // 验证请求参数 + var validationContext = new ValidationContext(request); + var validationResults = new List(); + if (!Validator.TryValidateObject(request, validationContext, validationResults, true)) + { + var errorMessages = validationResults.Select(r => r.ErrorMessage).ToList(); + _logger.LogWarning("请求参数无效: {Errors}", string.Join(", ", errorMessages)); + return OperationResult.CreateFailure(errorMessages); + } + + _logger.LogInformation("开始获取设备 {DeviceCode} 的协议日志,运行时状态: {DeviceRuntimeStatus}", + request.DeviceCode, request.DeviceRuntimeStatus); + + // 获取设备运行时状态(仅在需要时查询) + IEnumerable? runtimeCodes = null; + if (request.DeviceRuntimeStatus.HasValue) + { + var deviceRuntimes = await _deviceRuntimeRepository.GetRuntimesByDeviceCodeAsync( + request.DeviceCode, + cancellationToken); + + if (!deviceRuntimes.Any()) + { + _logger.LogWarning("设备 {DeviceCode} 的运行时状态不存在", request.DeviceCode); + return OperationResult.CreateFailure($"设备 {request.DeviceCode} 的运行时状态不存在"); + } + + // 过滤出匹配状态的运行时 + var matchingRuntimes = deviceRuntimes.Where(r => r.RuntimeStatus == request.DeviceRuntimeStatus.Value).ToList(); + if (!matchingRuntimes.Any()) + { + _logger.LogWarning("设备 {DeviceCode} 的运行时状态不匹配,期望: {ExpectedStatus}", + request.DeviceCode, request.DeviceRuntimeStatus.Value); + return OperationResult.CreateFailure( + $"设备 {request.DeviceCode} 的运行时状态不匹配,期望: {request.DeviceRuntimeStatus.Value}"); + } + + // 获取运行时编码集合 + runtimeCodes = matchingRuntimes.Select(r => r.RuntimeCode).ToList(); + _logger.LogInformation("使用运行时编码集合 {RuntimeCodes} 过滤协议日志", string.Join(", ", runtimeCodes)); + } + + // 使用 JOIN 高性能查询方法 + var protocolLogs = await _protocolLogRepository.GetByDeviceWithFiltersAsync( + request.DeviceCode, + runtimeCodes, + request.StartTimestamp, + request.EndTimestamp, + request.LayerType, + request.OrderByDescending, + cancellationToken); + + // 转换为DTO + var protocolLogDtos = protocolLogs.Select(log => new ProtocolLogDto + { + Id = log.Id, + MessageId = log.MessageId, + LayerType = (int)log.LayerType, + MessageDetailJson = log.MessageDetailJson, + CellID = log.CellID, + IMSI = log.IMSI, + Direction = log.Direction, + UEID = log.UEID, + PLMN = log.PLMN, + TimeMs = log.TimeMs, + Timestamp = log.Timestamp, + Info = log.Info, + Message = log.Message, + DeviceCode = log.DeviceCode, + RuntimeCode = log.RuntimeCode, + MessageDetail = log.MessageDetail, + Time = log.Time + }).ToList(); + + // 构建响应 + var response = new GetProtocolLogsByDeviceResponse + { + DeviceCode = request.DeviceCode, + Items = protocolLogDtos.ToList() + }; + + _logger.LogInformation("成功获取设备 {DeviceCode} 的协议日志,共 {Count} 条记录", + request.DeviceCode, protocolLogDtos.Count); + + return OperationResult.CreateSuccess(response); + } + catch (Exception ex) + { + _logger.LogError(ex, "获取设备 {DeviceCode} 的协议日志时发生错误", request.DeviceCode); + return OperationResult.CreateFailure($"获取协议日志时发生错误: {ex.Message}"); + } + } +} \ No newline at end of file diff --git a/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceResponse.cs b/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceResponse.cs new file mode 100644 index 0000000..8151962 --- /dev/null +++ b/src/X1.Application/Features/ProtocolLogs/Queries/GetProtocolLogsByDevice/GetProtocolLogsByDeviceResponse.cs @@ -0,0 +1,112 @@ +using System.Collections.Generic; + +namespace CellularManagement.Application.Features.ProtocolLogs.Queries.GetProtocolLogsByDevice; + +/// +/// 根据设备代码获取协议日志响应 +/// +public class GetProtocolLogsByDeviceResponse +{ + + + /// + /// 设备代码 + /// + public string DeviceCode { get; set; } = string.Empty; + + /// + /// 协议日志列表 + /// + public List Items { get; set; } = new(); +} + +/// +/// 协议日志数据传输对象 +/// +public class ProtocolLogDto +{ + /// + /// 主键ID + /// + public string Id { get; set; } = string.Empty; + + /// + /// 消息ID + /// + public long MessageId { get; set; } + + /// + /// 协议层类型 + /// + public int LayerType { get; set; } + + /// + /// 消息详情集合(JSON格式存储) + /// + public string? MessageDetailJson { get; set; } + + /// + /// 小区ID + /// + public int? CellID { get; set; } + + /// + /// 国际移动用户识别码 + /// + public string? IMSI { get; set; } + + /// + /// 日志方向类型 + /// + public int Direction { get; set; } + + /// + /// 用户设备ID + /// + public int? UEID { get; set; } + + /// + /// 公共陆地移动网络标识 + /// + public string? PLMN { get; set; } + + /// + /// 时间间隔(毫秒) + /// + public long TimeMs { get; set; } + + /// + /// 时间戳 + /// + public long Timestamp { get; set; } + + /// + /// 信息字段 + /// + public string? Info { get; set; } + + /// + /// 消息字段 + /// + public string? Message { get; set; } + + /// + /// 设备代码 + /// + public string DeviceCode { get; set; } = string.Empty; + + /// + /// 运行时代码 + /// + public string RuntimeCode { get; set; } = string.Empty; + + /// + /// 消息详情集合(用于业务逻辑) + /// + public IEnumerable? MessageDetail { get; set; } + + /// + /// 时间间隔(用于业务逻辑) + /// + public TimeSpan Time { get; set; } +} \ No newline at end of file diff --git a/src/X1.Application/X1.Application.csproj b/src/X1.Application/X1.Application.csproj index 15aa001..602fbe2 100644 --- a/src/X1.Application/X1.Application.csproj +++ b/src/X1.Application/X1.Application.csproj @@ -12,6 +12,7 @@ + diff --git a/src/X1.Domain/Entities/Logging/ProtocolLayer.cs b/src/X1.Domain/Entities/Logging/ProtocolLayer.cs new file mode 100644 index 0000000..2ebc832 --- /dev/null +++ b/src/X1.Domain/Entities/Logging/ProtocolLayer.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace X1.Domain.Entities.Logging +{ + /// + /// 协议层类型枚举 + /// 定义了各种协议层的类型标识 + /// + public enum ProtocolLayer + { + NONE, + GTPU, + LPPa, + M2AP, + MAC, + NAS, + NGAP, + NRPPa, + PDCP, + PROD, + PHY, + RLC, + RRC, + S1AP, + TRX, + X2AP, + XnAP, + IP, + IMS, + CX, + RX, + S6, + S13, + SGsAP, + SBcAP, + LCSAP, + N12, + N8, + N17, + N50, + N13, + NL1, + HTTP2, + EPDG, + IKEV2, + IPSEC, + MEDIA, + MMS, + SIP, + } +} diff --git a/src/X1.Domain/Entities/Logging/ProtocolLog.cs b/src/X1.Domain/Entities/Logging/ProtocolLog.cs new file mode 100644 index 0000000..55d03cc --- /dev/null +++ b/src/X1.Domain/Entities/Logging/ProtocolLog.cs @@ -0,0 +1,159 @@ +using System.ComponentModel.DataAnnotations; +using CellularManagement.Domain.Abstractions; +using CellularManagement.Domain.Entities.Common; +using X1.Domain.Entities.Logging; + +namespace CellularManagement.Domain.Entities.Logging; + +/// +/// 协议日志实体 +/// 用于存储协议层的日志数据 +/// +public class ProtocolLog : Entity +{ + /// + /// 消息ID + /// + [Required] + public long MessageId { get; private set; } + + /// + /// 协议层类型 + /// + [Required] + [MaxLength(50)] + public ProtocolLayer LayerType { get; private set; } + + /// + /// 消息详情集合(JSON格式存储) + /// + public string? MessageDetailJson { get; private set; } + + /// + /// 小区ID + /// + public int? CellID { get; private set; } + + /// + /// 国际移动用户识别码 + /// + [MaxLength(50)] + public string? IMSI { get; private set; } + + /// + /// 日志方向类型 + /// + [Required] + public int Direction { get; private set; } + + /// + /// 用户设备ID + /// + public int? UEID { get; private set; } + + /// + /// 公共陆地移动网络标识 + /// + [MaxLength(20)] + public string? PLMN { get; private set; } + + /// + /// 时间间隔(毫秒) + /// + [Required] + public long TimeMs { get; private set; } + + /// + /// 时间戳 + /// + [Required] + public long Timestamp { get; private set; } + + /// + /// 信息字段 + /// + [MaxLength(500)] + public string? Info { get; private set; } + + /// + /// 消息字段 + /// + [MaxLength(1000)] + public string? Message { get; private set; } + + /// + /// 设备代码 + /// + [Required] + [MaxLength(50)] + public string DeviceCode { get; private set; } = null!; + + /// + /// 运行时代码 + /// + [Required] + [MaxLength(50)] + public string RuntimeCode { get; private set; } = null!; + + /// + /// 私有构造函数 + /// + private ProtocolLog() { } + + /// + /// 创建协议日志 + /// + public static ProtocolLog Create( + long messageId, + int layerType, + int direction, + long timeMs, + long timestamp, + string deviceCode, + string runtimeCode, + string? messageDetailJson = null, + int? cellID = null, + string? imsi = null, + int? ueid = null, + string? plmn = null, + string? info = null, + string? message = null) + { + return new ProtocolLog + { + Id = Guid.NewGuid().ToString(), + MessageId = messageId, + LayerType = (ProtocolLayer)layerType, + MessageDetailJson = messageDetailJson, + CellID = cellID, + IMSI = imsi, + Direction = direction, + UEID = ueid, + PLMN = plmn, + TimeMs = timeMs, + Timestamp = timestamp, + Info = info, + Message = message, + DeviceCode = deviceCode, + RuntimeCode = runtimeCode + }; + } + + /// + /// 消息详情集合(用于业务逻辑) + /// + public IEnumerable? MessageDetail + { + get => !string.IsNullOrEmpty(MessageDetailJson) + ? System.Text.Json.JsonSerializer.Deserialize>(MessageDetailJson) + : null; + } + + /// + /// 时间间隔(用于业务逻辑) + /// + public TimeSpan Time + { + get => TimeSpan.FromMilliseconds(TimeMs); + } +} \ No newline at end of file diff --git a/src/X1.Domain/Models/DeviceBasicInfo.cs b/src/X1.Domain/Models/DeviceBasicInfo.cs new file mode 100644 index 0000000..4d5a636 --- /dev/null +++ b/src/X1.Domain/Models/DeviceBasicInfo.cs @@ -0,0 +1,44 @@ +namespace CellularManagement.Domain.Models; + +/// +/// 设备基本信息模型 +/// +public class DeviceBasicInfo +{ + /// + /// 设备编码 + /// + public string DeviceCode { get; set; } = null!; + + /// + /// IP地址 + /// + public string IpAddress { get; set; } = null!; + + /// + /// Agent端口 + /// + public int AgentPort { get; set; } + + /// + /// 默认构造函数 + /// + public DeviceBasicInfo() { } + + /// + /// 创建设备基本信息实例 + /// + /// 设备编码 + /// IP地址 + /// Agent端口 + /// 设备基本信息实例 + public static DeviceBasicInfo Create(string deviceCode, string ipAddress, int agentPort) + { + return new DeviceBasicInfo + { + DeviceCode = deviceCode, + IpAddress = ipAddress, + AgentPort = agentPort + }; + } +} \ No newline at end of file diff --git a/src/X1.WebSocket/Models/MessageTransferProtocolLog.cs b/src/X1.Domain/Models/MessageTransferProtocolLog.cs similarity index 88% rename from src/X1.WebSocket/Models/MessageTransferProtocolLog.cs rename to src/X1.Domain/Models/MessageTransferProtocolLog.cs index 1827adb..962de9e 100644 --- a/src/X1.WebSocket/Models/MessageTransferProtocolLog.cs +++ b/src/X1.Domain/Models/MessageTransferProtocolLog.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace X1.WebSocket.Models +namespace X1.Domain.Models { /// /// 消息传输协议日志模型 @@ -21,7 +21,7 @@ namespace X1.WebSocket.Models /// /// 协议层类型 /// - public string LayerType { get; set; } = string.Empty; + public int LayerType { get; set; } /// /// 消息详情集合(JSON格式存储) @@ -73,6 +73,16 @@ namespace X1.WebSocket.Models /// public string? Message { get; set; } + /// + /// 设备代码 + /// + public string? DeviceCode { get; set; } + + /// + /// 运行时代码 + /// + public string RuntimeCode { get; set; } = null!; + /// /// 消息详情集合(用于业务逻辑) /// diff --git a/src/X1.Domain/Repositories/Base/IQueryRepository.cs b/src/X1.Domain/Repositories/Base/IQueryRepository.cs index 13c6bd3..7fc7625 100644 --- a/src/X1.Domain/Repositories/Base/IQueryRepository.cs +++ b/src/X1.Domain/Repositories/Base/IQueryRepository.cs @@ -56,6 +56,22 @@ public interface IQueryRepository where T : class /// Task> FindAsync(Expression> predicate, Func, IQueryable>? include = null, CancellationToken cancellationToken = default); + /// + /// 根据条件查询实体并投影到指定类型 + /// + /// 投影结果类型 + /// 查询条件表达式 + /// 投影表达式 + /// 查询条件表达式 + /// 取消令牌,用于取消异步操作 + /// 投影后的结果集合 + /// + /// 这是一个异步操作,因为需要等待数据库的查询操作完成 + /// 使用投影查询可以只获取需要的字段,提高性能 + /// 投影查询会被转换为 SQL 语句在数据库端执行 + /// + Task> FindAsync(Expression> predicate, Expression> select, Func, IQueryable>? include = null, CancellationToken cancellationToken = default); + /// /// 分页查询实体 /// diff --git a/src/X1.Domain/Repositories/Device/ICellularDeviceRepository.cs b/src/X1.Domain/Repositories/Device/ICellularDeviceRepository.cs index f59408e..56a1674 100644 --- a/src/X1.Domain/Repositories/Device/ICellularDeviceRepository.cs +++ b/src/X1.Domain/Repositories/Device/ICellularDeviceRepository.cs @@ -1,6 +1,7 @@ using CellularManagement.Domain.Entities; using CellularManagement.Domain.Entities.Device; using CellularManagement.Domain.Repositories.Base; +using CellularManagement.Domain.Models; namespace CellularManagement.Domain.Repositories.Device; @@ -89,4 +90,11 @@ public interface ICellularDeviceRepository : IBaseRepository /// 获取设备总数 /// Task GetDeviceCountAsync(CancellationToken cancellationToken = default); + + /// + /// 获取设备基本信息集合(DeviceCode、IpAddress、AgentPort) + /// + /// 取消令牌 + /// 设备基本信息集合 + Task> GetDeviceBasicInfoListAsync(CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/X1.Domain/Repositories/Device/ICellularDeviceRuntimeRepository.cs b/src/X1.Domain/Repositories/Device/ICellularDeviceRuntimeRepository.cs index 5ca8d04..b0787a8 100644 --- a/src/X1.Domain/Repositories/Device/ICellularDeviceRuntimeRepository.cs +++ b/src/X1.Domain/Repositories/Device/ICellularDeviceRuntimeRepository.cs @@ -39,6 +39,11 @@ public interface ICellularDeviceRuntimeRepository : IBaseRepository Task GetRuntimeByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default); + /// + /// 根据设备编号获取所有运行时状态 + /// + Task> GetRuntimesByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default); + /// /// 根据设备编号获取运行时状态(包含设备信息) /// diff --git a/src/X1.Domain/Repositories/Logging/IProtocolLogRepository.cs b/src/X1.Domain/Repositories/Logging/IProtocolLogRepository.cs new file mode 100644 index 0000000..f2e2d08 --- /dev/null +++ b/src/X1.Domain/Repositories/Logging/IProtocolLogRepository.cs @@ -0,0 +1,75 @@ +using CellularManagement.Domain.Entities.Logging; +using CellularManagement.Domain.Repositories.Base; +using X1.Domain.Entities.Logging; + +namespace CellularManagement.Domain.Repositories.Logging; + +/// +/// 协议日志仓储接口 +/// +public interface IProtocolLogRepository : IBaseRepository +{ + /// + /// 根据设备代码获取协议日志 + /// + /// 设备代码 + /// 取消令牌 + /// 协议日志列表 + Task> GetByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default); + + /// + /// 根据运行时代码获取协议日志 + /// + /// 运行时代码 + /// 取消令牌 + /// 协议日志列表 + Task> GetByRuntimeCodeAsync(string runtimeCode, CancellationToken cancellationToken = default); + + /// + /// 根据设备代码和运行时代码获取协议日志 + /// + /// 设备代码 + /// 运行时代码 + /// 取消令牌 + /// 协议日志列表 + Task> GetByDeviceAndRuntimeCodeAsync(string deviceCode, string runtimeCode, CancellationToken cancellationToken = default); + + /// + /// 根据时间范围获取协议日志 + /// + /// 开始时间戳 + /// 结束时间戳 + /// 取消令牌 + /// 协议日志列表 + Task> GetByTimeRangeAsync(long startTimestamp, long endTimestamp, CancellationToken cancellationToken = default); + + /// + /// 根据协议层类型获取协议日志 + /// + /// 协议层类型 + /// 取消令牌 + /// 协议日志列表 + Task> GetByLayerTypeAsync(ProtocolLayer layerType, CancellationToken cancellationToken = default); + + /// + /// 根据设备代码和运行时状态获取协议日志(高性能查询) + /// + /// 设备代码 + /// 运行时代码集合 + /// 开始时间戳 + /// 结束时间戳 + /// 协议层类型 + /// 是否按时间戳降序排序 + /// 取消令牌 + /// 协议日志列表 + Task> GetByDeviceWithFiltersAsync( + string deviceCode, + IEnumerable? runtimeCodes = null, + long? startTimestamp = null, + long? endTimestamp = null, + string? layerType = null, + bool orderByDescending = true, + CancellationToken cancellationToken = default); + + +} \ No newline at end of file diff --git a/src/X1.Domain/Transmission/IProtocolChannelManager.cs b/src/X1.Domain/Transmission/IProtocolChannelManager.cs new file mode 100644 index 0000000..c30844e --- /dev/null +++ b/src/X1.Domain/Transmission/IProtocolChannelManager.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using CellularManagement.Domain.Entities.Logging; + +namespace X1.Domain.Transmission +{ + /// + /// 协议通道管理器接口 + /// 提供协议日志的读取、写入和清空功能 + /// + public interface IProtocolChannelManager + { + /// + /// 从通道读取协议日志 + /// + /// 取消令牌 + /// 协议日志集合 + Task> ReadFromChannelAsync(CancellationToken cancellationToken = default); + + /// + /// 向通道写入协议日志 + /// + /// 协议日志集合 + /// 取消令牌 + /// 写入结果 + Task WriteToChannelAsync(IEnumerable protocolLogs, CancellationToken cancellationToken = default); + + /// + /// 清空协议通道 + /// + /// 清空结果 + Task ClearChannelAsync(); + } +} diff --git a/src/X1.Domain/Transmission/IProtocolLogObserver.cs b/src/X1.Domain/Transmission/IProtocolLogObserver.cs new file mode 100644 index 0000000..d061a68 --- /dev/null +++ b/src/X1.Domain/Transmission/IProtocolLogObserver.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using X1.Domain.Models; + +namespace X1.Domain.Transmission +{ + public interface IProtocolLogObserver + { + public Task OnProtocolLogsReceived(IEnumerable logDetails); + } +} diff --git a/src/X1.Domain/X1.Domain.csproj b/src/X1.Domain/X1.Domain.csproj index 8321811..05abf4f 100644 --- a/src/X1.Domain/X1.Domain.csproj +++ b/src/X1.Domain/X1.Domain.csproj @@ -12,6 +12,7 @@ + diff --git a/src/X1.Infrastructure/Configurations/Logging/ProtocolLogConfiguration.cs b/src/X1.Infrastructure/Configurations/Logging/ProtocolLogConfiguration.cs new file mode 100644 index 0000000..8da0bf1 --- /dev/null +++ b/src/X1.Infrastructure/Configurations/Logging/ProtocolLogConfiguration.cs @@ -0,0 +1,94 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using CellularManagement.Domain.Entities.Logging; + +namespace CellularManagement.Infrastructure.Configurations.Logging; + +/// +/// ProtocolLog 实体配置类 +/// 用于配置协议日志实体在数据库中的映射关系 +/// +public sealed class ProtocolLogConfiguration : IEntityTypeConfiguration +{ + /// + /// 配置 ProtocolLog 实体 + /// + /// 实体类型构建器 + public void Configure(EntityTypeBuilder builder) + { + // 配置表名 + builder.ToTable("tb_ProtocolLog", t => t.HasComment("协议日志表")); + + // 配置主键 + builder.HasKey(p => p.Id); + + // 配置索引 + builder.HasIndex(p => p.MessageId).HasDatabaseName("IX_ProtocolLog_MessageId"); + builder.HasIndex(p => p.DeviceCode).HasDatabaseName("IX_ProtocolLog_DeviceCode"); + builder.HasIndex(p => p.RuntimeCode).HasDatabaseName("IX_ProtocolLog_RuntimeCode"); + builder.HasIndex(p => p.Timestamp).HasDatabaseName("IX_ProtocolLog_Timestamp"); + builder.HasIndex(p => p.LayerType).HasDatabaseName("IX_ProtocolLog_LayerType"); + builder.HasIndex(p => new { p.DeviceCode, p.RuntimeCode }).HasDatabaseName("IX_ProtocolLog_DeviceCode_RuntimeCode"); + builder.HasIndex(p => new { p.DeviceCode, p.Timestamp }).HasDatabaseName("IX_ProtocolLog_DeviceCode_Timestamp"); + + // 配置属性 + builder.Property(p => p.Id) + .HasComment("主键ID"); + + builder.Property(p => p.MessageId) + .IsRequired() + .HasComment("消息ID"); + + builder.Property(p => p.LayerType) + .IsRequired() + .HasMaxLength(50) + .HasComment("协议层类型"); + + builder.Property(p => p.MessageDetailJson) + .HasComment("消息详情集合(JSON格式存储)"); + + builder.Property(p => p.CellID) + .HasComment("小区ID"); + + builder.Property(p => p.IMSI) + .HasMaxLength(50) + .HasComment("国际移动用户识别码"); + + builder.Property(p => p.Direction) + .IsRequired() + .HasComment("日志方向类型"); + + builder.Property(p => p.UEID) + .HasComment("用户设备ID"); + + builder.Property(p => p.PLMN) + .HasMaxLength(20) + .HasComment("公共陆地移动网络标识"); + + builder.Property(p => p.TimeMs) + .IsRequired() + .HasComment("时间间隔(毫秒)"); + + builder.Property(p => p.Timestamp) + .IsRequired() + .HasComment("时间戳"); + + builder.Property(p => p.Info) + .HasMaxLength(500) + .HasComment("信息字段"); + + builder.Property(p => p.Message) + .HasMaxLength(1000) + .HasComment("消息字段"); + + builder.Property(p => p.DeviceCode) + .IsRequired() + .HasMaxLength(50) + .HasComment("设备代码"); + + builder.Property(p => p.RuntimeCode) + .IsRequired() + .HasMaxLength(50) + .HasComment("运行时代码"); + } +} \ No newline at end of file diff --git a/src/X1.Infrastructure/Context/AppDbContext.cs b/src/X1.Infrastructure/Context/AppDbContext.cs index efdfb31..01dfcaa 100644 --- a/src/X1.Infrastructure/Context/AppDbContext.cs +++ b/src/X1.Infrastructure/Context/AppDbContext.cs @@ -37,6 +37,11 @@ public class AppDbContext : IdentityDbContext /// public DbSet LoginLogs { get; set; } = null!; + /// + /// 协议日志集合 + /// + public DbSet ProtocolLogs { get; set; } = null!; + /// /// 蜂窝设备集合 /// diff --git a/src/X1.Infrastructure/DependencyInjection.cs b/src/X1.Infrastructure/DependencyInjection.cs index a36df20..da8600d 100644 --- a/src/X1.Infrastructure/DependencyInjection.cs +++ b/src/X1.Infrastructure/DependencyInjection.cs @@ -25,9 +25,10 @@ using CellularManagement.Infrastructure.Services.Security; using CellularManagement.Infrastructure.Services.UserManagement; using CellularManagement.Domain.Repositories.Device; using CellularManagement.Domain.Repositories.NetworkProfile; +using CellularManagement.Domain.Repositories.Logging; using CellularManagement.Infrastructure.Repositories.Device; using CellularManagement.Infrastructure.Repositories.NetworkProfile; - +using CellularManagement.Infrastructure.Repositories.Logging; namespace CellularManagement.Infrastructure; /// @@ -172,6 +173,7 @@ public static class DependencyInjection services.AddScoped(); services.AddScoped(); services.AddScoped(); + services.AddScoped(); // 注册设备相关仓储 services.AddScoped(); diff --git a/src/X1.Infrastructure/Repositories/CQRS/QueryRepository.cs b/src/X1.Infrastructure/Repositories/CQRS/QueryRepository.cs index 067a655..1711d9d 100644 --- a/src/X1.Infrastructure/Repositories/CQRS/QueryRepository.cs +++ b/src/X1.Infrastructure/Repositories/CQRS/QueryRepository.cs @@ -74,6 +74,17 @@ public class QueryRepository : IQueryRepository where T : class return await query.ToListAsync(cancellationToken); } + /// + /// 根据条件查询实体并投影到指定类型 + /// + public async Task> FindAsync(Expression> predicate, Expression> select, Func, IQueryable>? include = null, CancellationToken cancellationToken = default) + { + IQueryable query = _dbSet.Where(predicate); + if (include != null) + query = include(query); + return await query.Select(select).ToListAsync(cancellationToken); + } + /// /// 分页查询实体 /// diff --git a/src/X1.Infrastructure/Repositories/Device/CellularDeviceRepository.cs b/src/X1.Infrastructure/Repositories/Device/CellularDeviceRepository.cs index 687c630..0c432c0 100644 --- a/src/X1.Infrastructure/Repositories/Device/CellularDeviceRepository.cs +++ b/src/X1.Infrastructure/Repositories/Device/CellularDeviceRepository.cs @@ -12,6 +12,7 @@ using CellularManagement.Infrastructure.Repositories.Base; using CellularManagement.Domain.Entities.Device; using CellularManagement.Domain.Repositories.Base; using CellularManagement.Domain.Repositories.Device; +using CellularManagement.Domain.Models; namespace CellularManagement.Infrastructure.Repositories.Device; @@ -207,4 +208,23 @@ public class CellularDeviceRepository : BaseRepository, ICellula { return await QueryRepository.CountAsync(d => true, cancellationToken: cancellationToken); } + + /// + /// 获取设备基本信息集合(DeviceCode、IpAddress、AgentPort) + /// + public async Task> GetDeviceBasicInfoListAsync(CancellationToken cancellationToken = default) + { + // 直接从数据库查询只需要的三个字段,只获取启用的设备 + var deviceBasicInfos = await QueryRepository.FindAsync( + d => d.IsEnabled, // 只获取启用的设备 + select: d => new DeviceBasicInfo + { + DeviceCode = d.DeviceCode, + IpAddress = d.IpAddress, + AgentPort = d.AgentPort + }, + cancellationToken: cancellationToken); + + return deviceBasicInfos.ToList(); + } } \ No newline at end of file diff --git a/src/X1.Infrastructure/Repositories/Device/CellularDeviceRuntimeRepository.cs b/src/X1.Infrastructure/Repositories/Device/CellularDeviceRuntimeRepository.cs index 1623fbc..85fcc23 100644 --- a/src/X1.Infrastructure/Repositories/Device/CellularDeviceRuntimeRepository.cs +++ b/src/X1.Infrastructure/Repositories/Device/CellularDeviceRuntimeRepository.cs @@ -86,6 +86,15 @@ public class CellularDeviceRuntimeRepository : BaseRepository r.CreatedAt).FirstOrDefault(); } + /// + /// 根据设备编号获取所有运行时状态 + /// + public async Task> GetRuntimesByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default) + { + var runtimes = await QueryRepository.FindAsync(r => r.DeviceCode == deviceCode, cancellationToken: cancellationToken); + return runtimes.OrderByDescending(r => r.CreatedAt).ToList(); + } + /// /// 根据设备编号获取运行时状态(包含设备信息) /// diff --git a/src/X1.Infrastructure/Repositories/Logging/ProtocolLogRepository.cs b/src/X1.Infrastructure/Repositories/Logging/ProtocolLogRepository.cs new file mode 100644 index 0000000..e5d8569 --- /dev/null +++ b/src/X1.Infrastructure/Repositories/Logging/ProtocolLogRepository.cs @@ -0,0 +1,226 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using CellularManagement.Domain.Entities.Logging; +using CellularManagement.Domain.Entities.Device; +using CellularManagement.Domain.Repositories.Logging; +using CellularManagement.Domain.Repositories.Base; +using CellularManagement.Infrastructure.Repositories.Base; +using System.Linq; +using X1.Domain.Entities.Logging; + +namespace CellularManagement.Infrastructure.Repositories.Logging; + +/// +/// 协议日志仓储实现类 +/// +public class ProtocolLogRepository : BaseRepository, IProtocolLogRepository +{ + private readonly ILogger _logger; + + /// + /// 构造函数 + /// + /// 命令仓储 + /// 查询仓储 + /// 日志记录器 + public ProtocolLogRepository( + ICommandRepository commandRepository, + IQueryRepository queryRepository, + ILogger logger) + : base(commandRepository, queryRepository, logger) + { + _logger = logger; + } + + /// + /// 根据设备代码获取协议日志 + /// + /// 设备代码 + /// 取消令牌 + /// 协议日志列表 + public async Task> GetByDeviceCodeAsync(string deviceCode, CancellationToken cancellationToken = default) + { + try + { + var logs = await QueryRepository.FindAsync( + p => p.DeviceCode == deviceCode, + cancellationToken: cancellationToken); + + return logs.OrderByDescending(p => p.Timestamp); + } + catch (Exception ex) + { + _logger.LogError(ex, "获取设备代码 {DeviceCode} 的协议日志时发生错误", deviceCode); + throw; + } + } + + /// + /// 根据运行时代码获取协议日志 + /// + /// 运行时代码 + /// 取消令牌 + /// 协议日志列表 + public async Task> GetByRuntimeCodeAsync(string runtimeCode, CancellationToken cancellationToken = default) + { + try + { + var logs = await QueryRepository.FindAsync( + p => p.RuntimeCode == runtimeCode, + cancellationToken: cancellationToken); + + return logs.OrderByDescending(p => p.Timestamp); + } + catch (Exception ex) + { + _logger.LogError(ex, "获取运行时代码 {RuntimeCode} 的协议日志时发生错误", runtimeCode); + throw; + } + } + + /// + /// 根据设备代码和运行时代码获取协议日志 + /// + /// 设备代码 + /// 运行时代码 + /// 取消令牌 + /// 协议日志列表 + public async Task> GetByDeviceAndRuntimeCodeAsync(string deviceCode, string runtimeCode, CancellationToken cancellationToken = default) + { + try + { + var logs = await QueryRepository.FindAsync( + p => p.DeviceCode == deviceCode && p.RuntimeCode == runtimeCode, + cancellationToken: cancellationToken); + + return logs.OrderByDescending(p => p.Timestamp); + } + catch (Exception ex) + { + _logger.LogError(ex, "获取设备代码 {DeviceCode} 和运行时代码 {RuntimeCode} 的协议日志时发生错误", deviceCode, runtimeCode); + throw; + } + } + + /// + /// 根据时间范围获取协议日志 + /// + /// 开始时间戳 + /// 结束时间戳 + /// 取消令牌 + /// 协议日志列表 + public async Task> GetByTimeRangeAsync(long startTimestamp, long endTimestamp, CancellationToken cancellationToken = default) + { + try + { + var logs = await QueryRepository.FindAsync( + p => p.Timestamp >= startTimestamp && p.Timestamp <= endTimestamp, + cancellationToken: cancellationToken); + + return logs.OrderByDescending(p => p.Timestamp); + } + catch (Exception ex) + { + _logger.LogError(ex, "获取时间范围 {StartTimestamp} 到 {EndTimestamp} 的协议日志时发生错误", startTimestamp, endTimestamp); + throw; + } + } + + /// + /// 根据协议层类型获取协议日志 + /// + /// 协议层类型 + /// 取消令牌 + /// 协议日志列表 + public async Task> GetByLayerTypeAsync(ProtocolLayer layerType, CancellationToken cancellationToken = default) + { + try + { + var logs = await QueryRepository.FindAsync( + p => p.LayerType == layerType, + cancellationToken: cancellationToken); + + return logs.OrderByDescending(p => p.Timestamp); + } + catch (Exception ex) + { + _logger.LogError(ex, "获取协议层类型 {LayerType} 的协议日志时发生错误", layerType); + throw; + } + } + + /// + /// 根据设备代码和运行时状态获取协议日志(高性能查询) + /// + /// 设备代码 + /// 运行时代码集合 + /// 开始时间戳 + /// 结束时间戳 + /// 协议层类型 + /// 是否按时间戳降序排序 + /// 取消令牌 + /// 协议日志列表 + public async Task> GetByDeviceWithFiltersAsync( + string deviceCode, + IEnumerable? runtimeCodes = null, + long? startTimestamp = null, + long? endTimestamp = null, + string? layerType = null, + bool orderByDescending = true, + CancellationToken cancellationToken = default) + { + try + { + // 构建 SQL 查询 + var sql = @" + SELECT pl.* + FROM ""ProtocolLogs"" pl + INNER JOIN ""CellularDeviceRuntimes"" cdr + ON pl.""DeviceCode"" = cdr.""DeviceCode"" + AND pl.""RuntimeCode"" = cdr.""RuntimeCode"" + WHERE pl.""DeviceCode"" = @deviceCode"; + + var parameters = new List { deviceCode }; + + // 添加运行时编码过滤 + if (runtimeCodes != null && runtimeCodes.Any()) + { + var runtimeCodeList = string.Join(",", runtimeCodes.Select((_, i) => $"@runtimeCode{i}")); + sql += $" AND pl.\"RuntimeCode\" IN ({runtimeCodeList})"; + parameters.AddRange(runtimeCodes); + } + + // 添加时间范围过滤 + if (startTimestamp.HasValue) + { + sql += " AND pl.\"Timestamp\" >= @startTimestamp"; + parameters.Add(startTimestamp.Value); + } + + if (endTimestamp.HasValue) + { + sql += " AND pl.\"Timestamp\" <= @endTimestamp"; + parameters.Add(endTimestamp.Value); + } + + // 添加协议层类型过滤 + if (!string.IsNullOrEmpty(layerType)) + { + sql += " AND pl.\"LayerType\" = @layerType"; + parameters.Add(layerType); + } + + // 添加排序 + sql += orderByDescending ? " ORDER BY pl.\"Timestamp\" DESC" : " ORDER BY pl.\"Timestamp\" ASC"; + + // 执行 SQL 查询 + var logs = await QueryRepository.ExecuteSqlQueryAsync(sql, parameters.ToArray(), cancellationToken); + return logs; + } + catch (Exception ex) + { + _logger.LogError(ex, "获取设备代码 {DeviceCode} 的协议日志时发生错误", deviceCode); + throw; + } + } +} \ No newline at end of file diff --git a/src/X1.Infrastructure/X1.Infrastructure.csproj b/src/X1.Infrastructure/X1.Infrastructure.csproj index 6c3fff1..671ad9c 100644 --- a/src/X1.Infrastructure/X1.Infrastructure.csproj +++ b/src/X1.Infrastructure/X1.Infrastructure.csproj @@ -7,12 +7,6 @@ enable - - - - - - diff --git a/src/X1.Presentation/Controllers/ProtocolLogsController.cs b/src/X1.Presentation/Controllers/ProtocolLogsController.cs new file mode 100644 index 0000000..3b61096 --- /dev/null +++ b/src/X1.Presentation/Controllers/ProtocolLogsController.cs @@ -0,0 +1,78 @@ +using Microsoft.AspNetCore.Mvc; +using Microsoft.AspNetCore.Authorization; +using Microsoft.Extensions.Logging; +using CellularManagement.Application.Features.ProtocolLogs.Queries.GetProtocolLogsByDevice; +using CellularManagement.Domain.Common; +using CellularManagement.Domain.Entities.Device; +using CellularManagement.Presentation.Abstractions; +using MediatR; + +namespace CellularManagement.Presentation.Controllers; + +/// +/// 协议日志控制器 +/// +[ApiController] +[Route("api/[controller]")] +[Authorize] +public class ProtocolLogsController : ApiController +{ + private readonly ILogger _logger; + + /// + /// 构造函数 + /// + /// 中介者 + /// 日志记录器 + public ProtocolLogsController(IMediator mediator, ILogger logger) + : base(mediator) + { + _logger = logger; + } + + + + /// + /// 根据设备代码获取协议日志 + /// + /// 设备代码 + /// 开始时间戳 + /// 结束时间戳 + /// 协议层类型 + /// 设备运行时状态 + /// 是否按时间戳降序排序 + /// 协议日志列表 + [HttpGet("device/{deviceCode}")] + public async Task> GetProtocolLogsByDevice( + string deviceCode, + [FromQuery] long? startTimestamp = null, + [FromQuery] long? endTimestamp = null, + [FromQuery] string? layerType = null, + [FromQuery] int? deviceRuntimeStatus = null, + [FromQuery] bool orderByDescending = true) + { + _logger.LogInformation("开始获取设备 {DeviceCode} 的协议日志,开始时间戳: {StartTimestamp}, 结束时间戳: {EndTimestamp}, 协议层类型: {LayerType}, 运行时状态: {DeviceRuntimeStatus}", + deviceCode, startTimestamp, endTimestamp, layerType, deviceRuntimeStatus); + + var query = new GetProtocolLogsByDeviceQuery + { + DeviceCode = deviceCode, + StartTimestamp = startTimestamp, + EndTimestamp = endTimestamp, + LayerType = layerType, + DeviceRuntimeStatus = deviceRuntimeStatus.HasValue ? (DeviceRuntimeStatus)deviceRuntimeStatus.Value : null, + OrderByDescending = orderByDescending + }; + + var result = await mediator.Send(query); + if (!result.IsSuccess) + { + _logger.LogWarning("获取设备 {DeviceCode} 的协议日志失败: {Message}", deviceCode, result.ErrorMessages); + return result; + } + + _logger.LogInformation("成功获取设备 {DeviceCode} 的协议日志,共 {Count} 条记录", + deviceCode, result.Data?.Items?.Count ?? 0); + return result; + } +} \ No newline at end of file diff --git a/src/X1.WebSocket/Extensions/ServiceCollectionExtensions.cs b/src/X1.WebSocket/Extensions/ServiceCollectionExtensions.cs index 87f3a94..e7d5514 100644 --- a/src/X1.WebSocket/Extensions/ServiceCollectionExtensions.cs +++ b/src/X1.WebSocket/Extensions/ServiceCollectionExtensions.cs @@ -4,6 +4,8 @@ using CellularManagement.WebSocket.Middleware; using CellularManagement.WebSocket.Services; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; +using X1.Domain.Transmission; +using X1.WebSocket.Transmission; namespace CellularManagement.WebSocket.Extensions; @@ -26,11 +28,16 @@ public static class ServiceCollectionExtensions // 注册连接管理协调器 services.AddSingleton(); + // 注册协议日志观察者(单例,因为需要在整个应用生命周期中保持状态) + // 必须在 ProtocolMessageHandler 之前注册,因为 ProtocolMessageHandler 依赖 IProtocolLogObserver + services.AddSingleton(); + // 注册 WebSocket 消息处理器 services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + // 注册后台服务 services.AddHostedService(); services.AddHostedService(); diff --git a/src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs b/src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs index 5b53c0d..e6560cb 100644 --- a/src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs +++ b/src/X1.WebSocket/Handlers/ProtocolMessageHandler.cs @@ -6,6 +6,8 @@ using System.Text.Json; using System.Threading.Tasks; using CellularManagement.WebSocket.Models; using Microsoft.Extensions.Logging; +using X1.Domain.Models; +using X1.Domain.Transmission; using X1.WebSocket.Models; namespace CellularManagement.WebSocket.Handlers @@ -14,10 +16,11 @@ namespace CellularManagement.WebSocket.Handlers { private readonly ILogger _logger; private readonly JsonSerializerOptions _jsonOptions; - - public ProtocolMessageHandler(ILogger logger) + private readonly IProtocolLogObserver _logObserver; + public ProtocolMessageHandler(ILogger logger, IProtocolLogObserver logObserver) { _logger = logger; + _logObserver = logObserver; _jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true, @@ -98,31 +101,40 @@ namespace CellularManagement.WebSocket.Handlers /// 协议日志数组 private void ProcessProtocolLogs(MessageTransferProtocolLog[] protocolLogs) { - var processedCount = 0; - var errorCount = 0; - foreach (var log in protocolLogs) + try { - try + _logObserver.OnProtocolLogsReceived(protocolLogs); + var processedCount = 0; + var errorCount = 0; + + foreach (var log in protocolLogs) { - ProcessSingleProtocolLog(log); - processedCount++; + try + { + ProcessSingleProtocolLog(log); + processedCount++; + } + catch (Exception ex) + { + errorCount++; + _logger.LogError(ex, "处理单个协议日志时发生错误,日志ID:{LogId}", log.Id); + } } - catch (Exception ex) + + if (errorCount > 0) { - errorCount++; - _logger.LogError(ex, "处理单个协议日志时发生错误,日志ID:{LogId}", log.Id); + _logger.LogWarning("协议日志处理完成,成功:{SuccessCount},失败:{ErrorCount}", + processedCount, errorCount); + } + else + { + _logger.LogDebug("协议日志处理完成,成功处理 {ProcessedCount} 条日志", processedCount); } } - - if (errorCount > 0) - { - _logger.LogWarning("协议日志处理完成,成功:{SuccessCount},失败:{ErrorCount}", - processedCount, errorCount); - } - else + catch (Exception ex) { - _logger.LogDebug("协议日志处理完成,成功处理 {ProcessedCount} 条日志", processedCount); + _logger.LogDebug("转发日志数据异常{ex}", ex); } } diff --git a/src/X1.WebSocket/Models/ProtocolMessage.cs b/src/X1.WebSocket/Models/ProtocolMessage.cs index a0acfc3..cd765da 100644 --- a/src/X1.WebSocket/Models/ProtocolMessage.cs +++ b/src/X1.WebSocket/Models/ProtocolMessage.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Text; using System.Text.Json.Serialization; using System.Threading.Tasks; +using X1.Domain.Models; using X1.WebSocket.Models; namespace CellularManagement.WebSocket.Models diff --git a/src/X1.WebSocket/Transmission/NetworkProtocolLogObserver.cs b/src/X1.WebSocket/Transmission/NetworkProtocolLogObserver.cs new file mode 100644 index 0000000..f96bb3a --- /dev/null +++ b/src/X1.WebSocket/Transmission/NetworkProtocolLogObserver.cs @@ -0,0 +1,152 @@ +using CellularManagement.Domain.Entities.Logging; +using Microsoft.Extensions.Logging; +using X1.Domain.Models; +using X1.Domain.Transmission; + +namespace X1.WebSocket.Transmission +{ + /// + /// 网络协议日志观察者 + /// 负责接收协议日志并将其写入通道 + /// + public class NetworkProtocolLogObserver : IProtocolLogObserver + { + private readonly ILogger _logger; + private readonly IProtocolChannelManager _channelManager; + + public NetworkProtocolLogObserver(ILogger logger, IProtocolChannelManager channelManager) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _channelManager = channelManager ?? throw new ArgumentNullException(nameof(channelManager)); + } + + /// + /// 处理接收到的协议日志 + /// + /// 协议日志集合 + /// 处理任务 + public async Task OnProtocolLogsReceived(IEnumerable logDetails) + { + // 参数验证 + if (logDetails == null) + { + _logger.LogWarning("接收到空的协议日志集合"); + return; + } + + var logList = logDetails.ToList(); + if (!logList.Any()) + { + _logger.LogDebug("接收到空的协议日志集合"); + return; + } + + try + { + _logger.LogInformation("开始处理协议日志,数量:{Count}", logList.Count); + + // 过滤和验证日志数据 + var validLogs = FilterValidLogs(logList); + + if (!validLogs.Any()) + { + _logger.LogWarning("没有有效的协议日志需要处理"); + return; + } + + // 转换为ProtocolLog + var protocolLogs = ConvertToProtocolLogs(validLogs); + + // 写入通道 + var writeResult = await _channelManager.WriteToChannelAsync(protocolLogs, CancellationToken.None); + + if (writeResult) + { + _logger.LogDebug("成功写入协议日志到通道,数量:{Count}", protocolLogs.Count()); + } + else + { + _logger.LogWarning("写入协议日志到通道失败,数量:{Count}", protocolLogs.Count()); + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("协议日志处理被取消"); + } + catch (Exception ex) + { + _logger.LogError(ex, "处理协议日志时发生未预期的错误,日志数量:{Count}", logList.Count); + // 不重新抛出异常,避免影响其他操作 + } + } + + /// + /// 过滤有效的日志数据 + /// + /// 原始日志集合 + /// 有效的日志集合 + private IEnumerable FilterValidLogs(IEnumerable logs) + { + return logs.Where(log => + { + // 检查必需字段 + if (string.IsNullOrWhiteSpace(log.RuntimeCode)) + { + _logger.LogDebug("跳过无效日志:运行时代码为空,ID:{Id}", log.Id); + return false; + } + + if (string.IsNullOrWhiteSpace(log.DeviceCode)) + { + _logger.LogDebug("跳过无效日志:设备代码为空,ID:{Id}", log.Id); + return false; + } + + // 检查时间戳 + if (log.Timestamp <= 0) + { + _logger.LogDebug("跳过无效日志:时间戳无效,ID:{Id}", log.Id); + return false; + } + + return true; + }); + } + + /// + /// 转换为ProtocolLog实体 + /// + /// MessageTransferProtocolLog集合 + /// ProtocolLog集合 + private IEnumerable ConvertToProtocolLogs(IEnumerable logs) + { + return logs.Select(log => + { + try + { + return ProtocolLog.Create( + messageId: log.Id, + layerType: log.LayerType, + direction: log.Direction, + timeMs: log.TimeMs, + timestamp: log.Timestamp, + deviceCode: log.DeviceCode ?? string.Empty, + runtimeCode: log.RuntimeCode, + messageDetailJson: log.MessageDetailJson, + cellID: log.CellID, + imsi: log.IMSI, + ueid: log.UEID, + plmn: log.PLMN, + info: log.Info, + message: log.Message + ); + } + catch (Exception ex) + { + _logger.LogError(ex, "转换协议日志失败,ID:{Id}", log.Id); + return null; + } + }).Where(log => log != null)!; + } + } +} diff --git a/src/X1.WebSocket/X1.WebSocket.csproj b/src/X1.WebSocket/X1.WebSocket.csproj index 0c6638e..acc4f45 100644 --- a/src/X1.WebSocket/X1.WebSocket.csproj +++ b/src/X1.WebSocket/X1.WebSocket.csproj @@ -25,4 +25,8 @@ + + + +