You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

85 lines
3.2 KiB

"""
设备分发器
"""
from typing import Dict, Any, Optional
import asyncio
from app.core.config import ProtocolType
from app.services.adb_service import AdbService
from app.services.ssh_service import SshService
from app.services.at_service import AtService
from app.services.plnk_service import PlnkService
from app.services.atx_service import AtxService
from app.utils.log import get_logger
logger = get_logger(__name__)
class DeviceDispatcher:
"""设备分发器 - 根据设备协议类型调用相应的服务"""
def __init__(self):
self._services: Dict[str, Any] = {}
self._initialize_services()
def _initialize_services(self):
"""初始化所有协议服务"""
self._services[ProtocolType.ADB] = AdbService()
self._services[ProtocolType.SSH] = SshService()
self._services[ProtocolType.AT] = AtService()
self._services[ProtocolType.PLNK] = PlnkService()
self._services[ProtocolType.ATX] = AtxService()
# HDC服务可以复用ADB服务或单独实现
self._services[ProtocolType.HDC] = AdbService() # 暂时复用ADB服务
def get_service(self, protocol_type: str) -> Optional[Any]:
"""获取指定协议的服务实例"""
service = self._services.get(protocol_type)
if not service:
logger.error(f"未找到协议 {protocol_type} 的服务实现")
return service
async def execute_operation(self, device_id: str, protocol_type: str, operation: str, **kwargs) -> Any:
"""执行设备操作"""
service = self.get_service(protocol_type)
if not service:
raise ValueError(f"不支持的协议类型: {protocol_type}")
if not hasattr(service, operation):
raise ValueError(f"协议 {protocol_type} 不支持操作: {operation}")
method = getattr(service, operation)
logger.info(f"执行操作: {device_id} -> {protocol_type}.{operation}")
try:
if asyncio.iscoroutinefunction(method):
result = await method(device_id, **kwargs)
else:
result = method(device_id, **kwargs)
return result
except Exception as e:
logger.error(f"执行操作失败: {device_id} -> {protocol_type}.{operation}, 错误: {e}")
raise
def get_supported_operations(self, protocol_type: str) -> list:
"""获取协议支持的操作列表"""
service = self.get_service(protocol_type)
if not service:
return []
# 获取服务类的所有方法,排除私有方法和内置方法
operations = []
for attr_name in dir(service):
if not attr_name.startswith('_') and callable(getattr(service, attr_name)):
operations.append(attr_name)
return operations
def is_operation_supported(self, protocol_type: str, operation: str) -> bool:
"""检查协议是否支持指定操作"""
service = self.get_service(protocol_type)
if not service:
return False
return hasattr(service, operation) and callable(getattr(service, operation))
# 全局设备分发器实例
device_dispatcher = DeviceDispatcher()