import serial import serial.tools.list_ports from typing import List, Dict, Any from app.utils.log import get_logger logger = get_logger(__name__) class SerialUtils: """串口工具类""" @staticmethod def list_available_ports() -> List[Dict[str, Any]]: """列出可用的串口""" try: ports = serial.tools.list_ports.comports() port_list = [] for port in ports: port_info = { "device": port.device, "name": port.name, "description": port.description, "manufacturer": port.manufacturer, "product": port.product, "vid": port.vid, "pid": port.pid } port_list.append(port_info) logger.info(f"发现 {len(port_list)} 个可用串口") return port_list except Exception as e: logger.error(f"获取可用串口失败: {e}") return [] @staticmethod def test_port(port: str, baudrate: int = 115200, timeout: int = 5) -> bool: """测试串口是否可用""" try: ser = serial.Serial(port, baudrate, timeout=timeout) ser.close() logger.info(f"串口 {port} 测试成功") return True except Exception as e: logger.error(f"串口 {port} 测试失败: {e}") return False @staticmethod def send_command(port: str, command: str, baudrate: int = 115200, timeout: int = 5, wait_response: bool = True) -> Dict[str, Any]: """发送串口命令""" try: ser = serial.Serial(port, baudrate, timeout=timeout) # 确保命令以\r\n结尾 if not command.endswith('\r\n'): command = command + '\r\n' # 清空缓冲区 ser.reset_input_buffer() ser.reset_output_buffer() # 发送命令 ser.write(command.encode('utf-8')) ser.flush() response = "" if wait_response: # 等待响应 response = ser.read_all().decode('utf-8', errors='ignore') ser.close() logger.info(f"串口命令发送成功: {port} -> {command.strip()}") return { "success": True, "command": command.strip(), "response": response.strip(), "port": port } except Exception as e: logger.error(f"串口命令发送失败: {port} -> {command}, 错误: {e}") return { "success": False, "command": command, "response": str(e), "port": port } @staticmethod def get_port_info(port: str) -> Dict[str, Any]: """获取串口详细信息""" try: ports = serial.tools.list_ports.comports() for p in ports: if p.device == port: return { "device": p.device, "name": p.name, "description": p.description, "manufacturer": p.manufacturer, "product": p.product, "vid": p.vid, "pid": p.pid, "hwid": p.hwid } return {} except Exception as e: logger.error(f"获取串口信息失败: {port}, 错误: {e}") return {}