You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
256 lines
10 KiB
256 lines
10 KiB
"""
|
|
设备命令执行器模块
|
|
"""
|
|
import asyncio
|
|
import time
|
|
from typing import Dict, List, Optional, Any
|
|
from app.core.device.manager import device_manager
|
|
from app.core.device.dispatcher import device_dispatcher
|
|
from app.models.adb_models import (
|
|
ShellCommandTask, DeviceShellTask, UnifiedShellCommandRequest,
|
|
ShellCommandResult, DeviceShellResult, ShellResponse
|
|
)
|
|
from app.utils.structured_log import get_structured_logger
|
|
|
|
logger = get_structured_logger(__name__)
|
|
|
|
|
|
class CommandExecutor:
|
|
"""设备命令执行器
|
|
|
|
负责执行设备上的Shell命令,支持:
|
|
- 单设备命令执行
|
|
- 多设备批量执行
|
|
- 统一命令格式处理
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.device_manager = device_manager
|
|
self.device_dispatcher = device_dispatcher
|
|
|
|
async def execute_unified_shell_commands(self, request: UnifiedShellCommandRequest) -> Dict[str, Any]:
|
|
"""执行统一的Shell命令请求
|
|
|
|
Args:
|
|
request: 统一的Shell命令请求
|
|
|
|
Returns:
|
|
执行结果字典
|
|
"""
|
|
try:
|
|
logger.info(f"开始执行统一Shell命令,设备数量: {len(request.tasks)}")
|
|
|
|
# 记录开始时间
|
|
start_time = time.time()
|
|
|
|
# 并行执行所有设备的命令
|
|
tasks = []
|
|
for device_task in request.tasks:
|
|
task = self._execute_device_shell_commands(device_task)
|
|
tasks.append((device_task.device_id, task))
|
|
|
|
# 等待所有任务完成
|
|
device_results = {}
|
|
failed_devices = []
|
|
success_count = 0
|
|
|
|
for device_id, task in tasks:
|
|
try:
|
|
result = await task
|
|
device_results[device_id] = result
|
|
if result.success:
|
|
success_count += 1
|
|
else:
|
|
failed_devices.append(device_id)
|
|
except Exception as e:
|
|
logger.error(f"设备 {device_id} 命令执行异常: {e}")
|
|
device_results[device_id] = DeviceShellResult(
|
|
device_id=device_id,
|
|
success=False,
|
|
error=str(e),
|
|
results=[]
|
|
)
|
|
failed_devices.append(device_id)
|
|
|
|
# 计算执行时间
|
|
execution_time = time.time() - start_time
|
|
|
|
# 构建响应
|
|
total_devices = len(request.tasks)
|
|
if success_count == total_devices:
|
|
message = f"所有设备命令执行成功 ({success_count}/{total_devices})"
|
|
elif success_count > 0:
|
|
message = f"部分设备命令执行成功 ({success_count}/{total_devices}),失败设备: {failed_devices}"
|
|
else:
|
|
message = f"所有设备命令执行失败 ({success_count}/{total_devices})"
|
|
|
|
return {
|
|
"success": success_count > 0,
|
|
"message": message,
|
|
"data": {
|
|
"total_devices": total_devices,
|
|
"success_count": success_count,
|
|
"failed_count": len(failed_devices),
|
|
"failed_devices": failed_devices,
|
|
"execution_time": execution_time,
|
|
"device_results": {
|
|
device_id: result.dict() if hasattr(result, 'dict') else result
|
|
for device_id, result in device_results.items()
|
|
}
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"统一Shell命令执行失败: {e}")
|
|
raise
|
|
|
|
async def _execute_device_shell_commands(self, device_task: DeviceShellTask) -> DeviceShellResult:
|
|
"""执行单个设备的Shell命令
|
|
|
|
Args:
|
|
device_task: 设备Shell命令任务
|
|
|
|
Returns:
|
|
设备执行结果
|
|
"""
|
|
try:
|
|
device_id = device_task.device_id
|
|
logger.info(f"开始执行设备 {device_id} 的命令,命令数量: {len(device_task.commands)}")
|
|
|
|
# 获取设备信息(支持注册设备和自动发现设备)
|
|
device = await self.device_manager.get_device_info_unified(device_id)
|
|
if not device:
|
|
return DeviceShellResult(
|
|
device_id=device_id,
|
|
success=False,
|
|
total_commands=0,
|
|
success_commands=0,
|
|
failed_commands=0,
|
|
error=f"设备 {device_id} 不存在",
|
|
results=[]
|
|
)
|
|
|
|
# 执行每个命令
|
|
command_results = []
|
|
for cmd_task in device_task.commands:
|
|
result = await self._execute_single_command(device, cmd_task)
|
|
command_results.append(result)
|
|
|
|
# 统计命令执行结果
|
|
total_commands = len(command_results)
|
|
success_commands = sum(1 for result in command_results if result.success)
|
|
failed_commands = total_commands - success_commands
|
|
|
|
# 检查是否所有命令都成功
|
|
all_success = all(result.success for result in command_results)
|
|
|
|
return DeviceShellResult(
|
|
device_id=device_id,
|
|
success=all_success,
|
|
total_commands=total_commands,
|
|
success_commands=success_commands,
|
|
failed_commands=failed_commands,
|
|
error=None if all_success else "部分命令执行失败",
|
|
results=command_results
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"设备 {device_task.device_id} 命令执行失败: {e}")
|
|
return DeviceShellResult(
|
|
device_id=device_task.device_id,
|
|
success=False,
|
|
total_commands=0,
|
|
success_commands=0,
|
|
failed_commands=0,
|
|
error=str(e),
|
|
results=[]
|
|
)
|
|
|
|
|
|
async def _execute_single_command(self, device: Any, cmd_task: ShellCommandTask) -> ShellCommandResult:
|
|
"""执行单个Shell命令
|
|
|
|
Args:
|
|
device: 设备对象
|
|
cmd_task: 命令任务
|
|
|
|
Returns:
|
|
命令执行结果
|
|
"""
|
|
try:
|
|
start_time = time.time()
|
|
device_id = device["device_id"]
|
|
device_source = device["source"]
|
|
command = cmd_task.command
|
|
|
|
logger.debug(f"开始执行单个命令: 设备={device_id}, 来源={device_source}, 命令={command}")
|
|
|
|
# 根据设备来源选择执行方式
|
|
if device_source == "registered":
|
|
logger.debug(f"注册设备执行命令: {device_id} -> {command}")
|
|
# 注册设备:通过设备分发器执行
|
|
result = await self.device_dispatcher.execute_operation(
|
|
device_id=device_id,
|
|
protocol_type=device["protocol_type"],
|
|
operation="execute_command",
|
|
command=command,
|
|
timeout=cmd_task.timeout
|
|
)
|
|
logger.debug(f"注册设备命令执行完成: {device_id} -> {command}, 结果类型={type(result)}")
|
|
else:
|
|
logger.debug(f"自动发现设备执行命令: {device_id} -> {command}")
|
|
# 自动发现设备:直接调用ADB服务
|
|
from app.services.auto_discovery_adb_service import auto_discovery_adb_service
|
|
result = await auto_discovery_adb_service.execute_shell_command(
|
|
device_id,
|
|
command,
|
|
timeout=cmd_task.timeout
|
|
)
|
|
logger.debug(f"自动发现设备命令执行完成: {device_id} -> {command}, 结果类型={type(result)}")
|
|
|
|
execution_time = time.time() - start_time
|
|
|
|
# 统一使用实体返回格式
|
|
if isinstance(result, ShellResponse):
|
|
# ShellResponse实体格式
|
|
logger.debug(f"命令执行成功: {device_id} -> {command}, 输出长度={len(result.output) if result.output else 0}")
|
|
return ShellCommandResult(
|
|
command=command,
|
|
success=result.success,
|
|
output=result.output,
|
|
error=result.error,
|
|
exit_code=result.exit_code,
|
|
execution_time=execution_time,
|
|
wait_time=cmd_task.wait_time,
|
|
timeout=cmd_task.timeout
|
|
)
|
|
else:
|
|
# 其他格式统一转换为失败结果
|
|
logger.warning(f"命令执行返回格式不符合规范: {device_id} -> {command}, 返回类型={type(result)}")
|
|
return ShellCommandResult(
|
|
command=command,
|
|
success=False,
|
|
output="",
|
|
error=f"返回格式不符合规范: {type(result)}",
|
|
exit_code=-1,
|
|
execution_time=execution_time,
|
|
wait_time=cmd_task.wait_time,
|
|
timeout=cmd_task.timeout
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"命令执行失败: {e}")
|
|
return ShellCommandResult(
|
|
command=cmd_task.command,
|
|
success=False,
|
|
output="",
|
|
error=str(e),
|
|
exit_code=-1,
|
|
execution_time=time.time() - start_time if 'start_time' in locals() else 0,
|
|
wait_time=cmd_task.wait_time,
|
|
timeout=cmd_task.timeout
|
|
)
|
|
|
|
|
|
# 全局命令执行器实例
|
|
command_executor = CommandExecutor()
|
|
|