You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
386 lines
17 KiB
386 lines
17 KiB
|
3 months ago
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
WebSocket第二次连接事件推送修复功能测试
|
||
|
|
验证修复后的WebSocket连接能够主动检查设备状态并推送数据
|
||
|
|
"""
|
||
|
|
import asyncio
|
||
|
|
import sys
|
||
|
|
import os
|
||
|
|
import time
|
||
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||
|
|
|
||
|
|
# 添加项目根目录到Python路径
|
||
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
|
|
||
|
|
from app.core.websocket.manager import WebSocketManager
|
||
|
|
from app.core.device.manager import DeviceManager
|
||
|
|
from app.core.device.event_manager import EventManager
|
||
|
|
from app.utils.structured_log import get_structured_logger, LogLevel
|
||
|
|
|
||
|
|
logger = get_structured_logger(__name__, LogLevel.DEBUG)
|
||
|
|
|
||
|
|
class TestWebSocketSecondConnectionFix:
|
||
|
|
"""WebSocket第二次连接事件推送修复功能测试类"""
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
self.websocket_manager = None
|
||
|
|
self.device_manager = None
|
||
|
|
self.test_results = []
|
||
|
|
|
||
|
|
async def setup(self):
|
||
|
|
"""测试环境初始化"""
|
||
|
|
logger.info("初始化测试环境...")
|
||
|
|
|
||
|
|
# 创建WebSocket管理器实例
|
||
|
|
self.websocket_manager = WebSocketManager()
|
||
|
|
|
||
|
|
# 创建设备管理器实例
|
||
|
|
self.device_manager = DeviceManager()
|
||
|
|
|
||
|
|
# 设置WebSocket推送回调
|
||
|
|
self.device_manager.set_websocket_push_callback(
|
||
|
|
self.websocket_manager._websocket_push_adapter()
|
||
|
|
)
|
||
|
|
|
||
|
|
logger.info("测试环境初始化完成")
|
||
|
|
|
||
|
|
async def cleanup(self):
|
||
|
|
"""测试环境清理"""
|
||
|
|
logger.info("清理测试环境...")
|
||
|
|
|
||
|
|
if self.websocket_manager:
|
||
|
|
await self.websocket_manager.cleanup()
|
||
|
|
|
||
|
|
if self.device_manager:
|
||
|
|
await self.device_manager.cleanup()
|
||
|
|
|
||
|
|
logger.info("测试环境清理完成")
|
||
|
|
|
||
|
|
async def test_first_connection_with_events(self):
|
||
|
|
"""测试第一次连接时的事件推送"""
|
||
|
|
logger.info("=== 测试第一次连接时的事件推送 ===")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 1. 模拟一些设备事件(在连接之前)
|
||
|
|
logger.info("步骤1: 模拟设备事件")
|
||
|
|
test_events = [
|
||
|
|
("device_001", "device", {"model": "Test Device 1", "manufacturer": "Test Corp"}),
|
||
|
|
("device_002", "device", {"model": "Test Device 2", "manufacturer": "Test Corp"}),
|
||
|
|
("device_003", "offline", {"model": "Test Device 3", "manufacturer": "Test Corp"}),
|
||
|
|
]
|
||
|
|
|
||
|
|
for device_id, status, device_info in test_events:
|
||
|
|
await self.device_manager.handle_auto_discovered_device_event(
|
||
|
|
device_id, status, device_info
|
||
|
|
)
|
||
|
|
await asyncio.sleep(0.1)
|
||
|
|
|
||
|
|
# 2. 检查事件缓冲队列
|
||
|
|
buffered_events = await self.device_manager.get_buffered_events("test_client")
|
||
|
|
logger.info(f"缓冲事件数量: {len(buffered_events)}")
|
||
|
|
|
||
|
|
# 3. 模拟WebSocket客户端连接
|
||
|
|
logger.info("步骤2: 模拟WebSocket客户端连接")
|
||
|
|
client_name = "test_client_1"
|
||
|
|
|
||
|
|
# Mock WebSocket客户端连接
|
||
|
|
with patch.object(self.websocket_manager._client_manager, 'connect_client', return_value=True), \
|
||
|
|
patch.object(self.websocket_manager._channel_manager, 'start_client_channels'), \
|
||
|
|
patch.object(self.websocket_manager, '_wait_for_channels_ready', return_value=True), \
|
||
|
|
patch.object(self.websocket_manager, '_create_and_start_send_controller'), \
|
||
|
|
patch.object(self.websocket_manager, '_create_and_start_receive_processor'):
|
||
|
|
|
||
|
|
# 设置设备管理器Mock
|
||
|
|
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices:
|
||
|
|
mock_get_devices.return_value = [
|
||
|
|
{
|
||
|
|
"device_id": "device_001",
|
||
|
|
"status": "device",
|
||
|
|
"source": "auto_discovered",
|
||
|
|
"device_info": {"model": "Test Device 1"}
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"device_id": "device_002",
|
||
|
|
"status": "device",
|
||
|
|
"source": "auto_discovered",
|
||
|
|
"device_info": {"model": "Test Device 2"}
|
||
|
|
}
|
||
|
|
]
|
||
|
|
|
||
|
|
# 执行连接
|
||
|
|
success = await self.websocket_manager.connect_client(client_name)
|
||
|
|
|
||
|
|
# 等待异步任务完成
|
||
|
|
await asyncio.sleep(1)
|
||
|
|
|
||
|
|
# 验证连接成功
|
||
|
|
assert success, "第一次连接应该成功"
|
||
|
|
logger.info("第一次连接成功")
|
||
|
|
|
||
|
|
# 验证设备状态检查被调用
|
||
|
|
mock_get_devices.assert_called()
|
||
|
|
logger.info("设备状态检查被正确调用")
|
||
|
|
|
||
|
|
self.test_results.append(("第一次连接事件推送", "PASS"))
|
||
|
|
logger.info("✅ 第一次连接事件推送测试通过")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"第一次连接事件推送测试失败: {e}")
|
||
|
|
self.test_results.append(("第一次连接事件推送", f"FAIL: {e}"))
|
||
|
|
raise
|
||
|
|
|
||
|
|
async def test_second_connection_without_new_events(self):
|
||
|
|
"""测试第二次连接时的事件推送(无新事件)"""
|
||
|
|
logger.info("=== 测试第二次连接时的事件推送(无新事件) ===")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 1. 模拟第一次连接后缓冲队列被清空的情况
|
||
|
|
logger.info("步骤1: 模拟缓冲队列被清空")
|
||
|
|
|
||
|
|
# 清空事件缓冲队列(模拟第一次推送后的状态)
|
||
|
|
self.device_manager._event_manager._event_buffer.clear()
|
||
|
|
logger.info("事件缓冲队列已清空")
|
||
|
|
|
||
|
|
# 2. 模拟第二次WebSocket客户端连接
|
||
|
|
logger.info("步骤2: 模拟第二次WebSocket客户端连接")
|
||
|
|
client_name = "test_client_2"
|
||
|
|
|
||
|
|
# Mock WebSocket客户端连接
|
||
|
|
with patch.object(self.websocket_manager._client_manager, 'connect_client', return_value=True), \
|
||
|
|
patch.object(self.websocket_manager._channel_manager, 'start_client_channels'), \
|
||
|
|
patch.object(self.websocket_manager, '_wait_for_channels_ready', return_value=True), \
|
||
|
|
patch.object(self.websocket_manager, '_create_and_start_send_controller'), \
|
||
|
|
patch.object(self.websocket_manager, '_create_and_start_receive_processor'):
|
||
|
|
|
||
|
|
# 设置设备管理器Mock - 返回当前在线的设备
|
||
|
|
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices, \
|
||
|
|
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
|
||
|
|
|
||
|
|
mock_get_devices.return_value = [
|
||
|
|
{
|
||
|
|
"device_id": "device_001",
|
||
|
|
"status": "device",
|
||
|
|
"source": "auto_discovered",
|
||
|
|
"device_info": {"model": "Test Device 1"}
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"device_id": "device_002",
|
||
|
|
"status": "device",
|
||
|
|
"source": "auto_discovered",
|
||
|
|
"device_info": {"model": "Test Device 2"}
|
||
|
|
}
|
||
|
|
]
|
||
|
|
|
||
|
|
# 执行连接
|
||
|
|
success = await self.websocket_manager.connect_client(client_name)
|
||
|
|
|
||
|
|
# 等待异步任务完成
|
||
|
|
await asyncio.sleep(1)
|
||
|
|
|
||
|
|
# 验证连接成功
|
||
|
|
assert success, "第二次连接应该成功"
|
||
|
|
logger.info("第二次连接成功")
|
||
|
|
|
||
|
|
# 验证设备状态检查被调用
|
||
|
|
mock_get_devices.assert_called()
|
||
|
|
logger.info("设备状态检查被正确调用")
|
||
|
|
|
||
|
|
# 验证为在线设备生成了连接事件
|
||
|
|
assert mock_handle_event.call_count >= 2, "应该为在线设备生成连接事件"
|
||
|
|
logger.info(f"为 {mock_handle_event.call_count} 个设备生成了连接事件")
|
||
|
|
|
||
|
|
self.test_results.append(("第二次连接事件推送", "PASS"))
|
||
|
|
logger.info("✅ 第二次连接事件推送测试通过")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"第二次连接事件推送测试失败: {e}")
|
||
|
|
self.test_results.append(("第二次连接事件推送", f"FAIL: {e}"))
|
||
|
|
raise
|
||
|
|
|
||
|
|
async def test_device_status_check_functionality(self):
|
||
|
|
"""测试设备状态检查功能"""
|
||
|
|
logger.info("=== 测试设备状态检查功能 ===")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 1. 测试设备状态检查方法
|
||
|
|
logger.info("步骤1: 测试设备状态检查方法")
|
||
|
|
|
||
|
|
# Mock设备管理器
|
||
|
|
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices, \
|
||
|
|
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
|
||
|
|
|
||
|
|
# 设置Mock返回值
|
||
|
|
mock_get_devices.return_value = [
|
||
|
|
{
|
||
|
|
"device_id": "device_001",
|
||
|
|
"status": "device",
|
||
|
|
"source": "auto_discovered",
|
||
|
|
"device_info": {"model": "Test Device 1"}
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"device_id": "device_002",
|
||
|
|
"status": "offline",
|
||
|
|
"source": "auto_discovered",
|
||
|
|
"device_info": {"model": "Test Device 2"}
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"device_id": "device_003",
|
||
|
|
"status": "device",
|
||
|
|
"source": "registered", # 已注册设备,应该被跳过
|
||
|
|
"device_info": {"model": "Test Device 3"}
|
||
|
|
}
|
||
|
|
]
|
||
|
|
|
||
|
|
# 调用设备状态检查方法
|
||
|
|
await self.websocket_manager._async_check_current_device_status(
|
||
|
|
"test_client", self.device_manager
|
||
|
|
)
|
||
|
|
|
||
|
|
# 验证只处理了自动发现的在线设备
|
||
|
|
mock_handle_event.assert_called_once_with(
|
||
|
|
device_id="device_001",
|
||
|
|
status="device",
|
||
|
|
device_info={"model": "Test Device 1"}
|
||
|
|
)
|
||
|
|
logger.info("设备状态检查功能正确,只处理了自动发现的在线设备")
|
||
|
|
|
||
|
|
self.test_results.append(("设备状态检查功能", "PASS"))
|
||
|
|
logger.info("✅ 设备状态检查功能测试通过")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"设备状态检查功能测试失败: {e}")
|
||
|
|
self.test_results.append(("设备状态检查功能", f"FAIL: {e}"))
|
||
|
|
raise
|
||
|
|
|
||
|
|
async def test_error_handling(self):
|
||
|
|
"""测试错误处理"""
|
||
|
|
logger.info("=== 测试错误处理 ===")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 1. 测试设备管理器异常
|
||
|
|
logger.info("步骤1: 测试设备管理器异常")
|
||
|
|
|
||
|
|
with patch.object(self.device_manager, 'get_all_devices_unified', side_effect=Exception("设备管理器异常")):
|
||
|
|
# 调用设备状态检查方法,应该能处理异常
|
||
|
|
await self.websocket_manager._async_check_current_device_status(
|
||
|
|
"test_client", self.device_manager
|
||
|
|
)
|
||
|
|
logger.info("设备管理器异常被正确处理")
|
||
|
|
|
||
|
|
# 2. 测试空设备列表
|
||
|
|
logger.info("步骤2: 测试空设备列表")
|
||
|
|
|
||
|
|
with patch.object(self.device_manager, 'get_all_devices_unified', return_value=[]):
|
||
|
|
await self.websocket_manager._async_check_current_device_status(
|
||
|
|
"test_client", self.device_manager
|
||
|
|
)
|
||
|
|
logger.info("空设备列表被正确处理")
|
||
|
|
|
||
|
|
self.test_results.append(("错误处理", "PASS"))
|
||
|
|
logger.info("✅ 错误处理测试通过")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"错误处理测试失败: {e}")
|
||
|
|
self.test_results.append(("错误处理", f"FAIL: {e}"))
|
||
|
|
raise
|
||
|
|
|
||
|
|
async def test_performance(self):
|
||
|
|
"""测试性能影响"""
|
||
|
|
logger.info("=== 测试性能影响 ===")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 1. 测试大量设备的情况
|
||
|
|
logger.info("步骤1: 测试大量设备的情况")
|
||
|
|
|
||
|
|
# 生成大量设备数据
|
||
|
|
large_device_list = []
|
||
|
|
for i in range(100):
|
||
|
|
large_device_list.append({
|
||
|
|
"device_id": f"device_{i:03d}",
|
||
|
|
"status": "device" if i % 2 == 0 else "offline",
|
||
|
|
"source": "auto_discovered",
|
||
|
|
"device_info": {"model": f"Test Device {i}"}
|
||
|
|
})
|
||
|
|
|
||
|
|
with patch.object(self.device_manager, 'get_all_devices_unified', return_value=large_device_list), \
|
||
|
|
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
|
||
|
|
|
||
|
|
start_time = time.time()
|
||
|
|
await self.websocket_manager._async_check_current_device_status(
|
||
|
|
"test_client", self.device_manager
|
||
|
|
)
|
||
|
|
end_time = time.time()
|
||
|
|
|
||
|
|
duration = end_time - start_time
|
||
|
|
logger.info(f"处理100个设备耗时: {duration:.3f}秒")
|
||
|
|
|
||
|
|
# 验证只处理了在线设备
|
||
|
|
expected_calls = len([d for d in large_device_list if d["status"] == "device"])
|
||
|
|
assert mock_handle_event.call_count == expected_calls, f"应该处理 {expected_calls} 个在线设备"
|
||
|
|
logger.info(f"正确处理了 {mock_handle_event.call_count} 个在线设备")
|
||
|
|
|
||
|
|
self.test_results.append(("性能测试", "PASS"))
|
||
|
|
logger.info("✅ 性能测试通过")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"性能测试失败: {e}")
|
||
|
|
self.test_results.append(("性能测试", f"FAIL: {e}"))
|
||
|
|
raise
|
||
|
|
|
||
|
|
def print_test_results(self):
|
||
|
|
"""打印测试结果"""
|
||
|
|
logger.info("=== 测试结果汇总 ===")
|
||
|
|
|
||
|
|
total_tests = len(self.test_results)
|
||
|
|
passed_tests = len([r for r in self.test_results if r[1] == "PASS"])
|
||
|
|
failed_tests = total_tests - passed_tests
|
||
|
|
|
||
|
|
logger.info(f"总测试数: {total_tests}")
|
||
|
|
logger.info(f"通过: {passed_tests}")
|
||
|
|
logger.info(f"失败: {failed_tests}")
|
||
|
|
logger.info(f"成功率: {(passed_tests/total_tests*100):.1f}%")
|
||
|
|
|
||
|
|
logger.info("\n详细结果:")
|
||
|
|
for test_name, result in self.test_results:
|
||
|
|
status_icon = "✅" if result == "PASS" else "❌"
|
||
|
|
logger.info(f"{status_icon} {test_name}: {result}")
|
||
|
|
|
||
|
|
return passed_tests == total_tests
|
||
|
|
|
||
|
|
async def main():
|
||
|
|
"""主测试函数"""
|
||
|
|
logger.info("开始WebSocket第二次连接事件推送修复功能测试")
|
||
|
|
|
||
|
|
test_suite = TestWebSocketSecondConnectionFix()
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 初始化测试环境
|
||
|
|
await test_suite.setup()
|
||
|
|
|
||
|
|
# 运行测试用例
|
||
|
|
await test_suite.test_first_connection_with_events()
|
||
|
|
await test_suite.test_second_connection_without_new_events()
|
||
|
|
await test_suite.test_device_status_check_functionality()
|
||
|
|
await test_suite.test_error_handling()
|
||
|
|
await test_suite.test_performance()
|
||
|
|
|
||
|
|
# 打印测试结果
|
||
|
|
success = test_suite.print_test_results()
|
||
|
|
|
||
|
|
if success:
|
||
|
|
logger.info("🎉 所有测试通过!WebSocket第二次连接事件推送修复功能正常")
|
||
|
|
else:
|
||
|
|
logger.error("❌ 部分测试失败,请检查修复实现")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"测试执行异常: {e}")
|
||
|
|
import traceback
|
||
|
|
logger.error(f"异常堆栈: {traceback.format_exc()}")
|
||
|
|
|
||
|
|
finally:
|
||
|
|
# 清理测试环境
|
||
|
|
await test_suite.cleanup()
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
asyncio.run(main())
|