""" 增强的ADB服务模块 - 提供业务逻辑层 """ import asyncio from typing import List, Optional, AsyncGenerator from app.core.adb import AdbClient from app.models.adb_models import ( DeviceInfo, DeviceEvent, ForwardInfo, ServerInfo, ShellCommand, ShellResponse, ForwardRequest, DeviceStatus ) from app.utils.log import get_enhanced_logger, LogLevel from app.core.exceptions import AdbConnectionError, AdbProtocolError logger = get_enhanced_logger(__name__, LogLevel.DEBUG) class EnhancedAdbService: """增强的ADB服务类""" def __init__(self, host: Optional[str] = None, port: Optional[int] = None): self._client = AdbClient() self._host = host self._port = port self._device_monitor_task: Optional[asyncio.Task] = None self._monitoring_started = False # 移除自动启动监控,改为延迟启动 async def start_monitoring(self): """启动设备监控(需要手动调用)""" if not self._monitoring_started and (not self._device_monitor_task or self._device_monitor_task.done()): logger.info("开始设备监控", host=self._host, port=self._port) try: self._device_monitor_task = asyncio.create_task(self._monitor_devices()) self._monitoring_started = True logger.info("设备监控任务创建成功", task_id=id(self._device_monitor_task)) except Exception as e: logger.error("创建设备监控任务失败", error=str(e)) self._monitoring_started = False raise else: logger.info("设备监控已在运行中", monitoring_started=self._monitoring_started, task_done=self._device_monitor_task.done() if self._device_monitor_task else None) async def stop_monitoring(self): """停止设备监控""" if self._device_monitor_task and not self._device_monitor_task.done(): logger.info("正在停止设备监控", task_id=id(self._device_monitor_task)) self._device_monitor_task.cancel() try: await self._device_monitor_task logger.info("设备监控任务已成功停止") except asyncio.CancelledError: logger.info("设备监控任务被取消") except Exception as e: logger.error("停止设备监控任务时发生错误", error=str(e)) finally: self._monitoring_started = False logger.info("设备监控已完全停止") else: logger.info("设备监控未在运行,无需停止") async def get_server_info(self) -> ServerInfo: """获取ADB服务器信息""" try: logger.debug("获取ADB服务器信息") version = await self._client.server_version() logger.info("获取ADB服务器信息成功", version=version, host=self._host, port=self._port) return ServerInfo(version=version, host=self._host or "localhost", port=self._port or 5037, status="running") except (AdbConnectionError, AdbProtocolError) as e: logger.error("获取ADB服务器信息失败", error=str(e)) raise except Exception as e: logger.error("获取ADB服务器信息时发生未知错误", error=str(e)) raise async def list_devices(self, status_filter: Optional[List[str]] = None) -> List[DeviceInfo]: """获取设备列表""" try: logger.debug("获取设备列表", status_filter=status_filter) devices = await self._client.devices() # 过滤设备状态 if status_filter: devices = [d for d in devices if d.status in status_filter] # 转换为DeviceInfo对象 device_infos = [] for d in devices: try: # 尝试将字符串状态转换为枚举 if hasattr(d.status, 'value'): status_str = d.status.value else: status_str = str(d.status) # 尝试转换为已知的状态枚举 try: status = DeviceStatus(status_str) except ValueError: # 如果状态不在已知枚举中,使用默认状态 logger.warning(f"未知设备状态: {status_str}, 使用默认状态", serial=d.serial) status = DeviceStatus.OFFLINE device_infos.append(DeviceInfo(serial=d.serial, status=status)) except Exception as e: logger.error(f"处理设备信息时出错: {d.serial}, 错误: {e}") # 跳过有问题的设备 continue logger.info("获取设备列表成功", device_count=len(device_infos)) return device_infos except (AdbConnectionError, AdbProtocolError) as e: logger.error("获取设备列表失败", error=str(e)) raise except Exception as e: logger.error("获取设备列表时发生未知错误", error=str(e)) raise async def execute_shell_command(self, device_serial: str, command: str, timeout: int = 30) -> ShellResponse: """执行Shell命令""" try: logger.debug("执行Shell命令", device_serial=device_serial, command=command, timeout=timeout) output = await asyncio.wait_for( self._client.shell(device_serial, command), timeout=timeout ) logger.info("Shell命令执行成功", device_serial=device_serial, command=command, output_length=len(output)) return ShellResponse(success=True, output=output, error=None, exit_code=0) except asyncio.TimeoutError: logger.warning("Shell命令执行超时", device_serial=device_serial, command=command, timeout=timeout) return ShellResponse(success=False, output="", error="命令执行超时", exit_code=-1) except (AdbConnectionError, AdbProtocolError) as e: logger.error("Shell命令执行失败", device_serial=device_serial, command=command, error=str(e)) return ShellResponse(success=False, output="", error=str(e), exit_code=-1) except Exception as e: logger.error("Shell命令执行时发生未知错误", device_serial=device_serial, command=command, error=str(e)) return ShellResponse(success=False, output="", error=f"未知错误: {str(e)}", exit_code=-1) async def _monitor_devices(self) -> None: """设备监控内部方法""" logger.info("设备监控循环开始", host=self._host, port=self._port) try: logger.debug("开始设备监控循环") async for event in self._client.track_devices(): logger.info("设备事件", present=event.present, serial=event.serial, status=event.status) await self._handle_device_event(event) except asyncio.CancelledError: logger.info("设备监控被取消") except Exception as e: logger.error("设备监控异常", error=str(e), host=self._host, port=self._port) # 监控异常时不再自动重启,需要手动重新启动 self._monitoring_started = False logger.warning("设备监控已停止,需要手动重新启动") async def _handle_device_event(self, event: DeviceEvent) -> None: """处理设备事件""" if event.present: logger.info("设备连接", serial=event.serial, status=event.status) else: logger.info("设备断开", serial=event.serial, status=event.status) async def list_forward_ports(self) -> List[ForwardInfo]: """获取端口转发列表""" try: logger.debug("获取端口转发列表") forwards = [] async for forward in self._client.forward_list(): forwards.append(ForwardInfo( serial=forward.serial, local=forward.local, remote=forward.remote )) logger.info("获取端口转发列表成功", forward_count=len(forwards)) return forwards except Exception as e: logger.error("获取端口转发列表失败", error=str(e)) raise async def create_forward_port(self, request: ForwardRequest) -> None: """创建端口转发""" try: logger.debug("创建端口转发", serial=request.serial, local=request.local, remote=request.remote, norebind=request.norebind) await self._client.create_forward_port( request.serial, request.local, request.remote, request.norebind ) logger.info("端口转发创建成功", serial=request.serial, local=request.local, remote=request.remote) except Exception as e: logger.error("创建端口转发失败", serial=request.serial, local=request.local, remote=request.remote, error=str(e)) raise async def remove_forward_port(self, local: Optional[str] = None) -> None: """移除端口转发""" try: logger.debug("移除端口转发", local=local) await self._client.remove_forward_port(local) logger.info("端口转发移除成功", local=local or "all") except Exception as e: logger.error("移除端口转发失败", local=local, error=str(e)) raise async def ping_device(self, device_serial: str) -> bool: """ping设备检查连接状态""" try: logger.debug("ping设备", device_serial=device_serial) result = await self.execute_shell_command(device_serial, "echo 'ping'") success = result.success logger.info("ping设备完成", device_serial=device_serial, success=success) return success except Exception as e: logger.error("ping设备失败", device_serial=device_serial, error=str(e)) return False async def get_device_info(self, device_serial: str) -> Optional[dict]: """获取设备详细信息""" try: logger.debug("获取设备详细信息", device_serial=device_serial) # 获取基本设备信息 devices = await self.list_devices() device = next((d for d in devices if d.serial == device_serial), None) if not device: logger.warning("设备不存在", device_serial=device_serial) return None # 获取设备属性 properties = {} property_commands = [ ("model", "getprop ro.product.model"), ("manufacturer", "getprop ro.product.manufacturer"), ("android_version", "getprop ro.build.version.release"), ("sdk_version", "getprop ro.build.version.sdk"), ("build_id", "getprop ro.build.id"), ("cpu_abi", "getprop ro.product.cpu.abi") ] for prop_name, command in property_commands: try: result = await self.execute_shell_command(device_serial, command) if result.success and result.output.strip(): properties[prop_name] = result.output.strip() except Exception as e: logger.warning(f"获取设备属性失败: {prop_name}", device_serial=device_serial, error=str(e)) device_info = { "serial": device_serial, "status": device.status.value, "properties": properties } logger.info("获取设备详细信息成功", device_serial=device_serial) return device_info except Exception as e: logger.error("获取设备详细信息失败", device_serial=device_serial, error=str(e)) return None async def install_apk(self, device_serial: str, apk_path: str) -> ShellResponse: """安装APK文件""" try: logger.debug("安装APK", device_serial=device_serial, apk_path=apk_path) command = f"pm install -r {apk_path}" result = await self.execute_shell_command(device_serial, command) logger.info("APK安装完成", device_serial=device_serial, apk_path=apk_path, success=result.success) return result except Exception as e: logger.error("APK安装失败", device_serial=device_serial, apk_path=apk_path, error=str(e)) return ShellResponse(success=False, output="", error=str(e), exit_code=-1) async def uninstall_app(self, device_serial: str, package_name: str) -> ShellResponse: """卸载应用""" try: logger.debug("卸载应用", device_serial=device_serial, package_name=package_name) command = f"pm uninstall {package_name}" result = await self.execute_shell_command(device_serial, command) logger.info("应用卸载完成", device_serial=device_serial, package_name=package_name, success=result.success) return result except Exception as e: logger.error("应用卸载失败", device_serial=device_serial, package_name=package_name, error=str(e)) return ShellResponse(success=False, output="", error=str(e), exit_code=-1) async def get_installed_apps(self, device_serial: str) -> List[str]: """获取已安装应用列表""" try: logger.debug("获取已安装应用列表", device_serial=device_serial) result = await self.execute_shell_command(device_serial, "pm list packages -3") if result.success: packages = [line.replace("package:", "").strip() for line in result.output.splitlines() if line.strip()] logger.info("获取已安装应用列表成功", device_serial=device_serial, app_count=len(packages)) return packages else: logger.error("获取已安装应用列表失败", device_serial=device_serial, error=result.error) return [] except Exception as e: logger.error("获取已安装应用列表异常", device_serial=device_serial, error=str(e)) return []