You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
46 lines
1.5 KiB
46 lines
1.5 KiB
"""
|
|
AT服务模块
|
|
"""
|
|
import asyncio
|
|
import time
|
|
import serial
|
|
from typing import Optional
|
|
from app.core.device.manager import device_manager
|
|
from app.schemas.at import ATCommandRequest, ATCommandResponse, SerialConnectionInfo
|
|
from app.utils.log import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
class AtService:
|
|
"""AT服务类"""
|
|
|
|
def __init__(self):
|
|
self.device_manager = device_manager
|
|
|
|
async def get_devices(self):
|
|
"""获取所有AT设备"""
|
|
try:
|
|
devices = await self.device_manager.get_all_devices()
|
|
at_devices = [d for d in devices if d.protocol_type == "at"]
|
|
return at_devices
|
|
except Exception as e:
|
|
logger.error(f"获取AT设备失败: {e}")
|
|
return []
|
|
|
|
async def execute_command(self, device_id: str, command: str):
|
|
"""在指定设备上执行AT命令"""
|
|
try:
|
|
device = await self.device_manager.get_device(device_id)
|
|
if not device:
|
|
raise ValueError(f"设备 {device_id} 不存在")
|
|
|
|
if device.protocol_type != "at":
|
|
raise ValueError(f"设备 {device_id} 不是AT设备")
|
|
|
|
# 这里应该实现具体的AT命令执行逻辑
|
|
logger.info(f"在设备 {device_id} 上执行AT命令: {command}")
|
|
return {"success": True, "command": command, "device_id": device_id}
|
|
|
|
except Exception as e:
|
|
logger.error(f"执行AT命令失败: {e}")
|
|
return {"success": False, "error": str(e)}
|