You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

112 lines
3.7 KiB

import serial
import serial.tools.list_ports
from typing import List, Dict, Any
from app.utils.log import get_logger
logger = get_logger(__name__)
class SerialUtils:
"""串口工具类"""
@staticmethod
def list_available_ports() -> List[Dict[str, Any]]:
"""列出可用的串口"""
try:
ports = serial.tools.list_ports.comports()
port_list = []
for port in ports:
port_info = {
"device": port.device,
"name": port.name,
"description": port.description,
"manufacturer": port.manufacturer,
"product": port.product,
"vid": port.vid,
"pid": port.pid
}
port_list.append(port_info)
logger.info(f"发现 {len(port_list)} 个可用串口")
return port_list
except Exception as e:
logger.error(f"获取可用串口失败: {e}")
return []
@staticmethod
def test_port(port: str, baudrate: int = 115200, timeout: int = 5) -> bool:
"""测试串口是否可用"""
try:
ser = serial.Serial(port, baudrate, timeout=timeout)
ser.close()
logger.info(f"串口 {port} 测试成功")
return True
except Exception as e:
logger.error(f"串口 {port} 测试失败: {e}")
return False
@staticmethod
def send_command(port: str, command: str, baudrate: int = 115200,
timeout: int = 5, wait_response: bool = True) -> Dict[str, Any]:
"""发送串口命令"""
try:
ser = serial.Serial(port, baudrate, timeout=timeout)
# 确保命令以\r\n结尾
if not command.endswith('\r\n'):
command = command + '\r\n'
# 清空缓冲区
ser.reset_input_buffer()
ser.reset_output_buffer()
# 发送命令
ser.write(command.encode('utf-8'))
ser.flush()
response = ""
if wait_response:
# 等待响应
response = ser.read_all().decode('utf-8', errors='ignore')
ser.close()
logger.info(f"串口命令发送成功: {port} -> {command.strip()}")
return {
"success": True,
"command": command.strip(),
"response": response.strip(),
"port": port
}
except Exception as e:
logger.error(f"串口命令发送失败: {port} -> {command}, 错误: {e}")
return {
"success": False,
"command": command,
"response": str(e),
"port": port
}
@staticmethod
def get_port_info(port: str) -> Dict[str, Any]:
"""获取串口详细信息"""
try:
ports = serial.tools.list_ports.comports()
for p in ports:
if p.device == port:
return {
"device": p.device,
"name": p.name,
"description": p.description,
"manufacturer": p.manufacturer,
"product": p.product,
"vid": p.vid,
"pid": p.pid,
"hwid": p.hwid
}
return {}
except Exception as e:
logger.error(f"获取串口信息失败: {port}, 错误: {e}")
return {}