You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123 lines
4.9 KiB

"""
设备服务模块
"""
import asyncio
import time
from typing import Dict, List, Optional, Any
from app.core.device.manager import device_manager, DeviceSource
from app.core.device.dispatcher import device_dispatcher
from app.core.device.command_executor import command_executor
from app.core.device.operation_mapping import DeviceOperationMapping, DeviceOperations
from app.schemas.device import Device, DeviceStatusResponse
from app.services.auto_discovery_adb_service import AutoDiscoveryAdbService
from app.models.adb_models import UnifiedShellCommandRequest
from app.utils.structured_log import get_structured_logger
# Module-level structured logger, named after this module.
logger = get_structured_logger(__name__)
class DeviceService:
    """Device management service.

    Looks devices up through the shared ``device_manager``, routes device
    operations either to ``device_dispatcher`` (registered devices) or to the
    ADB-backed operation mapping (every other device source), and forwards
    batch shell requests to ``command_executor``.

    Every public method logs a failure and then re-raises it unchanged, so
    callers still see the original exception.
    """

    def __init__(self):
        """Bind the shared collaborators and build the operation mapping."""
        self.device_manager = device_manager
        # execute_operation() reads self.device_dispatcher. It was never
        # assigned before, so the registered-device branch always failed with
        # AttributeError. Bind the imported module-level dispatcher here.
        self.device_dispatcher = device_dispatcher
        self.auto_discovery_adb_service = AutoDiscoveryAdbService()
        # Initialise the device operation mapping configuration.
        self.operation_mapping = DeviceOperationMapping(self.auto_discovery_adb_service)

    async def get_all_devices(self) -> List[Device]:
        """Return all devices reported by the device manager.

        Raises:
            Exception: whatever the device manager raises (logged first).
        """
        try:
            # The manager's result is returned as is, without conversion.
            devices = await self.device_manager.get_all_devices()
            return devices
        except Exception as e:
            logger.error(f"获取所有设备失败: {e}")
            raise

    async def get_device(self, device_id: str) -> Optional[Device]:
        """Return the device with the given id, or ``None`` if not found.

        Args:
            device_id: identifier passed to the device manager.

        Raises:
            Exception: whatever the device manager raises (logged first).
        """
        try:
            device_info = await self.device_manager.get_device(device_id)
            # Any falsy lookup result is normalised to None.
            return device_info if device_info else None
        except Exception as e:
            logger.error(f"获取设备 {device_id} 失败: {e}")
            raise

    async def get_device_status(self, device_id: str) -> DeviceStatusResponse:
        """Return the status of a device.

        The response carries the device's ``status`` and uses its
        ``updated_at`` value as the timestamp.

        Args:
            device_id: identifier of the device to query.

        Raises:
            ValueError: if the device does not exist.
            Exception: any other lookup failure (logged first).
        """
        try:
            device = await self.device_manager.get_device(device_id)
            if not device:
                raise ValueError(f"设备 {device_id} 不存在")
            return DeviceStatusResponse(
                device_id=device_id,
                status=device.status,
                timestamp=device.updated_at,
            )
        except Exception as e:
            logger.error(f"获取设备 {device_id} 状态失败: {e}")
            raise

    async def execute_operation(self, device_id: str, operation: str, **kwargs) -> Dict[str, Any]:
        """Execute an operation on a device.

        Registered devices are handled by the device dispatcher using the
        device's ``protocol_type``. Devices from any other source are handled
        by the auto-discovery (ADB) operation mapping.

        Args:
            device_id: identifier of the target device.
            operation: name of the operation to run.
            **kwargs: extra arguments forwarded to the operation handler.

        Raises:
            ValueError: if the device does not exist, or (for non-registered
                devices) if the operation is not supported.
            Exception: any failure raised while executing (logged first).
        """
        try:
            device = await self.device_manager.get_device(device_id)
            if not device:
                raise ValueError(f"设备 {device_id} 不存在")

            # Determine where the device came from.
            device_source = await self.device_manager.get_device_source(device_id)

            if device_source == DeviceSource.REGISTERED:
                # Registered device: execute through the device dispatcher.
                return await self.device_dispatcher.execute_operation(
                    device_id=device_id,
                    protocol_type=device.protocol_type,
                    operation=operation,
                    **kwargs
                )

            # Auto-discovered device: call the ADB service directly.
            return await self._execute_auto_discovered_device_operation(
                device_id, operation, **kwargs
            )
        except Exception as e:
            logger.error(f"执行设备 {device_id} 操作 {operation} 失败: {e}")
            raise

    async def _execute_auto_discovered_device_operation(self, device_id: str, operation: str, **kwargs) -> Dict[str, Any]:
        """Execute an operation on an auto-discovered device.

        Args:
            device_id: identifier of the target device.
            operation: operation name, resolved through the operation mapping.
            **kwargs: extra arguments forwarded to the resolved handler.

        Raises:
            ValueError: if the mapping has no handler for ``operation``.
            Exception: any failure raised by the handler (logged first).
        """
        try:
            # Resolve the handler for this operation from the mapping config.
            operation_method = self.operation_mapping.get_operation_method(operation)
            if not operation_method:
                raise ValueError(f"不支持的操作: {operation}")
            # Run the operation.
            return await operation_method(device_id, **kwargs)
        except Exception as e:
            logger.error(f"自动发现设备 {device_id} 操作 {operation} 失败: {e}")
            raise

    async def execute_batch_shell_commands(self, request: UnifiedShellCommandRequest) -> Dict[str, Any]:
        """Execute shell commands in batch (unified entry point).

        Follows the layered architecture: API layer -> DeviceService ->
        CommandExecutor. Callers go through DeviceService so that the
        layering stays consistent.

        Args:
            request: unified shell command request; ``len(request.tasks)``
                is logged as the device count.

        Raises:
            Exception: whatever the command executor raises (logged first).
        """
        try:
            logger.info(f"通过DeviceService执行批量Shell命令,设备数量: {len(request.tasks)}")
            return await command_executor.execute_unified_shell_commands(request)
        except Exception as e:
            logger.error(f"批量Shell命令执行失败: {e}")
            raise
# Global device service instance, constructed when this module is imported.
device_service = DeviceService()