You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

245 lines
9.1 KiB

"""
设备命令执行器模块
"""
import asyncio
import time
from typing import Dict, List, Optional, Any
from app.core.device.manager import device_manager
from app.core.device.dispatcher import device_dispatcher
from app.models.adb_models import (
ShellCommandTask, DeviceShellTask, UnifiedShellCommandRequest,
ShellCommandResult, DeviceShellResult, ShellResponse
)
from app.utils.structured_log import get_structured_logger
# Module-level structured logger, created with this module's __name__.
logger = get_structured_logger(__name__)
class CommandExecutor:
    """Executes shell commands on devices.

    Supports:
    - command execution on a single device
    - batch execution across multiple devices
    - handling of the unified shell-command request format
    """

    def __init__(self) -> None:
        self.device_manager = device_manager
        self.device_dispatcher = device_dispatcher

    @staticmethod
    def _build_failure_result(device_id: Any, error: str) -> DeviceShellResult:
        """Build a failed DeviceShellResult with zeroed counters and no command results."""
        return DeviceShellResult(
            device_id=device_id,
            success=False,
            total_commands=0,
            success_commands=0,
            failed_commands=0,
            error=error,
            results=[]
        )

    async def execute_unified_shell_commands(self, request: UnifiedShellCommandRequest) -> Dict[str, Any]:
        """Execute a unified shell-command request across all of its devices.

        Every device task in ``request.tasks`` is run concurrently; the
        commands belonging to one device still run in order.

        Args:
            request: The unified shell-command request; ``request.tasks`` holds
                one task per device.

        Returns:
            A dict with ``success`` (True when at least one device succeeded),
            ``message`` and ``data`` (counters, failed device ids, wall-clock
            execution time and the per-device results keyed by device id).

        Raises:
            Exception: Any unexpected error is logged and re-raised.
        """
        try:
            logger.info(f"开始执行统一Shell命令,设备数量: {len(request.tasks)}")
            start_time = time.time()

            device_tasks = list(request.tasks)
            # A bare coroutine does not start until it is awaited, so awaiting
            # them one by one would run the devices serially. gather() schedules
            # all of them at once and returns the outcomes in input order.
            outcomes = await asyncio.gather(
                *(self._execute_device_shell_commands(device_task) for device_task in device_tasks),
                return_exceptions=True
            )

            device_results = {}
            failed_devices = []
            success_count = 0
            for device_task, outcome in zip(device_tasks, outcomes):
                device_id = device_task.device_id
                if isinstance(outcome, BaseException):
                    if not isinstance(outcome, Exception):
                        # Cancellation and similar signals must keep propagating.
                        raise outcome
                    logger.error(f"设备 {device_id} 命令执行异常: {outcome}")
                    outcome = self._build_failure_result(device_id, str(outcome))
                device_results[device_id] = outcome
                if outcome.success:
                    success_count += 1
                else:
                    failed_devices.append(device_id)

            execution_time = time.time() - start_time

            total_devices = len(device_tasks)
            if success_count == total_devices:
                message = f"所有设备命令执行成功 ({success_count}/{total_devices})"
            elif success_count > 0:
                message = f"部分设备命令执行成功 ({success_count}/{total_devices}),失败设备: {failed_devices}"
            else:
                message = f"所有设备命令执行失败 ({success_count}/{total_devices})"

            return {
                "success": success_count > 0,
                "message": message,
                "data": {
                    "total_devices": total_devices,
                    "success_count": success_count,
                    "failed_count": len(failed_devices),
                    "failed_devices": failed_devices,
                    "execution_time": execution_time,
                    "device_results": {
                        # Results exposing .dict() are serialized; anything else is passed through.
                        device_id: result.dict() if hasattr(result, 'dict') else result
                        for device_id, result in device_results.items()
                    }
                }
            }
        except Exception as e:
            logger.error(f"统一Shell命令执行失败: {e}")
            raise

    async def _execute_device_shell_commands(self, device_task: DeviceShellTask) -> DeviceShellResult:
        """Execute all shell commands of one device, one after another.

        Args:
            device_task: The device task; provides ``device_id`` and ``commands``.

        Returns:
            The device result. It is marked successful only when every command
            succeeded. Errors are reported through the result, not raised.
        """
        try:
            device_id = device_task.device_id
            logger.info(f"开始执行设备 {device_id} 的命令,命令数量: {len(device_task.commands)}")

            device = await self.device_manager.get_device(device_id)
            if not device:
                return self._build_failure_result(device_id, f"设备 {device_id} 不存在")

            # Commands of a single device are awaited sequentially, in the order given.
            command_results = []
            for cmd_task in device_task.commands:
                command_results.append(await self._execute_single_command(device, cmd_task))

            total_commands = len(command_results)
            success_commands = sum(1 for result in command_results if result.success)
            failed_commands = total_commands - success_commands
            all_success = failed_commands == 0

            return DeviceShellResult(
                device_id=device_id,
                success=all_success,
                total_commands=total_commands,
                success_commands=success_commands,
                failed_commands=failed_commands,
                error=None if all_success else "部分命令执行失败",
                results=command_results
            )
        except Exception as e:
            logger.error(f"设备 {device_task.device_id} 命令执行失败: {e}")
            return self._build_failure_result(device_task.device_id, str(e))

    async def _execute_single_command(self, device: Any, cmd_task: ShellCommandTask) -> ShellCommandResult:
        """Execute one shell command on a device.

        Args:
            device: The device object; ``source``, ``device_id`` and (for
                registered devices) ``protocol_type`` are read from it.
            cmd_task: The command task; provides ``command``, ``timeout`` and
                ``wait_time``.

        Returns:
            The command result. Exceptions and responses that are not a
            ShellResponse are reported as failures with ``exit_code=-1``.
        """
        # Taken before the try block so the error path can always compute a duration.
        start_time = time.time()
        try:
            if device.source == "registered":
                # Registered devices go through the device dispatcher.
                result = await self.device_dispatcher.execute_operation(
                    device_id=device.device_id,
                    protocol_type=device.protocol_type,
                    operation="execute_command",
                    command=cmd_task.command,
                    timeout=cmd_task.timeout
                )
            else:
                # Any other source calls the auto-discovery ADB service directly.
                # NOTE(review): imported lazily, presumably to avoid a circular import - confirm.
                from app.services.auto_discovery_adb_service import auto_discovery_adb_service
                result = await auto_discovery_adb_service.execute_shell_command(
                    device.device_id,
                    cmd_task.command,
                    timeout=cmd_task.timeout
                )
            execution_time = time.time() - start_time

            if isinstance(result, ShellResponse):
                # wait_time and timeout are echoed back from the task; this method does not sleep.
                return ShellCommandResult(
                    command=cmd_task.command,
                    success=result.success,
                    output=result.output,
                    error=result.error,
                    exit_code=result.exit_code,
                    execution_time=execution_time,
                    wait_time=cmd_task.wait_time,
                    timeout=cmd_task.timeout
                )

            # Any other return type is treated as a failed command.
            logger.warning(f"命令执行返回格式不符合规范: {type(result)}")
            return ShellCommandResult(
                command=cmd_task.command,
                success=False,
                output="",
                error=f"返回格式不符合规范: {type(result)}",
                exit_code=-1,
                execution_time=execution_time,
                wait_time=cmd_task.wait_time,
                timeout=cmd_task.timeout
            )
        except Exception as e:
            logger.error(f"命令执行失败: {e}")
            return ShellCommandResult(
                command=cmd_task.command,
                success=False,
                output="",
                error=str(e),
                exit_code=-1,
                execution_time=time.time() - start_time,
                wait_time=cmd_task.wait_time,
                timeout=cmd_task.timeout
            )
# Global command executor instance, created when this module is imported.
command_executor = CommandExecutor()