import socket import time from typing import Dict, Any, Optional from app.utils.log import get_logger logger = get_logger(__name__) class TCPUtils: """TCP工具类""" @staticmethod def test_connection(host: str, port: int, timeout: int = 5) -> bool: """测试TCP连接""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((host, port)) sock.close() success = result == 0 if success: logger.info(f"TCP连接测试成功: {host}:{port}") else: logger.warning(f"TCP连接测试失败: {host}:{port}") return success except Exception as e: logger.error(f"TCP连接测试异常: {host}:{port}, 错误: {e}") return False @staticmethod def send_data(host: str, port: int, data: bytes, timeout: int = 30, wait_response: bool = True) -> Dict[str, Any]: """发送TCP数据""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) sock.connect((host, port)) # 发送数据 sock.send(data) response = b"" if wait_response: # 接收响应 response = sock.recv(1024) sock.close() logger.info(f"TCP数据发送成功: {host}:{port} -> {len(data)} bytes") return { "success": True, "host": host, "port": port, "sent_data": data.hex().upper(), "received_data": response.hex().upper() if response else "", "sent_size": len(data), "received_size": len(response) } except Exception as e: logger.error(f"TCP数据发送失败: {host}:{port}, 错误: {e}") return { "success": False, "host": host, "port": port, "sent_data": data.hex().upper() if isinstance(data, bytes) else str(data), "received_data": "", "error": str(e) } @staticmethod def send_hex_data(host: str, port: int, hex_data: str, timeout: int = 30, wait_response: bool = True) -> Dict[str, Any]: """发送十六进制数据""" try: # 将十六进制字符串转换为字节 data = bytes.fromhex(hex_data.replace(' ', '')) return TCPUtils.send_data(host, port, data, timeout, wait_response) except ValueError as e: logger.error(f"无效的十六进制数据: {hex_data}, 错误: {e}") return { "success": False, "host": host, "port": port, "sent_data": hex_data, "received_data": "", "error": f"无效的十六进制数据: {str(e)}" } @staticmethod def create_server(host: str, port: int, callback=None) -> Dict[str, Any]: """创建TCP服务器""" try: server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind((host, port)) server_socket.listen(5) logger.info(f"TCP服务器启动成功: {host}:{port}") return { "success": True, "host": host, "port": port, "socket": server_socket } except Exception as e: logger.error(f"TCP服务器启动失败: {host}:{port}, 错误: {e}") return { "success": False, "host": host, "port": port, "error": str(e) } @staticmethod def scan_ports(host: str, start_port: int = 1, end_port: int = 1024, timeout: int = 1) -> Dict[str, Any]: """扫描端口""" open_ports = [] for port in range(start_port, end_port + 1): if TCPUtils.test_connection(host, port, timeout): open_ports.append(port) logger.info(f"端口扫描完成: {host} -> 发现 {len(open_ports)} 个开放端口") return { "host": host, "start_port": start_port, "end_port": end_port, "open_ports": open_ports, "total_ports": end_port - start_port + 1, "open_count": len(open_ports) }