You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
155 lines
6.0 KiB
155 lines
6.0 KiB
|
4 months ago
|
using MediatR;
|
||
|
|
using Microsoft.Extensions.Logging;
|
||
|
|
using X1.Domain.Events;
|
||
|
|
using X1.Domain.Entities.TestCase;
|
||
|
|
using X1.Application.Features.TaskExecution.Events.NodeExecutionEvents;
|
||
|
|
|
||
|
|
namespace X1.Application.Features.TaskExecution.Events.Interfaces;
|
||
|
|
|
||
|
|
/// <summary>
/// Abstract base class for node execution handlers.
/// Provides the shared handling logic as a template method: <see cref="Handle"/> logs the event,
/// checks <see cref="CanHandle"/>, delegates to <see cref="HandleAsync"/>, and publishes a
/// <c>NodeExecutionFailedEvent</c> when the derived handler throws.
/// </summary>
/// <typeparam name="T">The event type handled by the derived class.</typeparam>
public abstract class INodeExecutionHandlerBase<T> : INodeExecutionHandler<T>
    where T : INodeExecutionEvent
{
    /// <summary>Mediator used to publish completed / failed events.</summary>
    protected readonly IMediator _mediator;

    /// <summary>Logger used for the start / finish / error messages of each handled event.</summary>
    protected readonly ILogger _logger;

    /// <summary>
    /// Initializes the handler.
    /// </summary>
    /// <param name="mediator">The MediatR mediator.</param>
    /// <param name="logger">The logger.</param>
    protected INodeExecutionHandlerBase(IMediator mediator, ILogger logger)
    {
        _mediator = mediator;
        _logger = logger;
    }

    /// <summary>
    /// MediatR event handling entry point.
    /// </summary>
    /// <remarks>
    /// Exceptions thrown by <see cref="CanHandle"/> or <see cref="HandleAsync"/> are logged and
    /// converted into a <c>NodeExecutionFailedEvent</c>; they are not rethrown. An exception thrown
    /// while publishing that failure event does propagate to the caller.
    /// </remarks>
    /// <param name="notification">The event notification.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task representing the handling operation.</returns>
    public async Task Handle(T notification, CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogInformation("开始处理节点执行事件，任务执行ID: {TaskExecutionId}, 节点ID: {NodeId}, 步骤映射: {StepMapping}",
                notification.TaskExecutionId, notification.NodeId, notification.StepMapping);

            // Skip events whose step mapping this handler does not support.
            if (!CanHandle(notification.StepMapping))
            {
                _logger.LogWarning("处理器不支持此步骤映射类型: {StepMapping}", notification.StepMapping);
                return;
            }

            // Run the handler-specific logic supplied by the derived class.
            await HandleAsync(notification, cancellationToken);

            _logger.LogInformation("节点执行事件处理完成，任务执行ID: {TaskExecutionId}, 节点ID: {NodeId}",
                notification.TaskExecutionId, notification.NodeId);
        }
        catch (Exception ex)
        {
            // NOTE(review): this also catches OperationCanceledException, so a cancelled run is
            // reported as a failure and the publish below receives the same (possibly cancelled)
            // token. Confirm whether that is intended.
            _logger.LogError(ex, "处理节点执行事件时发生错误，任务执行ID: {TaskExecutionId}, 节点ID: {NodeId}, 步骤映射: {StepMapping}",
                notification.TaskExecutionId, notification.NodeId, notification.StepMapping);

            // Publish the failure event through the shared helper so the event shape is built in one place.
            await PublishFailedEventAsync(notification, ex.Message, cancellationToken);
        }
    }

    /// <summary>
    /// Abstract method: the handler-specific processing logic.
    /// </summary>
    /// <param name="notification">The event notification.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task representing the handling operation.</returns>
    public abstract Task HandleAsync(T notification, CancellationToken cancellationToken);

    /// <summary>
    /// Abstract method: checks whether this handler supports the given step mapping type.
    /// </summary>
    /// <param name="stepMapping">The step mapping type.</param>
    /// <returns><c>true</c> if supported; otherwise <c>false</c>.</returns>
    public abstract bool CanHandle(StepMapping stepMapping);

    /// <summary>
    /// Abstract method: gets the step mapping type supported by this handler.
    /// </summary>
    /// <returns>The supported step mapping type.</returns>
    public abstract StepMapping GetSupportedStepMapping();

    /// <summary>
    /// Publishes a completed event that copies the identifying fields of the original event,
    /// attaches <paramref name="result"/>, and stamps the current UTC time.
    /// </summary>
    /// <param name="notification">The original event.</param>
    /// <param name="result">The execution result.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task representing the publish operation.</returns>
    protected async Task PublishCompletedEventAsync(T notification, NodeExecutionResult result, CancellationToken cancellationToken)
    {
        var completedEvent = new NodeExecutionCompletedEvent
        {
            TaskExecutionId = notification.TaskExecutionId,
            NodeId = notification.NodeId,
            StepMapping = notification.StepMapping,
            ExecutorId = notification.ExecutorId,
            RuntimeCode = notification.RuntimeCode,
            ScenarioCode = notification.ScenarioCode,
            ScenarioId = notification.ScenarioId,
            FlowName = notification.FlowName,
            FlowId = notification.FlowId,
            Result = result,
            Timestamp = DateTime.UtcNow
        };

        await _mediator.Publish(completedEvent, cancellationToken);
    }

    /// <summary>
    /// Publishes a failed event that copies the identifying fields of the original event,
    /// attaches <paramref name="errorMessage"/>, and stamps the current UTC time.
    /// </summary>
    /// <param name="notification">The original event.</param>
    /// <param name="errorMessage">The error message.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A task representing the publish operation.</returns>
    protected async Task PublishFailedEventAsync(T notification, string errorMessage, CancellationToken cancellationToken)
    {
        var failedEvent = new NodeExecutionFailedEvent
        {
            TaskExecutionId = notification.TaskExecutionId,
            NodeId = notification.NodeId,
            StepMapping = notification.StepMapping,
            ExecutorId = notification.ExecutorId,
            RuntimeCode = notification.RuntimeCode,
            ScenarioCode = notification.ScenarioCode,
            ScenarioId = notification.ScenarioId,
            FlowName = notification.FlowName,
            FlowId = notification.FlowId,
            ErrorMessage = errorMessage,
            Timestamp = DateTime.UtcNow
        };

        await _mediator.Publish(failedEvent, cancellationToken);
    }
}
|