You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

138 lines
4.7 KiB

import socket
import time
from typing import Dict, Any, Optional
from app.utils.log import get_logger
# Module-level logger; the TCPUtils methods below use it to report the
# outcome (success, failure, exception) of every network operation.
logger = get_logger(__name__)
class TCPUtils:
    """Stateless helpers for simple TCP client, server and port-scan tasks.

    All methods are static. Errors are logged and reported through the
    return value (``False`` or a dict with ``"success": False``) instead of
    being raised to the caller.
    """

    # Maximum number of bytes read from the peer by a single ``recv`` call
    # in ``send_data``.
    _RECV_BUFFER_SIZE = 1024

    @staticmethod
    def test_connection(host: str, port: int, timeout: int = 5) -> bool:
        """Check whether a TCP connection to ``host:port`` can be opened.

        Args:
            host: Hostname or IPv4 address to connect to.
            port: TCP port number.
            timeout: Socket timeout in seconds.

        Returns:
            True when the connection was established, False when it was
            refused, timed out, or any exception occurred.
        """
        try:
            # The context manager closes the socket on every path, including
            # when connect_ex raises (e.g. on a name-resolution failure).
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                # connect_ex returns an error code instead of raising for
                # connect-level failures; 0 means success.
                success = sock.connect_ex((host, port)) == 0

            if success:
                logger.info(f"TCP连接测试成功: {host}:{port}")
            else:
                logger.warning(f"TCP连接测试失败: {host}:{port}")
            return success
        except Exception as e:
            logger.error(f"TCP连接测试异常: {host}:{port}, 错误: {e}")
            return False

    @staticmethod
    def send_data(host: str, port: int, data: bytes, timeout: int = 30,
                  wait_response: bool = True) -> Dict[str, Any]:
        """Send ``data`` over a new TCP connection and optionally read a reply.

        Args:
            host: Hostname or IPv4 address to connect to.
            port: TCP port number.
            data: Payload to transmit.
            timeout: Socket timeout in seconds, applied to connect, send
                and receive.
            wait_response: When True, perform a single ``recv`` of at most
                1024 bytes after sending; longer replies are truncated to
                what that one call returns.

        Returns:
            On success, a dict with ``success`` (True), ``host``, ``port``,
            ``sent_data`` / ``received_data`` (upper-case hex strings),
            ``sent_size`` and ``received_size``. On failure, a dict with
            ``success`` (False), ``host``, ``port``, ``sent_data``,
            ``received_data`` (empty string) and ``error``.
        """
        try:
            # The context manager guarantees the socket is closed even when
            # connect, send or recv raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                sock.connect((host, port))

                # sendall keeps writing until the whole payload has been
                # handed to the OS; send() may stop after a partial write.
                sock.sendall(data)

                response = b""
                if wait_response:
                    response = sock.recv(TCPUtils._RECV_BUFFER_SIZE)

            logger.info(f"TCP数据发送成功: {host}:{port} -> {len(data)} bytes")
            return {
                "success": True,
                "host": host,
                "port": port,
                "sent_data": data.hex().upper(),
                "received_data": response.hex().upper() if response else "",
                "sent_size": len(data),
                "received_size": len(response),
            }
        except Exception as e:
            logger.error(f"TCP数据发送失败: {host}:{port}, 错误: {e}")
            return {
                "success": False,
                "host": host,
                "port": port,
                # data may not be bytes if the caller passed a wrong type;
                # fall back to its string form so the report never fails.
                "sent_data": data.hex().upper() if isinstance(data, bytes) else str(data),
                "received_data": "",
                "error": str(e),
            }

    @staticmethod
    def send_hex_data(host: str, port: int, hex_data: str, timeout: int = 30,
                      wait_response: bool = True) -> Dict[str, Any]:
        """Decode a hexadecimal string and send the resulting bytes.

        Args:
            host: Hostname or IPv4 address to connect to.
            port: TCP port number.
            hex_data: Hex digits, optionally separated by spaces.
            timeout: Socket timeout in seconds, forwarded to ``send_data``.
            wait_response: Forwarded to ``send_data``.

        Returns:
            The dict produced by ``send_data``, or a failure dict (with the
            original ``hex_data`` as ``sent_data``) when the string is not
            valid hexadecimal.
        """
        # Only the conversion can raise ValueError; send_data reports its
        # own failures through its return value.
        try:
            data = bytes.fromhex(hex_data.replace(' ', ''))
        except ValueError as e:
            logger.error(f"无效的十六进制数据: {hex_data}, 错误: {e}")
            return {
                "success": False,
                "host": host,
                "port": port,
                "sent_data": hex_data,
                "received_data": "",
                "error": f"无效的十六进制数据: {str(e)}",
            }
        return TCPUtils.send_data(host, port, data, timeout, wait_response)

    @staticmethod
    def create_server(host: str, port: int, callback=None) -> Dict[str, Any]:
        """Create a listening TCP socket bound to ``host:port``.

        The socket is created with ``SO_REUSEADDR`` and a listen backlog of
        5. No connections are accepted here; the caller owns the returned
        socket and is responsible for accepting on it and closing it.

        Args:
            host: Interface address to bind to.
            port: TCP port to listen on.
            callback: Accepted for interface compatibility; not used by
                this method.

        Returns:
            On success, a dict with ``success`` (True), ``host``, ``port``
            and ``socket`` (the listening socket). On failure, a dict with
            ``success`` (False), ``host``, ``port`` and ``error``.
        """
        server_socket = None
        try:
            server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((host, port))
            server_socket.listen(5)
        except Exception as e:
            # Release the half-initialised socket so a failed bind/listen
            # does not leak a file descriptor.
            if server_socket is not None:
                server_socket.close()
            logger.error(f"TCP服务器启动失败: {host}:{port}, 错误: {e}")
            return {
                "success": False,
                "host": host,
                "port": port,
                "error": str(e),
            }

        logger.info(f"TCP服务器启动成功: {host}:{port}")
        return {
            "success": True,
            "host": host,
            "port": port,
            "socket": server_socket,
        }

    @staticmethod
    def scan_ports(host: str, start_port: int = 1, end_port: int = 1024,
                   timeout: int = 1) -> Dict[str, Any]:
        """Probe an inclusive port range sequentially and list the open ports.

        Each port is checked with ``test_connection``, one after another,
        so the worst-case duration grows with the range size times
        ``timeout``.

        Args:
            host: Hostname or IPv4 address to scan.
            start_port: First port of the range (inclusive).
            end_port: Last port of the range (inclusive).
            timeout: Per-port connection timeout in seconds.

        Returns:
            A dict with ``host``, ``start_port``, ``end_port``,
            ``open_ports`` (ascending list), ``total_ports`` (number of
            ports probed; 0 for an empty range) and ``open_count``.
        """
        ports = range(start_port, end_port + 1)
        open_ports = [
            port for port in ports
            if TCPUtils.test_connection(host, port, timeout)
        ]

        logger.info(f"端口扫描完成: {host} -> 发现 {len(open_ports)} 个开放端口")
        return {
            "host": host,
            "start_port": start_port,
            "end_port": end_port,
            "open_ports": open_ports,
            # len(range) is never negative, unlike end - start + 1 when the
            # range is empty (start_port > end_port).
            "total_ports": len(ports),
            "open_count": len(open_ports),
        }