from fastapi import APIRouter, HTTPException, status

from app.core.device.manager import device_manager
from app.core.device.dispatcher import device_dispatcher
from app.schemas.at import ATCommandRequest, ATCommandResponse
from app.utils.log import get_logger

logger = get_logger(__name__)

router = APIRouter()

# NOTE: FastAPI publishes an endpoint's docstring as its OpenAPI description
# unless `description=` is given. Each route below passes the original text
# explicitly so the published API docs stay unchanged while the docstrings
# serve as maintainer documentation.


async def _require_at_device(device_id: str):
    """Look up a device and verify that it is configured for the AT protocol.

    Shared precondition for every per-device AT endpoint in this module.

    Args:
        device_id: Identifier taken from the request path.

    Returns:
        The object returned by ``device_manager.get_device``.

    Raises:
        HTTPException: 404 if the lookup returns a falsy value; 400 if the
            device's ``protocol_type`` is not ``'at'``.
    """
    device = await device_manager.get_device(device_id)
    if not device:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"设备 {device_id} 不存在"
        )

    if device.protocol_type != 'at':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"设备 {device_id} 不是AT协议,当前协议: {device.protocol_type}"
        )

    return device


@router.post(
    "/devices/{device_id}/send-at",
    response_model=ATCommandResponse,
    summary="发送AT指令",
    description="向指定设备发送AT指令",
)
async def send_at_command(device_id: str, at_request: ATCommandRequest):
    """Send a single AT command to the given device.

    Args:
        device_id: Target device identifier (path parameter).
        at_request: Request body; its ``command``, ``timeout`` and
            ``wait_response`` fields are forwarded to the dispatcher.

    Returns:
        Whatever the dispatcher's ``send_at_command`` operation returns
        (serialized through ``ATCommandResponse`` by the route).

    Raises:
        HTTPException: 404 / 400 from device validation, or 500 wrapping any
            other failure.
    """
    try:
        # Validation stays inside the try so that a failing device lookup is
        # reported as a 500 with this endpoint's error message.
        device = await _require_at_device(device_id)

        return await device_dispatcher.execute_operation(
            device_id=device_id,
            protocol_type=device.protocol_type,
            operation="send_at_command",
            command=at_request.command,
            timeout=at_request.timeout,
            wait_response=at_request.wait_response,
        )
    except HTTPException:
        # Deliberate 4xx responses must not be converted into a 500.
        raise
    except Exception as e:
        logger.error(f"AT指令发送失败: {device_id}, 错误: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"AT指令发送失败: {str(e)}"
        ) from e


@router.post(
    "/devices/{device_id}/send-multiple-at",
    summary="发送多个AT指令",
    description="向指定设备发送多个AT指令",
)
async def send_multiple_at_commands(device_id: str, commands: list, delay_between: float = 1.0):
    """Send a batch of AT commands to the given device.

    Args:
        device_id: Target device identifier (path parameter).
        commands: The commands to send; forwarded to the dispatcher unchanged.
        delay_between: Forwarded to the dispatcher as ``delay_between``
            (presumably the pause between commands in seconds — confirm
            against the dispatcher implementation).

    Returns:
        A dict with ``device_id``, ``success``, ``message`` and ``results``
        (the dispatcher's return value).

    Raises:
        HTTPException: 404 / 400 from device validation, or 500 wrapping any
            other failure.
    """
    try:
        device = await _require_at_device(device_id)

        result = await device_dispatcher.execute_operation(
            device_id=device_id,
            protocol_type=device.protocol_type,
            operation="send_multiple_commands",
            commands=commands,
            delay_between=delay_between,
        )

        return {
            "device_id": device_id,
            "success": True,
            "message": f"成功发送 {len(commands)} 个AT指令",
            "results": result
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"多个AT指令发送失败: {device_id}, 错误: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"多个AT指令发送失败: {str(e)}"
        ) from e


@router.get(
    "/devices/{device_id}/test-at",
    summary="测试AT连接",
    description="测试AT连接状态",
)
async def test_at_connection(device_id: str):
    """Check whether the AT connection to the given device is working.

    Args:
        device_id: Target device identifier (path parameter).

    Returns:
        A dict with ``device_id``, ``connected`` (the dispatcher's
        ``test_connection`` result) and a ``message`` chosen by the
        truthiness of that result.

    Raises:
        HTTPException: 404 / 400 from device validation, or 500 wrapping any
            other failure.
    """
    try:
        device = await _require_at_device(device_id)

        result = await device_dispatcher.execute_operation(
            device_id=device_id,
            protocol_type=device.protocol_type,
            operation="test_connection",
        )

        return {
            "device_id": device_id,
            "connected": result,
            "message": "AT连接正常" if result else "AT连接失败"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"AT连接测试失败: {device_id}, 错误: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"AT连接测试失败: {str(e)}"
        ) from e


@router.get(
    "/devices/{device_id}/at-info",
    summary="获取AT设备信息",
    description="获取AT设备的详细信息",
)
async def get_at_device_info(device_id: str):
    """Fetch detailed information about the given AT device.

    Args:
        device_id: Target device identifier (path parameter).

    Returns:
        Whatever the dispatcher's ``get_device_info`` operation returns.

    Raises:
        HTTPException: 404 / 400 from device validation, or 500 wrapping any
            other failure.
    """
    try:
        device = await _require_at_device(device_id)

        return await device_dispatcher.execute_operation(
            device_id=device_id,
            protocol_type=device.protocol_type,
            operation="get_device_info",
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"获取AT设备信息失败: {device_id}, 错误: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"获取AT设备信息失败: {str(e)}"
        ) from e


@router.get(
    "/serial/ports",
    summary="获取可用串口列表",
    description="获取系统中可用的串口列表",
)
async def get_available_serial_ports():
    """List the serial ports reported by ``SerialUtils.list_available_ports``.

    Returns:
        A dict with ``success``, a ``message`` containing the port count, and
        ``ports`` (the list returned by ``SerialUtils``).

    Raises:
        HTTPException: 500 wrapping any failure, including a failure to
            import the serial utilities.
    """
    try:
        # Imported at call time and inside the try, so an import failure is
        # reported as a 500 from this endpoint rather than at module load.
        from app.utils.serial_utils import SerialUtils

        ports = SerialUtils.list_available_ports()

        return {
            "success": True,
            "message": f"发现 {len(ports)} 个可用串口",
            "ports": ports
        }
    except Exception as e:
        logger.error(f"获取可用串口列表失败: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"获取可用串口列表失败: {str(e)}"
        ) from e