You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

265 lines
9.4 KiB

#!/usr/bin/env python3
"""
输入输出日志记录测试
演示各种输入输出操作的日志记录
"""
import asyncio
import json
from app.utils.structured_log import get_structured_logger, LogLevel
from app.services.auto_discovery_adb_service import AutoDiscoveryAdbService
from app.services.ssh_service import SSHService
# 创建日志记录器
logger = get_structured_logger("InputOutputTest", LogLevel.DEBUG)
async def test_adb_input_output():
"""测试ADB输入输出日志"""
logger.info("开始测试ADB输入输出日志")
# 创建ADB服务
adb_service = AutoDiscoveryAdbService()
# 模拟设备ID
device_id = "test_device_001"
try:
# 测试文本输入
logger.info("测试文本输入操作", device_id=device_id, text="Hello World")
# 模拟输入文本
result = await adb_service.input_text(device_id, "Hello World", clear_first=True)
logger.info("文本输入结果",
device_id=device_id,
success=result.success,
output_length=len(result.output) if result.output else 0)
# 测试点击操作
logger.info("测试点击操作", device_id=device_id, x=100, y=200)
# 模拟点击
click_result = await adb_service.click_device(device_id, 100, 200)
logger.info("点击操作结果",
device_id=device_id,
success=click_result.success,
coordinates={"x": 100, "y": 200})
# 测试截图操作
logger.info("测试截图操作", device_id=device_id)
# 模拟截图
screenshot_result = await adb_service.screenshot(device_id)
logger.info("截图操作结果",
device_id=device_id,
success=screenshot_result.success,
output_size=len(screenshot_result.output) if screenshot_result.output else 0)
# 测试日志获取
logger.info("测试日志获取", device_id=device_id, package_name="com.example.app")
# 模拟获取日志
logcat_result = await adb_service.get_logcat(device_id, "com.example.app", "D", 50)
logger.info("日志获取结果",
device_id=device_id,
success=logcat_result.success,
log_lines=len(logcat_result.output.splitlines()) if logcat_result.output else 0)
except Exception as e:
logger.error("ADB输入输出测试失败", device_id=device_id, error=str(e))
logger.info("ADB输入输出测试完成")
async def test_ssh_input_output():
"""测试SSH输入输出日志"""
logger.info("开始测试SSH输入输出日志")
# 创建SSH服务
ssh_service = SSHService()
# 模拟设备ID
device_id = "ssh_device_001"
try:
# 测试SSH命令执行
logger.info("测试SSH命令执行", device_id=device_id, command="ls -la")
# 模拟SSH连接配置
ssh_config = {
"hostname": "192.168.1.100",
"port": 22,
"username": "testuser",
"password": "testpass"
}
# 模拟命令执行
exec_result = await ssh_service.execute_command(
device_id=device_id,
command="ls -la",
working_directory="/home/testuser",
timeout=30
)
logger.info("SSH命令执行结果",
device_id=device_id,
success=exec_result.success,
exit_code=exec_result.exit_code,
stdout_length=len(exec_result.stdout),
stderr_length=len(exec_result.stderr),
execution_time=exec_result.execution_time)
# 测试文件上传
logger.info("测试文件上传", device_id=device_id, local_path="/tmp/test.txt", remote_path="/home/testuser/test.txt")
# 模拟文件上传
upload_result = await ssh_service.upload_file(device_id, "/tmp/test.txt", "/home/testuser/test.txt")
logger.info("文件上传结果",
device_id=device_id,
success=upload_result.get("success", False),
message=upload_result.get("message", ""))
# 测试文件下载
logger.info("测试文件下载", device_id=device_id, remote_path="/home/testuser/output.txt", local_path="/tmp/downloaded.txt")
# 模拟文件下载
download_result = await ssh_service.download_file(device_id, "/home/testuser/output.txt", "/tmp/downloaded.txt")
logger.info("文件下载结果",
device_id=device_id,
success=download_result.get("success", False),
message=download_result.get("message", ""))
except Exception as e:
logger.error("SSH输入输出测试失败", device_id=device_id, error=str(e))
logger.info("SSH输入输出测试完成")
async def test_serial_input_output():
"""测试串口输入输出日志"""
logger.info("开始测试串口输入输出日志")
try:
from app.utils.serial_utils import SerialManager
# 创建串口管理器
serial_manager = SerialManager()
# 模拟串口配置
port_config = {
"port": "COM3",
"baudrate": 115200,
"timeout": 1
}
logger.info("测试串口连接", port_config=port_config)
# 模拟串口操作
try:
# 模拟发送命令
command = "AT+CGSN"
logger.info("发送串口命令", command=command, port=port_config["port"])
# 模拟接收响应
response = "123456789012345"
logger.info("接收串口响应",
command=command,
response=response,
response_length=len(response),
port=port_config["port"])
# 模拟错误情况
error_command = "AT+INVALID"
logger.warning("发送无效命令", command=error_command, port=port_config["port"])
error_response = "ERROR"
logger.warning("接收错误响应",
command=error_command,
response=error_response,
port=port_config["port"])
except Exception as e:
logger.error("串口操作失败", port=port_config["port"], error=str(e))
except ImportError:
logger.warning("串口模块不可用,跳过串口测试")
except Exception as e:
logger.error("串口输入输出测试失败", error=str(e))
logger.info("串口输入输出测试完成")
async def test_websocket_input_output():
"""测试WebSocket输入输出日志"""
logger.info("开始测试WebSocket输入输出日志")
try:
from app.core.websocket.client import WebSocketClient
# 模拟WebSocket客户端
client_name = "test_websocket_client"
logger.info("创建WebSocket客户端", client_name=client_name)
# 模拟WebSocket消息
message = {
"type": "command",
"data": "test_message",
"timestamp": "2025-08-12T09:45:00Z"
}
logger.info("发送WebSocket消息",
client_name=client_name,
message_type=message["type"],
message_data=message["data"])
# 模拟接收响应
response = {
"type": "response",
"data": "message_received",
"timestamp": "2025-08-12T09:45:01Z"
}
logger.info("接收WebSocket响应",
client_name=client_name,
response_type=response["type"],
response_data=response["data"])
# 模拟连接状态变化
logger.info("WebSocket连接状态", client_name=client_name, status="connected")
# 模拟心跳消息
heartbeat_msg = {"type": "heartbeat", "timestamp": "2025-08-12T09:45:02Z"}
logger.debug("发送心跳消息", client_name=client_name, heartbeat_data=heartbeat_msg)
except ImportError:
logger.warning("WebSocket模块不可用,跳过WebSocket测试")
except Exception as e:
logger.error("WebSocket输入输出测试失败", error=str(e))
logger.info("WebSocket输入输出测试完成")
async def main():
"""主测试函数"""
logger.info("开始输入输出日志记录测试")
# 设置请求上下文
from app.utils.structured_log import set_request_context
set_request_context("test_request_001", "test_user", "test_session")
try:
# 测试各种输入输出操作
await test_adb_input_output()
await test_ssh_input_output()
await test_serial_input_output()
await test_websocket_input_output()
logger.info("所有输入输出日志记录测试完成")
except Exception as e:
logger.error("输入输出日志记录测试失败", error=str(e))
finally:
# 清除上下文
from app.utils.structured_log import clear_log_context
clear_log_context()
if __name__ == "__main__":
# 运行测试
asyncio.run(main())