You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

169 lines
7.3 KiB

"""
设备服务模块
"""
import asyncio
import time
from typing import Dict, List, Optional, Any
from app.core.device.manager import device_manager, DeviceSource
from app.core.device.dispatcher import device_dispatcher
from app.core.device.command_executor import command_executor
from app.core.device.operation_mapping import DeviceOperationMapping, DeviceOperations
from app.schemas.device import Device, DeviceStatusResponse
from app.services.auto_discovery_adb_service import AutoDiscoveryAdbService
from app.models.adb_models import UnifiedShellCommandRequest
from app.utils.structured_log import get_structured_logger
logger = get_structured_logger(__name__)
class DeviceService:
"""设备管理服务"""
def __init__(self):
self.device_manager = device_manager
self.auto_discovery_adb_service = AutoDiscoveryAdbService()
# 初始化设备操作映射配置
self.operation_mapping = DeviceOperationMapping(self.auto_discovery_adb_service)
async def get_all_devices(self) -> List[Dict[str, Any]]:
"""获取所有设备(包括注册和自动发现的)"""
try:
devices = await self.device_manager.get_all_devices_unified()
return devices
except Exception as e:
logger.error(f"获取所有设备失败: {e}")
raise
async def get_device(self, device_id: str) -> Optional[Dict[str, Any]]:
"""获取指定设备(优先获取自动发现的设备,然后获取注册的设备)"""
try:
# 优先检查自动发现设备
device_source = await self.device_manager.get_device_source(device_id)
if device_source == DeviceSource.AUTO_DISCOVERED:
# 获取自动发现设备信息
auto_discovered_devices = await self.device_manager.get_auto_discovered_devices()
for device in auto_discovered_devices:
if device["device_id"] == device_id:
return device
# 然后检查注册设备
device_info = await self.device_manager.get_device(device_id)
if device_info:
# 转换为统一格式
return {
"device_id": device_info.device_id,
"name": device_info.name,
"protocol_type": device_info.protocol_type,
"status": device_info.status,
"source": DeviceSource.REGISTERED.value,
"created_at": device_info.created_at,
"updated_at": device_info.updated_at,
"connection_info": device_info.connection_info
}
return None
except Exception as e:
logger.error(f"获取设备 {device_id} 失败: {e}")
raise
async def get_device_status(self, device_id: str) -> DeviceStatusResponse:
"""获取设备状态(包括注册和自动发现的设备)"""
try:
# 获取设备来源
device_source = await self.device_manager.get_device_source(device_id)
if device_source == DeviceSource.REGISTERED:
# 注册设备
device = await self.device_manager.get_device(device_id)
if not device:
raise ValueError(f"设备 {device_id} 不存在")
return DeviceStatusResponse(
device_id=device_id,
status=device.status,
timestamp=device.updated_at
)
else:
# 自动发现设备
auto_discovered_devices = await self.device_manager.get_auto_discovered_devices()
for device in auto_discovered_devices:
if device["device_id"] == device_id:
return DeviceStatusResponse(
device_id=device_id,
status=device["status"],
timestamp=device.get("last_seen", None)
)
raise ValueError(f"设备 {device_id} 不存在")
except Exception as e:
logger.error(f"获取设备 {device_id} 状态失败: {e}")
raise
async def execute_operation(self, device_id: str, operation: str, **kwargs) -> Dict[str, Any]:
"""执行设备操作(包括注册和自动发现的设备)"""
try:
# 获取设备来源
device_source = await self.device_manager.get_device_source(device_id)
if device_source == DeviceSource.REGISTERED:
# 注册设备:检查设备是否存在
device = await self.device_manager.get_device(device_id)
if not device:
raise ValueError(f"设备 {device_id} 不存在")
# 通过设备分发器执行
return await self.device_dispatcher.execute_operation(
device_id=device_id,
protocol_type=device.protocol_type,
operation=operation,
**kwargs
)
else:
# 自动发现设备:检查设备是否存在
auto_discovered_devices = await self.device_manager.get_auto_discovered_devices()
device_exists = any(device["device_id"] == device_id for device in auto_discovered_devices)
if not device_exists:
raise ValueError(f"设备 {device_id} 不存在")
# 直接调用ADB服务
return await self._execute_auto_discovered_device_operation(
device_id, operation, **kwargs
)
except Exception as e:
logger.error(f"执行设备 {device_id} 操作 {operation} 失败: {e}")
raise
async def _execute_auto_discovered_device_operation(self, device_id: str, operation: str, **kwargs) -> Dict[str, Any]:
"""执行自动发现设备的操作"""
try:
# 使用操作映射配置获取对应的方法
operation_method = self.operation_mapping.get_operation_method(operation)
if not operation_method:
raise ValueError(f"不支持的操作: {operation}")
# 执行操作
return await operation_method(device_id, **kwargs)
except Exception as e:
logger.error(f"自动发现设备 {device_id} 操作 {operation} 失败: {e}")
raise
async def execute_batch_shell_commands(self, request: UnifiedShellCommandRequest) -> Dict[str, Any]:
"""批量执行Shell命令(统一入口)
符合分层架构规范:
- API层 → DeviceService → CommandExecutor
- 统一通过DeviceService调用,保持架构一致性
"""
try:
logger.info(f"通过DeviceService执行批量Shell命令,设备数量: {len(request.tasks)}")
return await command_executor.execute_unified_shell_commands(request)
except Exception as e:
logger.error(f"批量Shell命令执行失败: {e}")
raise
# 全局设备服务实例
device_service = DeviceService()