@ -6,6 +6,8 @@ using X1.Domain.Repositories.Base;
using X1.Application.Features.TaskExecution.Events.NodeExecutionEvents ;
using X1.Domain.Entities.TestTask ;
using X1.Domain.Models ;
using X1.Domain.ServiceScope ;
using Microsoft.Extensions.DependencyInjection ;
namespace X1.Application.Features.TaskExecution.Commands.StartTaskExecution ;
@ -16,22 +18,19 @@ public class StartTaskExecutionCommandHandler : IRequestHandler<StartTaskExecuti
{
private readonly ILogger < StartTaskExecutionCommandHandler > _l ogger ;
private readonly ITaskExecutionService _ taskExecutionService ;
private readonly IUnitOfWork _ unitOfWork ;
private readonly IMediator _ mediator ;
private readonly ICurrentUserService _ currentUserService ;
protected readonly IServiceScopeExecutor _ scopeExecutor ;
public StartTaskExecutionCommandHandler (
ILogger < StartTaskExecutionCommandHandler > logger ,
ITaskExecutionService taskExecutionService ,
IUnitOfWork unitOfWork ,
IMediator mediator ,
ICurrentUserService currentUserService )
ICurrentUserService currentUserService ,
IServiceScopeExecutor scopeExecutor )
{
_l ogger = logger ;
_ taskExecutionService = taskExecutionService ;
_ unitOfWork = unitOfWork ;
_ mediator = mediator ;
_ currentUserService = currentUserService ;
_ scopeExecutor = scopeExecutor ;
}
/// <summary>
@ -59,59 +58,25 @@ public class StartTaskExecutionCommandHandler : IRequestHandler<StartTaskExecuti
return OperationResult < StartTaskExecutionResponse > . CreateFailure ( "任务执行请求列表不能为空" ) ;
}
_l ogger . LogInformation ( "开始处理启动任务执行命令,任务数量: {TaskCount}, 执行人ID: {ExecutorId}" ,
_l ogger . LogInformation ( "开始处理启动任务执行命令,任务数量: {TaskCount}, 执行人ID: {ExecutorId}" ,
request . TaskExecutionRequests . Count , currentUserId ) ;
var response = new StartTaskExecutionResponse ( ) ;
var successCount = 0 ;
var failureCount = 0 ;
var errors = new List < string > ( ) ;
// 批量处理任务
foreach ( var taskRequest in request . TaskExecutionRequests )
{
try
{
// 调用任务执行服务启动任务
var taskExecution = await _ taskExecutionService . StartTaskExecutionAsync (
taskRequest . TaskId ,
currentUserId ,
cancellationToken ) ;
_l ogger . LogInformation ( "任务执行启动成功,执行ID: {TaskExecutionId}, 任务ID: {TaskId}" ,
taskExecution . Id , taskRequest . TaskId ) ;
// 第一阶段:统一收集所有任务数据
var ( taskExecutionDataList , successCount , failureCount , errors ) = await CollectTaskExecutionDataAsync (
request . TaskExecutionRequests , currentUserId , cancellationToken ) ;
// 获取初始节点信息
var initialNodeInfo = await _ taskExecutionService . GetInitialNodeAsync ( taskRequest . TaskId , cancellationToken ) ;
// 获取终端设备信息
var terminalDevices = await _ taskExecutionService . GetTerminalDevicesByTaskIdAsync ( taskRequest . TaskId , cancellationToken ) ;
// 发布 NodeExecutionStartedEvent 来启动流程
_ = PublishNodeExecutionStartedEventsAsync ( initialNodeInfo , taskExecution , currentUserId , taskRequest . DeviceCode , terminalDevices , cancellationToken ) ;
successCount + + ;
}
catch ( InvalidOperationException ex )
{
_l ogger . LogWarning ( ex , "启动任务执行失败,任务ID: {TaskId}" , taskRequest . TaskId ) ;
errors . Add ( $"任务 {taskRequest.TaskId}: {ex.Message}" ) ;
failureCount + + ;
}
catch ( Exception ex )
{
_l ogger . LogError ( ex , "启动任务执行时发生未预期的错误,任务ID: {TaskId}" , taskRequest . TaskId ) ;
errors . Add ( $"任务 {taskRequest.TaskId}: 启动失败,请稍后重试" ) ;
failureCount + + ;
}
// 第二阶段:统一发布事件
if ( taskExecutionDataList . Any ( ) )
{
_ = PublishAllNodeExecutionStartedEventsAsync ( taskExecutionDataList , currentUserId , cancellationToken ) ;
}
response . SuccessCount = successCount ;
response . FailureCount = failureCount ;
response . Errors = errors ;
await _ unitOfWork . SaveChangesAsync ( ) ;
// 根据结果返回响应
if ( successCount > 0 & & failureCount = = 0 )
{
@ -122,7 +87,7 @@ public class StartTaskExecutionCommandHandler : IRequestHandler<StartTaskExecuti
{
_l ogger . LogWarning ( "部分任务执行启动成功,成功数量: {SuccessCount}, 失败数量: {FailureCount}" , successCount , failureCount ) ;
return OperationResult < StartTaskExecutionResponse > . CreateSuccess (
$"部分任务启动成功,成功 {successCount} 个,失败 {failureCount} 个。失败详情: {string.Join(" ; ", errors)}" ,
$"部分任务启动成功,成功 {successCount} 个,失败 {failureCount} 个。失败详情: {string.Join(" ; ", errors)}" ,
response ) ;
}
else
@ -140,7 +105,112 @@ public class StartTaskExecutionCommandHandler : IRequestHandler<StartTaskExecuti
}
/// <summary>
/// 发布节点执行启动事件
/// 准备任务执行数据
/// 创建任务执行记录,获取初始节点信息和终端设备信息
/// </summary>
/// <param name="taskExecutionRequests">任务执行请求列表</param>
/// <param name="currentUserId">当前用户ID</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>任务执行数据准备结果</returns>
private async Task < ( List < TaskExecutionData > TaskExecutionDataList , int SuccessCount , int FailureCount , List < string > Errors ) > CollectTaskExecutionDataAsync (
List < TaskExecutionRequest > taskExecutionRequests ,
string currentUserId ,
CancellationToken cancellationToken )
{
var taskExecutionDataList = new List < TaskExecutionData > ( ) ;
var successCount = 0 ;
var failureCount = 0 ;
var errors = new List < string > ( ) ;
_l ogger . LogInformation ( "开始准备任务执行数据,任务数量: {TaskCount}" , taskExecutionRequests . Count ) ;
foreach ( var taskRequest in taskExecutionRequests )
{
try
{
// 调用任务执行服务启动任务
var taskExecution = await _ taskExecutionService . StartTaskExecutionAsync (
taskRequest . TaskId ,
currentUserId ,
cancellationToken ) ;
_l ogger . LogInformation ( "任务执行记录创建成功,执行ID: {TaskExecutionId}, 任务ID: {TaskId}" ,
taskExecution . Id , taskRequest . TaskId ) ;
// 获取初始节点信息
var initialNodeInfo = await _ taskExecutionService . GetInitialNodeAsync ( taskRequest . TaskId , cancellationToken ) ;
// 获取终端设备信息
var terminalDevices = await _ taskExecutionService . GetTerminalDevicesByTaskIdAsync ( taskRequest . TaskId , cancellationToken ) ;
// 收集数据
taskExecutionDataList . Add ( new TaskExecutionData
{
TaskRequest = taskRequest ,
TaskExecution = taskExecution ,
InitialNodeInfo = initialNodeInfo ,
TerminalDevices = terminalDevices
} ) ;
successCount + + ;
}
catch ( InvalidOperationException ex )
{
_l ogger . LogWarning ( ex , "准备任务执行数据失败,任务ID: {TaskId}" , taskRequest . TaskId ) ;
errors . Add ( $"任务 {taskRequest.TaskId}: {ex.Message}" ) ;
failureCount + + ;
}
catch ( Exception ex )
{
_l ogger . LogError ( ex , "准备任务执行数据时发生未预期的错误,任务ID: {TaskId}" , taskRequest . TaskId ) ;
errors . Add ( $"任务 {taskRequest.TaskId}: 准备失败,请稍后重试" ) ;
failureCount + + ;
}
}
_l ogger . LogInformation ( "任务执行数据准备完成,成功数量: {SuccessCount}, 失败数量: {FailureCount}" , successCount , failureCount ) ;
return ( taskExecutionDataList , successCount , failureCount , errors ) ;
}
/// <summary>
/// 统一发布所有任务的节点执行启动事件
/// </summary>
/// <param name="taskExecutionDataList">任务执行数据列表</param>
/// <param name="executorId">执行人ID</param>
/// <param name="cancellationToken">取消令牌</param>
private async Task PublishAllNodeExecutionStartedEventsAsync (
List < TaskExecutionData > taskExecutionDataList ,
string executorId ,
CancellationToken cancellationToken )
{
_l ogger . LogInformation ( "开始统一初始化并启动任务执行,任务数量: {TaskCount}" , taskExecutionDataList . Count ) ;
foreach ( var taskData in taskExecutionDataList )
{
try
{
await InitializeAndStartTaskExecutionAsync (
taskData . InitialNodeInfo ,
taskData . TaskExecution ,
executorId ,
taskData . TaskRequest . DeviceCode ,
taskData . TerminalDevices ,
cancellationToken ) ;
}
catch ( Exception ex )
{
_l ogger . LogError ( ex , "初始化并启动任务执行失败,任务ID: {TaskId}" , taskData . TaskRequest . TaskId ) ;
// 继续处理其他任务,不中断整个流程
}
}
_l ogger . LogInformation ( "完成统一初始化并启动任务执行,任务数量: {TaskCount}" , taskExecutionDataList . Count ) ;
}
/// <summary>
/// 初始化并启动任务执行
/// 包括状态更新、批量创建用例明细和发布节点事件
/// </summary>
/// <param name="initialNodeInfo">初始节点信息</param>
/// <param name="executionDetail">任务执行详情</param>
@ -148,7 +218,7 @@ public class StartTaskExecutionCommandHandler : IRequestHandler<StartTaskExecuti
/// <param name="deviceCode">设备编码</param>
/// <param name="terminalDevices">终端设备列表</param>
/// <param name="cancellationToken">取消令牌</param>
private async Task PublishNodeExecutionStartedEvents Async(
private async Task InitializeAndStartTaskExecution Async(
InitialNodeInfo ? initialNodeInfo ,
TestScenarioTaskExecutionDetail executionDetail ,
string executorId ,
@ -158,39 +228,152 @@ public class StartTaskExecutionCommandHandler : IRequestHandler<StartTaskExecuti
{
if ( initialNodeInfo ! = null & & initialNodeInfo . InitialNodes . Any ( ) )
{
// 为每个初始节点发布事件
foreach ( var initialNode in initialNodeInfo . InitialNodes )
// 1. 更新任务执行状态为运行中
var resultUpdateStatus = await _ scopeExecutor . ExecuteAsync ( async serviceProvider = >
{
var nodeExecutionStartedEvent = new NodeExecutionStartedEvent
{
EventId = Guid . NewGuid ( ) . ToString ( ) ,
TaskExecutionId = executionDetail . Id ,
NodeId = initialNode . NodeId ,
StepMapping = initialNode . StepMapping ,
ExecutorId = executorId ,
RuntimeCode = executionDetail . RuntimeCode ? ? string . Empty , // 在 StartFlowControllerHandler 中生成
ScenarioCode = initialNodeInfo . ScenarioCode ,
ScenarioId = initialNodeInfo . ScenarioId ,
FlowName = initialNode . FlowName ,
FlowId = initialNode . FlowId ,
DeviceCode = deviceCode ,
TerminalDevices = terminalDevices . ToList ( ) ,
Timestamp = DateTime . UtcNow
} ;
// 异步发布事件,不等待执行完成(fire and forget)
await _ mediator . Publish ( nodeExecutionStartedEvent , cancellationToken ) ;
_l ogger . LogInformation ( "已发布 NodeExecutionStartedEvent,事件ID: {EventId}, 节点ID: {NodeId}, 流程: {FlowName}" ,
nodeExecutionStartedEvent . EventId , nodeExecutionStartedEvent . NodeId , initialNode . FlowName ) ;
var scopedTaskExecution = serviceProvider . GetRequiredService < ITaskExecutionService > ( ) ;
await scopedTaskExecution . UpdateTaskExecutionStatusAsync ( executionDetail . Id , TaskExecutionStatus . Running ) ;
} , cancellationToken ) ;
if ( ! resultUpdateStatus . IsSuccess )
{
_l ogger . LogError ( "更新任务执行服务运行状态失败: {ErrorMessage}" , resultUpdateStatus . ErrorMessage ) ;
return ;
}
_l ogger . LogInformation ( "已为 {NodeCount} 个初始节点发布事件,任务ID: {TaskId}" ,
// 2. 批量创建任务执行用例明细初始化数据
var batchCreateResult = await CreateTaskExecutionCaseDetailsAsync (
initialNodeInfo , executionDetail , executorId , cancellationToken ) ;
if ( ! batchCreateResult )
{
_l ogger . LogError ( "批量创建任务执行用例明细失败,任务ID: {TaskId}" , initialNodeInfo . TaskId ) ;
return ;
}
// 3. 为每个初始节点发布事件
await PublishNodeExecutionEventsAsync (
initialNodeInfo , executionDetail , executorId , deviceCode , terminalDevices , cancellationToken ) ;
_l ogger . LogInformation ( "已为 {NodeCount} 个初始节点发布事件,任务ID: {TaskId}" ,
initialNodeInfo . InitialNodes . Count , initialNodeInfo . TaskId ) ;
// 1. 更新任务执行状态为运行中
var resultUpdateSuccessStatus = await _ scopeExecutor . ExecuteAsync ( async serviceProvider = >
{
var scopedTaskExecution = serviceProvider . GetRequiredService < ITaskExecutionService > ( ) ;
await scopedTaskExecution . UpdateTaskExecutionStatusAsync ( executionDetail . Id , TaskExecutionStatus . Success ) ;
} , cancellationToken ) ;
if ( ! resultUpdateStatus . IsSuccess )
{
_l ogger . LogError ( "更新任务执行服务成功状态失败: {ErrorMessage}" , resultUpdateStatus . ErrorMessage ) ;
return ;
}
}
else
{
_l ogger . LogWarning ( "无法获取初始节点,任务ID: {TaskId}" , initialNodeInfo ? . TaskId ? ? "未知" ) ;
}
}
/// <summary>
/// 创建任务执行用例明细初始化数据
/// </summary>
/// <param name="initialNodeInfo">初始节点信息</param>
/// <param name="executionDetail">任务执行详情</param>
/// <param name="executorId">执行人ID</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>是否创建成功</returns>
private async Task < bool > CreateTaskExecutionCaseDetailsAsync (
InitialNodeInfo initialNodeInfo ,
TestScenarioTaskExecutionDetail executionDetail ,
string executorId ,
CancellationToken cancellationToken )
{
try
{
var testCaseFlowIds = initialNodeInfo . InitialNodes . Select ( node = > node . FlowId ) . ToList ( ) ;
var resultBatchCreate = await _ scopeExecutor . ExecuteAsync ( async serviceProvider = >
{
var scopedTaskExecutionService = serviceProvider . GetRequiredService < ITaskExecutionService > ( ) ;
await scopedTaskExecutionService . BatchCreateTaskExecutionCaseDetailsAsync (
executionDetail . Id ,
initialNodeInfo . ScenarioCode ,
testCaseFlowIds ,
executorId ,
loop : 1 ,
cancellationToken ) ;
} , cancellationToken ) ;
if ( ! resultBatchCreate . IsSuccess )
{
_l ogger . LogError ( "批量创建任务执行用例明细失败: {ErrorMessage}" , resultBatchCreate . ErrorMessage ) ;
return false ;
}
_l ogger . LogInformation ( "批量创建任务执行用例明细成功,执行ID: {TaskExecutionId}, 用例数量: {CaseCount}" ,
executionDetail . Id , testCaseFlowIds . Count ) ;
return true ;
}
catch ( Exception ex )
{
_l ogger . LogError ( ex , "批量创建任务执行用例明细时发生异常,执行ID: {TaskExecutionId}" , executionDetail . Id ) ;
return false ;
}
}
/// <summary>
/// 发布节点执行事件
/// 为每个初始节点创建并发布执行事件
/// </summary>
/// <param name="initialNodeInfo">初始节点信息</param>
/// <param name="executionDetail">任务执行详情</param>
/// <param name="executorId">执行人ID</param>
/// <param name="deviceCode">设备编码</param>
/// <param name="terminalDevices">终端设备列表</param>
/// <param name="cancellationToken">取消令牌</param>
private async Task PublishNodeExecutionEventsAsync (
InitialNodeInfo initialNodeInfo ,
TestScenarioTaskExecutionDetail executionDetail ,
string executorId ,
string deviceCode ,
IReadOnlyList < TerminalDeviceDto > terminalDevices ,
CancellationToken cancellationToken )
{
foreach ( var initialNode in initialNodeInfo . InitialNodes )
{
var nodeExecutionStartedEvent = new NodeExecutionStartedEvent
{
EventId = Guid . NewGuid ( ) . ToString ( ) ,
TaskExecutionId = executionDetail . Id ,
NextNodeInfo = new NextNodeInfo { NodeId = initialNode . NodeId , StepMapping = initialNode . StepMapping , StepId = initialNode . StepId , NodeName = initialNode . NodeName } ,
ExecutorId = executorId ,
RuntimeCode = executionDetail . RuntimeCode ? ? string . Empty , // 在 StartFlowControllerHandler 中生成
ScenarioCode = initialNodeInfo . ScenarioCode ,
ScenarioId = initialNodeInfo . ScenarioId ,
FlowName = initialNode . FlowName ,
FlowId = initialNode . FlowId ,
DeviceCode = deviceCode ,
TerminalDevices = terminalDevices . ToList ( ) ,
Timestamp = DateTime . UtcNow
} ;
var resultPublish = await _ scopeExecutor . ExecuteAsync ( async serviceProvider = >
{
var scopedMediator = serviceProvider . GetRequiredService < IMediator > ( ) ;
await scopedMediator . Publish ( nodeExecutionStartedEvent , cancellationToken ) ;
} , cancellationToken ) ;
_l ogger . LogInformation ( "已发布 NodeExecutionStartedEvent,事件ID: {EventId}, 节点ID: {NodeId}, 流程: {FlowName}" ,
nodeExecutionStartedEvent . EventId , nodeExecutionStartedEvent . NextNodeInfo . NodeId , initialNode . FlowName ) ;
if ( ! resultPublish . IsSuccess )
{
_l ogger . LogError ( "发布事件失败: {ErrorMessage}" , resultPublish . ErrorMessage ) ;
}
}
}
}