You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

386 lines
17 KiB

#!/usr/bin/env python3
"""
WebSocket第二次连接事件推送修复功能测试
验证修复后的WebSocket连接能够主动检查设备状态并推送数据
"""
import asyncio
import sys
import os
import time
from unittest.mock import AsyncMock, MagicMock, patch
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.core.websocket.manager import WebSocketManager
from app.core.device.manager import DeviceManager
from app.core.device.event_manager import EventManager
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.DEBUG)
class TestWebSocketSecondConnectionFix:
"""WebSocket第二次连接事件推送修复功能测试类"""
def __init__(self):
self.websocket_manager = None
self.device_manager = None
self.test_results = []
async def setup(self):
"""测试环境初始化"""
logger.info("初始化测试环境...")
# 创建WebSocket管理器实例
self.websocket_manager = WebSocketManager()
# 创建设备管理器实例
self.device_manager = DeviceManager()
# 设置WebSocket推送回调
self.device_manager.set_websocket_push_callback(
self.websocket_manager._websocket_push_adapter()
)
logger.info("测试环境初始化完成")
async def cleanup(self):
"""测试环境清理"""
logger.info("清理测试环境...")
if self.websocket_manager:
await self.websocket_manager.cleanup()
if self.device_manager:
await self.device_manager.cleanup()
logger.info("测试环境清理完成")
async def test_first_connection_with_events(self):
"""测试第一次连接时的事件推送"""
logger.info("=== 测试第一次连接时的事件推送 ===")
try:
# 1. 模拟一些设备事件(在连接之前)
logger.info("步骤1: 模拟设备事件")
test_events = [
("device_001", "device", {"model": "Test Device 1", "manufacturer": "Test Corp"}),
("device_002", "device", {"model": "Test Device 2", "manufacturer": "Test Corp"}),
("device_003", "offline", {"model": "Test Device 3", "manufacturer": "Test Corp"}),
]
for device_id, status, device_info in test_events:
await self.device_manager.handle_auto_discovered_device_event(
device_id, status, device_info
)
await asyncio.sleep(0.1)
# 2. 检查事件缓冲队列
buffered_events = await self.device_manager.get_buffered_events("test_client")
logger.info(f"缓冲事件数量: {len(buffered_events)}")
# 3. 模拟WebSocket客户端连接
logger.info("步骤2: 模拟WebSocket客户端连接")
client_name = "test_client_1"
# Mock WebSocket客户端连接
with patch.object(self.websocket_manager._client_manager, 'connect_client', return_value=True), \
patch.object(self.websocket_manager._channel_manager, 'start_client_channels'), \
patch.object(self.websocket_manager, '_wait_for_channels_ready', return_value=True), \
patch.object(self.websocket_manager, '_create_and_start_send_controller'), \
patch.object(self.websocket_manager, '_create_and_start_receive_processor'):
# 设置设备管理器Mock
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices:
mock_get_devices.return_value = [
{
"device_id": "device_001",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 1"}
},
{
"device_id": "device_002",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 2"}
}
]
# 执行连接
success = await self.websocket_manager.connect_client(client_name)
# 等待异步任务完成
await asyncio.sleep(1)
# 验证连接成功
assert success, "第一次连接应该成功"
logger.info("第一次连接成功")
# 验证设备状态检查被调用
mock_get_devices.assert_called()
logger.info("设备状态检查被正确调用")
self.test_results.append(("第一次连接事件推送", "PASS"))
logger.info("✅ 第一次连接事件推送测试通过")
except Exception as e:
logger.error(f"第一次连接事件推送测试失败: {e}")
self.test_results.append(("第一次连接事件推送", f"FAIL: {e}"))
raise
async def test_second_connection_without_new_events(self):
"""测试第二次连接时的事件推送(无新事件)"""
logger.info("=== 测试第二次连接时的事件推送(无新事件) ===")
try:
# 1. 模拟第一次连接后缓冲队列被清空的情况
logger.info("步骤1: 模拟缓冲队列被清空")
# 清空事件缓冲队列(模拟第一次推送后的状态)
self.device_manager._event_manager._event_buffer.clear()
logger.info("事件缓冲队列已清空")
# 2. 模拟第二次WebSocket客户端连接
logger.info("步骤2: 模拟第二次WebSocket客户端连接")
client_name = "test_client_2"
# Mock WebSocket客户端连接
with patch.object(self.websocket_manager._client_manager, 'connect_client', return_value=True), \
patch.object(self.websocket_manager._channel_manager, 'start_client_channels'), \
patch.object(self.websocket_manager, '_wait_for_channels_ready', return_value=True), \
patch.object(self.websocket_manager, '_create_and_start_send_controller'), \
patch.object(self.websocket_manager, '_create_and_start_receive_processor'):
# 设置设备管理器Mock - 返回当前在线的设备
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices, \
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
mock_get_devices.return_value = [
{
"device_id": "device_001",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 1"}
},
{
"device_id": "device_002",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 2"}
}
]
# 执行连接
success = await self.websocket_manager.connect_client(client_name)
# 等待异步任务完成
await asyncio.sleep(1)
# 验证连接成功
assert success, "第二次连接应该成功"
logger.info("第二次连接成功")
# 验证设备状态检查被调用
mock_get_devices.assert_called()
logger.info("设备状态检查被正确调用")
# 验证为在线设备生成了连接事件
assert mock_handle_event.call_count >= 2, "应该为在线设备生成连接事件"
logger.info(f"{mock_handle_event.call_count} 个设备生成了连接事件")
self.test_results.append(("第二次连接事件推送", "PASS"))
logger.info("✅ 第二次连接事件推送测试通过")
except Exception as e:
logger.error(f"第二次连接事件推送测试失败: {e}")
self.test_results.append(("第二次连接事件推送", f"FAIL: {e}"))
raise
async def test_device_status_check_functionality(self):
"""测试设备状态检查功能"""
logger.info("=== 测试设备状态检查功能 ===")
try:
# 1. 测试设备状态检查方法
logger.info("步骤1: 测试设备状态检查方法")
# Mock设备管理器
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices, \
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
# 设置Mock返回值
mock_get_devices.return_value = [
{
"device_id": "device_001",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 1"}
},
{
"device_id": "device_002",
"status": "offline",
"source": "auto_discovered",
"device_info": {"model": "Test Device 2"}
},
{
"device_id": "device_003",
"status": "device",
"source": "registered", # 已注册设备,应该被跳过
"device_info": {"model": "Test Device 3"}
}
]
# 调用设备状态检查方法
await self.websocket_manager._async_check_current_device_status(
"test_client", self.device_manager
)
# 验证只处理了自动发现的在线设备
mock_handle_event.assert_called_once_with(
device_id="device_001",
status="device",
device_info={"model": "Test Device 1"}
)
logger.info("设备状态检查功能正确,只处理了自动发现的在线设备")
self.test_results.append(("设备状态检查功能", "PASS"))
logger.info("✅ 设备状态检查功能测试通过")
except Exception as e:
logger.error(f"设备状态检查功能测试失败: {e}")
self.test_results.append(("设备状态检查功能", f"FAIL: {e}"))
raise
async def test_error_handling(self):
"""测试错误处理"""
logger.info("=== 测试错误处理 ===")
try:
# 1. 测试设备管理器异常
logger.info("步骤1: 测试设备管理器异常")
with patch.object(self.device_manager, 'get_all_devices_unified', side_effect=Exception("设备管理器异常")):
# 调用设备状态检查方法,应该能处理异常
await self.websocket_manager._async_check_current_device_status(
"test_client", self.device_manager
)
logger.info("设备管理器异常被正确处理")
# 2. 测试空设备列表
logger.info("步骤2: 测试空设备列表")
with patch.object(self.device_manager, 'get_all_devices_unified', return_value=[]):
await self.websocket_manager._async_check_current_device_status(
"test_client", self.device_manager
)
logger.info("空设备列表被正确处理")
self.test_results.append(("错误处理", "PASS"))
logger.info("✅ 错误处理测试通过")
except Exception as e:
logger.error(f"错误处理测试失败: {e}")
self.test_results.append(("错误处理", f"FAIL: {e}"))
raise
async def test_performance(self):
"""测试性能影响"""
logger.info("=== 测试性能影响 ===")
try:
# 1. 测试大量设备的情况
logger.info("步骤1: 测试大量设备的情况")
# 生成大量设备数据
large_device_list = []
for i in range(100):
large_device_list.append({
"device_id": f"device_{i:03d}",
"status": "device" if i % 2 == 0 else "offline",
"source": "auto_discovered",
"device_info": {"model": f"Test Device {i}"}
})
with patch.object(self.device_manager, 'get_all_devices_unified', return_value=large_device_list), \
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
start_time = time.time()
await self.websocket_manager._async_check_current_device_status(
"test_client", self.device_manager
)
end_time = time.time()
duration = end_time - start_time
logger.info(f"处理100个设备耗时: {duration:.3f}")
# 验证只处理了在线设备
expected_calls = len([d for d in large_device_list if d["status"] == "device"])
assert mock_handle_event.call_count == expected_calls, f"应该处理 {expected_calls} 个在线设备"
logger.info(f"正确处理了 {mock_handle_event.call_count} 个在线设备")
self.test_results.append(("性能测试", "PASS"))
logger.info("✅ 性能测试通过")
except Exception as e:
logger.error(f"性能测试失败: {e}")
self.test_results.append(("性能测试", f"FAIL: {e}"))
raise
def print_test_results(self):
"""打印测试结果"""
logger.info("=== 测试结果汇总 ===")
total_tests = len(self.test_results)
passed_tests = len([r for r in self.test_results if r[1] == "PASS"])
failed_tests = total_tests - passed_tests
logger.info(f"总测试数: {total_tests}")
logger.info(f"通过: {passed_tests}")
logger.info(f"失败: {failed_tests}")
logger.info(f"成功率: {(passed_tests/total_tests*100):.1f}%")
logger.info("\n详细结果:")
for test_name, result in self.test_results:
status_icon = "" if result == "PASS" else ""
logger.info(f"{status_icon} {test_name}: {result}")
return passed_tests == total_tests
async def main():
"""主测试函数"""
logger.info("开始WebSocket第二次连接事件推送修复功能测试")
test_suite = TestWebSocketSecondConnectionFix()
try:
# 初始化测试环境
await test_suite.setup()
# 运行测试用例
await test_suite.test_first_connection_with_events()
await test_suite.test_second_connection_without_new_events()
await test_suite.test_device_status_check_functionality()
await test_suite.test_error_handling()
await test_suite.test_performance()
# 打印测试结果
success = test_suite.print_test_results()
if success:
logger.info("🎉 所有测试通过!WebSocket第二次连接事件推送修复功能正常")
else:
logger.error("❌ 部分测试失败,请检查修复实现")
except Exception as e:
logger.error(f"测试执行异常: {e}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
finally:
# 清理测试环境
await test_suite.cleanup()
if __name__ == "__main__":
asyncio.run(main())