#!/usr/bin/env python3 """ WebSocket第二次连接事件推送修复功能测试 验证修复后的WebSocket连接能够主动检查设备状态并推送数据 """ import asyncio import sys import os import time from unittest.mock import AsyncMock, MagicMock, patch # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from app.core.websocket.manager import WebSocketManager from app.core.device.manager import DeviceManager from app.core.device.event_manager import EventManager from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.DEBUG) class TestWebSocketSecondConnectionFix: """WebSocket第二次连接事件推送修复功能测试类""" def __init__(self): self.websocket_manager = None self.device_manager = None self.test_results = [] async def setup(self): """测试环境初始化""" logger.info("初始化测试环境...") # 创建WebSocket管理器实例 self.websocket_manager = WebSocketManager() # 创建设备管理器实例 self.device_manager = DeviceManager() # 设置WebSocket推送回调 self.device_manager.set_websocket_push_callback( self.websocket_manager._websocket_push_adapter() ) logger.info("测试环境初始化完成") async def cleanup(self): """测试环境清理""" logger.info("清理测试环境...") if self.websocket_manager: await self.websocket_manager.cleanup() if self.device_manager: await self.device_manager.cleanup() logger.info("测试环境清理完成") async def test_first_connection_with_events(self): """测试第一次连接时的事件推送""" logger.info("=== 测试第一次连接时的事件推送 ===") try: # 1. 模拟一些设备事件(在连接之前) logger.info("步骤1: 模拟设备事件") test_events = [ ("device_001", "device", {"model": "Test Device 1", "manufacturer": "Test Corp"}), ("device_002", "device", {"model": "Test Device 2", "manufacturer": "Test Corp"}), ("device_003", "offline", {"model": "Test Device 3", "manufacturer": "Test Corp"}), ] for device_id, status, device_info in test_events: await self.device_manager.handle_auto_discovered_device_event( device_id, status, device_info ) await asyncio.sleep(0.1) # 2. 检查事件缓冲队列 buffered_events = await self.device_manager.get_buffered_events("test_client") logger.info(f"缓冲事件数量: {len(buffered_events)}") # 3. 模拟WebSocket客户端连接 logger.info("步骤2: 模拟WebSocket客户端连接") client_name = "test_client_1" # Mock WebSocket客户端连接 with patch.object(self.websocket_manager._client_manager, 'connect_client', return_value=True), \ patch.object(self.websocket_manager._channel_manager, 'start_client_channels'), \ patch.object(self.websocket_manager, '_wait_for_channels_ready', return_value=True), \ patch.object(self.websocket_manager, '_create_and_start_send_controller'), \ patch.object(self.websocket_manager, '_create_and_start_receive_processor'): # 设置设备管理器Mock with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices: mock_get_devices.return_value = [ { "device_id": "device_001", "status": "device", "source": "auto_discovered", "device_info": {"model": "Test Device 1"} }, { "device_id": "device_002", "status": "device", "source": "auto_discovered", "device_info": {"model": "Test Device 2"} } ] # 执行连接 success = await self.websocket_manager.connect_client(client_name) # 等待异步任务完成 await asyncio.sleep(1) # 验证连接成功 assert success, "第一次连接应该成功" logger.info("第一次连接成功") # 验证设备状态检查被调用 mock_get_devices.assert_called() logger.info("设备状态检查被正确调用") self.test_results.append(("第一次连接事件推送", "PASS")) logger.info("✅ 第一次连接事件推送测试通过") except Exception as e: logger.error(f"第一次连接事件推送测试失败: {e}") self.test_results.append(("第一次连接事件推送", f"FAIL: {e}")) raise async def test_second_connection_without_new_events(self): """测试第二次连接时的事件推送(无新事件)""" logger.info("=== 测试第二次连接时的事件推送(无新事件) ===") try: # 1. 模拟第一次连接后缓冲队列被清空的情况 logger.info("步骤1: 模拟缓冲队列被清空") # 清空事件缓冲队列(模拟第一次推送后的状态) self.device_manager._event_manager._event_buffer.clear() logger.info("事件缓冲队列已清空") # 2. 模拟第二次WebSocket客户端连接 logger.info("步骤2: 模拟第二次WebSocket客户端连接") client_name = "test_client_2" # Mock WebSocket客户端连接 with patch.object(self.websocket_manager._client_manager, 'connect_client', return_value=True), \ patch.object(self.websocket_manager._channel_manager, 'start_client_channels'), \ patch.object(self.websocket_manager, '_wait_for_channels_ready', return_value=True), \ patch.object(self.websocket_manager, '_create_and_start_send_controller'), \ patch.object(self.websocket_manager, '_create_and_start_receive_processor'): # 设置设备管理器Mock - 返回当前在线的设备 with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices, \ patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event: mock_get_devices.return_value = [ { "device_id": "device_001", "status": "device", "source": "auto_discovered", "device_info": {"model": "Test Device 1"} }, { "device_id": "device_002", "status": "device", "source": "auto_discovered", "device_info": {"model": "Test Device 2"} } ] # 执行连接 success = await self.websocket_manager.connect_client(client_name) # 等待异步任务完成 await asyncio.sleep(1) # 验证连接成功 assert success, "第二次连接应该成功" logger.info("第二次连接成功") # 验证设备状态检查被调用 mock_get_devices.assert_called() logger.info("设备状态检查被正确调用") # 验证为在线设备生成了连接事件 assert mock_handle_event.call_count >= 2, "应该为在线设备生成连接事件" logger.info(f"为 {mock_handle_event.call_count} 个设备生成了连接事件") self.test_results.append(("第二次连接事件推送", "PASS")) logger.info("✅ 第二次连接事件推送测试通过") except Exception as e: logger.error(f"第二次连接事件推送测试失败: {e}") self.test_results.append(("第二次连接事件推送", f"FAIL: {e}")) raise async def test_device_status_check_functionality(self): """测试设备状态检查功能""" logger.info("=== 测试设备状态检查功能 ===") try: # 1. 测试设备状态检查方法 logger.info("步骤1: 测试设备状态检查方法") # Mock设备管理器 with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices, \ patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event: # 设置Mock返回值 mock_get_devices.return_value = [ { "device_id": "device_001", "status": "device", "source": "auto_discovered", "device_info": {"model": "Test Device 1"} }, { "device_id": "device_002", "status": "offline", "source": "auto_discovered", "device_info": {"model": "Test Device 2"} }, { "device_id": "device_003", "status": "device", "source": "registered", # 已注册设备,应该被跳过 "device_info": {"model": "Test Device 3"} } ] # 调用设备状态检查方法 await self.websocket_manager._async_check_current_device_status( "test_client", self.device_manager ) # 验证只处理了自动发现的在线设备 mock_handle_event.assert_called_once_with( device_id="device_001", status="device", device_info={"model": "Test Device 1"} ) logger.info("设备状态检查功能正确,只处理了自动发现的在线设备") self.test_results.append(("设备状态检查功能", "PASS")) logger.info("✅ 设备状态检查功能测试通过") except Exception as e: logger.error(f"设备状态检查功能测试失败: {e}") self.test_results.append(("设备状态检查功能", f"FAIL: {e}")) raise async def test_error_handling(self): """测试错误处理""" logger.info("=== 测试错误处理 ===") try: # 1. 测试设备管理器异常 logger.info("步骤1: 测试设备管理器异常") with patch.object(self.device_manager, 'get_all_devices_unified', side_effect=Exception("设备管理器异常")): # 调用设备状态检查方法,应该能处理异常 await self.websocket_manager._async_check_current_device_status( "test_client", self.device_manager ) logger.info("设备管理器异常被正确处理") # 2. 测试空设备列表 logger.info("步骤2: 测试空设备列表") with patch.object(self.device_manager, 'get_all_devices_unified', return_value=[]): await self.websocket_manager._async_check_current_device_status( "test_client", self.device_manager ) logger.info("空设备列表被正确处理") self.test_results.append(("错误处理", "PASS")) logger.info("✅ 错误处理测试通过") except Exception as e: logger.error(f"错误处理测试失败: {e}") self.test_results.append(("错误处理", f"FAIL: {e}")) raise async def test_performance(self): """测试性能影响""" logger.info("=== 测试性能影响 ===") try: # 1. 测试大量设备的情况 logger.info("步骤1: 测试大量设备的情况") # 生成大量设备数据 large_device_list = [] for i in range(100): large_device_list.append({ "device_id": f"device_{i:03d}", "status": "device" if i % 2 == 0 else "offline", "source": "auto_discovered", "device_info": {"model": f"Test Device {i}"} }) with patch.object(self.device_manager, 'get_all_devices_unified', return_value=large_device_list), \ patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event: start_time = time.time() await self.websocket_manager._async_check_current_device_status( "test_client", self.device_manager ) end_time = time.time() duration = end_time - start_time logger.info(f"处理100个设备耗时: {duration:.3f}秒") # 验证只处理了在线设备 expected_calls = len([d for d in large_device_list if d["status"] == "device"]) assert mock_handle_event.call_count == expected_calls, f"应该处理 {expected_calls} 个在线设备" logger.info(f"正确处理了 {mock_handle_event.call_count} 个在线设备") self.test_results.append(("性能测试", "PASS")) logger.info("✅ 性能测试通过") except Exception as e: logger.error(f"性能测试失败: {e}") self.test_results.append(("性能测试", f"FAIL: {e}")) raise def print_test_results(self): """打印测试结果""" logger.info("=== 测试结果汇总 ===") total_tests = len(self.test_results) passed_tests = len([r for r in self.test_results if r[1] == "PASS"]) failed_tests = total_tests - passed_tests logger.info(f"总测试数: {total_tests}") logger.info(f"通过: {passed_tests}") logger.info(f"失败: {failed_tests}") logger.info(f"成功率: {(passed_tests/total_tests*100):.1f}%") logger.info("\n详细结果:") for test_name, result in self.test_results: status_icon = "✅" if result == "PASS" else "❌" logger.info(f"{status_icon} {test_name}: {result}") return passed_tests == total_tests async def main(): """主测试函数""" logger.info("开始WebSocket第二次连接事件推送修复功能测试") test_suite = TestWebSocketSecondConnectionFix() try: # 初始化测试环境 await test_suite.setup() # 运行测试用例 await test_suite.test_first_connection_with_events() await test_suite.test_second_connection_without_new_events() await test_suite.test_device_status_check_functionality() await test_suite.test_error_handling() await test_suite.test_performance() # 打印测试结果 success = test_suite.print_test_results() if success: logger.info("🎉 所有测试通过!WebSocket第二次连接事件推送修复功能正常") else: logger.error("❌ 部分测试失败,请检查修复实现") except Exception as e: logger.error(f"测试执行异常: {e}") import traceback logger.error(f"异常堆栈: {traceback.format_exc()}") finally: # 清理测试环境 await test_suite.cleanup() if __name__ == "__main__": asyncio.run(main())