import asyncio import base64 from typing import Optional, List import uiautomator2 as u2 from app.core.device_manager import device_manager from app.schemas.adb import ClickRequest, InputRequest, ScreenshotResponse from app.utils.log import get_logger logger = get_logger(__name__) class ATXService: """ATX服务类 - 基于uiautomator2实现Android自动化控制""" def __init__(self): self.devices = {} # 存储uiautomator2设备对象 async def _get_device(self, device_id: str) -> u2.Device: """获取uiautomator2设备对象""" if device_id in self.devices: return self.devices[device_id] # 从设备管理器获取连接信息 device = await device_manager.get_device(device_id) if not device: raise ValueError(f"设备 {device_id} 不存在") connection_info = device.connection_info try: # 创建uiautomator2设备对象 if 'ip' in connection_info: u2_device = u2.connect(connection_info['ip']) else: u2_device = u2.connect(device_id) self.devices[device_id] = u2_device logger.info(f"ATX设备连接成功: {device_id}") return u2_device except Exception as e: logger.error(f"ATX设备连接失败: {device_id}, 错误: {e}") raise ValueError(f"ATX设备连接失败: {str(e)}") async def click(self, device_id: str, x: int, y: int, duration: float = 0.1) -> dict: """执行点击操作""" try: device = await self._get_device(device_id) device.click(x, y) logger.info(f"ATX点击操作成功: {device_id} -> ({x}, {y})") return { "success": True, "message": "点击操作执行成功", "coordinates": {"x": x, "y": y}, "duration": duration } except Exception as e: logger.error(f"ATX点击操作失败: {device_id}, 错误: {e}") return { "success": False, "message": f"点击操作失败: {str(e)}" } async def input_text(self, device_id: str, text: str, clear_first: bool = False) -> dict: """输入文本""" try: device = await self._get_device(device_id) if clear_first: device.clear_text() device.send_keys(text) logger.info(f"ATX文本输入成功: {device_id} -> {text}") return { "success": True, "message": "文本输入成功", "text": text } except Exception as e: logger.error(f"ATX文本输入失败: {device_id}, 错误: {e}") return { "success": False, "message": f"文本输入失败: {str(e)}" } async def screenshot(self, device_id: str) -> ScreenshotResponse: """获取屏幕截图""" try: device = await self._get_device(device_id) screenshot_path = device.screenshot() import io from PIL import Image with open(screenshot_path, 'rb') as f: img_data = f.read() img_buffer = io.BytesIO(img_data) img = Image.open(img_buffer) img_buffer = io.BytesIO() img.save(img_buffer, format='PNG') base64_data = base64.b64encode(img_buffer.getvalue()).decode('utf-8') logger.info(f"ATX截图成功: {device_id}") return ScreenshotResponse( image_data=base64_data, format="png", width=img.width, height=img.height ) except Exception as e: logger.error(f"ATX截图失败: {device_id}, 错误: {e}") raise ValueError(f"截图失败: {str(e)}") async def get_device_info(self, device_id: str) -> dict: """获取设备信息""" try: device = await self._get_device(device_id) info = device.device_info logger.info(f"ATX获取设备信息成功: {device_id}") return { "device_id": device_id, "model": info.get('model', 'Unknown'), "android_version": info.get('version', 'Unknown'), "sdk_version": info.get('sdk', 0), "status": "online" } except Exception as e: logger.error(f"ATX获取设备信息失败: {device_id}, 错误: {e}") return { "device_id": device_id, "model": "Unknown", "android_version": "Unknown", "sdk_version": 0, "status": "error" }