You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
112 lines
3.7 KiB
112 lines
3.7 KiB
import serial
|
|
import serial.tools.list_ports
|
|
from typing import List, Dict, Any
|
|
from app.utils.log import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
class SerialUtils:
    """Static helpers for listing, probing and talking to serial ports.

    Every public method catches any exception raised by the underlying
    serial calls, logs it through the module logger, and reports the
    failure through its return value instead of raising.
    """

    # Longest silence (in seconds) tolerated between two chunks of a single
    # response before the response is considered complete.
    _RESPONSE_IDLE_GAP = 0.1

    @staticmethod
    def _describe_port(port_entry, include_hwid: bool = False) -> Dict[str, Any]:
        """Convert one entry returned by ``comports()`` into a plain dict.

        Args:
            port_entry: An item yielded by ``serial.tools.list_ports.comports()``.
            include_hwid: When true, append the ``hwid`` attribute as the
                last key.

        Returns:
            A dict with the keys ``device``, ``name``, ``description``,
            ``manufacturer``, ``product``, ``vid``, ``pid`` and, optionally,
            ``hwid``.
        """
        details = {
            "device": port_entry.device,
            "name": port_entry.name,
            "description": port_entry.description,
            "manufacturer": port_entry.manufacturer,
            "product": port_entry.product,
            "vid": port_entry.vid,
            "pid": port_entry.pid,
        }
        if include_hwid:
            details["hwid"] = port_entry.hwid
        return details

    @staticmethod
    def _read_response(ser, timeout) -> bytes:
        """Collect the bytes a device sends back after a command.

        ``read_all()`` alone only returns what is already buffered, which is
        usually nothing right after a write.  Instead, block for the first
        byte (bounded by the timeout the port was opened with), then keep
        reading until the line stays silent for ``_RESPONSE_IDLE_GAP``
        seconds or the overall deadline passes.

        Args:
            ser: An open serial port object.
            timeout: Overall time budget in seconds; ``None`` means no
                deadline.

        Returns:
            The raw bytes received, possibly empty.
        """
        # Function-scope import keeps this block independent of the
        # file-level import list.
        import time

        deadline = None if timeout is None else time.monotonic() + timeout

        # Blocks until the first byte arrives or the port timeout expires.
        received = bytearray(ser.read(1))
        if not received:
            return b""

        # Switch to a short read timeout so the end of the response is
        # detected by silence rather than by waiting the full timeout again.
        idle_gap = SerialUtils._RESPONSE_IDLE_GAP
        ser.timeout = idle_gap if timeout is None else min(idle_gap, timeout)

        while True:
            chunk = ser.read(ser.in_waiting or 1)
            if not chunk:
                break
            received += chunk
            if deadline is not None and time.monotonic() >= deadline:
                break
        return bytes(received)

    @staticmethod
    def list_available_ports() -> List[Dict[str, Any]]:
        """List the serial ports currently reported by the system.

        Returns:
            One dict per port (keys: ``device``, ``name``, ``description``,
            ``manufacturer``, ``product``, ``vid``, ``pid``), or an empty
            list if enumeration fails.
        """
        try:
            port_list = [
                SerialUtils._describe_port(entry)
                for entry in serial.tools.list_ports.comports()
            ]
            logger.info(f"发现 {len(port_list)} 个可用串口")
            return port_list
        except Exception as e:
            logger.error(f"获取可用串口失败: {e}")
            return []

    @staticmethod
    def test_port(port: str, baudrate: int = 115200, timeout: int = 5) -> bool:
        """Check whether a serial port can be opened.

        Args:
            port: Device name of the port to open.
            baudrate: Baud rate used for the probe.
            timeout: Read timeout, in seconds, passed to the port.

        Returns:
            ``True`` if the port was opened and closed without error,
            ``False`` otherwise.
        """
        try:
            # The context manager closes the port again on exit.
            with serial.Serial(port, baudrate, timeout=timeout):
                pass
            logger.info(f"串口 {port} 测试成功")
            return True
        except Exception as e:
            logger.error(f"串口 {port} 测试失败: {e}")
            return False

    @staticmethod
    def send_command(port: str, command: str, baudrate: int = 115200,
                     timeout: int = 5, wait_response: bool = True) -> Dict[str, Any]:
        """Send a text command over a serial port and optionally read the reply.

        The command is terminated with CR LF if it is not already, encoded
        as UTF-8 and written after both port buffers have been cleared.

        Args:
            port: Device name of the port to use.
            command: Command text to send.
            baudrate: Baud rate used for the connection.
            timeout: Read timeout in seconds; also the time budget for
                waiting on the response.
            wait_response: When true, wait for and return the device's reply.

        Returns:
            A dict with the keys ``success``, ``command``, ``response`` and
            ``port``.  On failure ``success`` is ``False`` and ``response``
            holds the error text.
        """
        try:
            # The context manager guarantees the port is closed even when
            # writing or reading raises.
            with serial.Serial(port, baudrate, timeout=timeout) as ser:
                # Make sure the command ends with \r\n
                if not command.endswith('\r\n'):
                    command = command + '\r\n'

                # Drop stale data so the reply belongs to this command.
                ser.reset_input_buffer()
                ser.reset_output_buffer()

                # Send the command
                ser.write(command.encode('utf-8'))
                ser.flush()

                response = ""
                if wait_response:
                    raw_reply = SerialUtils._read_response(ser, timeout)
                    response = raw_reply.decode('utf-8', errors='ignore')

            logger.info(f"串口命令发送成功: {port} -> {command.strip()}")
            return {
                "success": True,
                "command": command.strip(),
                "response": response.strip(),
                "port": port
            }

        except Exception as e:
            logger.error(f"串口命令发送失败: {port} -> {command}, 错误: {e}")
            return {
                "success": False,
                "command": command,
                "response": str(e),
                "port": port
            }

    @staticmethod
    def get_port_info(port: str) -> Dict[str, Any]:
        """Look up the details of one serial port by device name.

        Args:
            port: Device name to search for among the enumerated ports.

        Returns:
            A dict with the keys ``device``, ``name``, ``description``,
            ``manufacturer``, ``product``, ``vid``, ``pid`` and ``hwid``,
            or an empty dict if the port is not found or enumeration fails.
        """
        try:
            for entry in serial.tools.list_ports.comports():
                if entry.device == port:
                    return SerialUtils._describe_port(entry, include_hwid=True)
            return {}
        except Exception as e:
            logger.error(f"获取串口信息失败: {port}, 错误: {e}")
            return {}
|