From 9597d4988763f20dc722a12fb296327b1a7f1208 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 18 Aug 2025 15:55:16 +0800 Subject: [PATCH] =?UTF-8?q?test=E8=84=9A=E6=9C=AC=E6=94=BE=E5=9C=A8?= =?UTF-8?q?=E6=8C=87=E5=AE=9A=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vs/VSWorkspaceState.json | 6 +- tatus | 3 - test_device_event_buffer.py | 175 ------------- .../run_websocket_test.py | 0 .../test_app_startup.py | 0 .../test_auto_logging.py | 0 .../test_compatibility.py | 0 .../test_complete_event_push.py | 0 .../test_device_disconnect_fix.py | 0 tests/test_device_event_buffer.py | 241 +++++++++++------- .../test_device_info_fix.py | 0 .../test_device_properties_fix.py | 0 .../test_heartbeat_queue_fix.py | 0 .../test_message_flow.py | 0 .../test_message_format_assembly.py | 0 .../test_message_format_simple.py | 0 .../test_pretty_json_log.py | 0 .../test_refactored_device_manager.py | 0 test_server.py => tests/test_server.py | 0 .../test_simple_event_buffer.py | 0 .../test_system_api.py | 0 .../test_system_api_endpoints.py | 0 .../test_terminal_event_type.py | 0 .../test_websocket_connection_speed.py | 0 24 files changed, 146 insertions(+), 279 deletions(-) delete mode 100644 tatus delete mode 100644 test_device_event_buffer.py rename run_websocket_test.py => tests/run_websocket_test.py (100%) rename test_app_startup.py => tests/test_app_startup.py (100%) rename test_auto_logging.py => tests/test_auto_logging.py (100%) rename test_compatibility.py => tests/test_compatibility.py (100%) rename test_complete_event_push.py => tests/test_complete_event_push.py (100%) rename test_device_disconnect_fix.py => tests/test_device_disconnect_fix.py (100%) rename test_device_info_fix.py => tests/test_device_info_fix.py (100%) rename test_device_properties_fix.py => tests/test_device_properties_fix.py (100%) rename test_heartbeat_queue_fix.py => tests/test_heartbeat_queue_fix.py (100%) rename test_message_flow.py => tests/test_message_flow.py (100%) rename test_message_format_assembly.py => tests/test_message_format_assembly.py (100%) rename test_message_format_simple.py => tests/test_message_format_simple.py (100%) rename test_pretty_json_log.py => tests/test_pretty_json_log.py (100%) rename test_refactored_device_manager.py => tests/test_refactored_device_manager.py (100%) rename test_server.py => tests/test_server.py (100%) rename test_simple_event_buffer.py => tests/test_simple_event_buffer.py (100%) rename test_system_api.py => tests/test_system_api.py (100%) rename test_system_api_endpoints.py => tests/test_system_api_endpoints.py (100%) rename test_terminal_event_type.py => tests/test_terminal_event_type.py (100%) rename test_websocket_connection_speed.py => tests/test_websocket_connection_speed.py (100%) diff --git a/.vs/VSWorkspaceState.json b/.vs/VSWorkspaceState.json index a946c95..be15022 100644 --- a/.vs/VSWorkspaceState.json +++ b/.vs/VSWorkspaceState.json @@ -2,10 +2,12 @@ "ExpandedNodes": [ "", "\\app", - "\\app\\core", + "\\app\\api", + "\\app\\api\\v1", + "\\app\\api\\v1\\endpoints", "\\app\\core\\device", "\\app\\utils" ], - "SelectedNode": "\\app\\core\\device\\manager.py", + "SelectedNode": "\\app\\api\\v1\\endpoints\\devices.py", "PreviewInSolutionExplorer": false } \ No newline at end of file diff --git a/tatus b/tatus deleted file mode 100644 index fa2fd11..0000000 --- a/tatus +++ /dev/null @@ -1,3 +0,0 @@ -edb145c (HEAD -> hotfix/redundant-cleanup) 修复WebSocket连接阻塞问题并优化系统架构 -3048a01 (origin/hotfix/redundant-cleanup) 修复 bug 问题 -78f8b05 重构websocekt 代码 diff --git a/test_device_event_buffer.py b/test_device_event_buffer.py deleted file mode 100644 index 83762a9..0000000 --- a/test_device_event_buffer.py +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/env python3 -""" -设备事件缓冲和WebSocket推送功能测试 -验证在用户客户端连接WebSocket时能够推送之前的监听数据 -""" -import asyncio -import json -import time -from datetime import datetime -from app.core.device.manager import device_manager -from app.core.websocket.manager import websocket_manager -from app.utils.structured_log import get_structured_logger, LogLevel - -logger = get_structured_logger(__name__, LogLevel.DEBUG) - -async def test_device_event_buffer(): - """测试设备事件缓冲和WebSocket推送功能""" - logger.info("开始设备事件缓冲和WebSocket推送功能测试") - - try: - # 1. 启动设备事件推送服务 - logger.info("步骤1: 启动设备事件推送服务") - await device_manager.start_event_pusher() - logger.info("设备事件推送服务启动成功") - - # 2. 模拟一些设备事件(在WebSocket客户端连接之前) - logger.info("步骤2: 模拟设备事件(在WebSocket客户端连接之前)") - test_events = [ - ("device_001", "device", {"model": "Test Device 1", "manufacturer": "Test Corp"}), - ("device_002", "device", {"model": "Test Device 2", "manufacturer": "Test Corp"}), - ("device_003", "offline", {"model": "Test Device 3", "manufacturer": "Test Corp"}), - ("device_001", "offline", {"model": "Test Device 1", "manufacturer": "Test Corp"}), - ] - - for device_id, status, device_info in test_events: - logger.info(f"模拟设备事件: {device_id} - {status}") - await device_manager.handle_auto_discovered_device_event(device_id, status, device_info) - await asyncio.sleep(0.5) # 短暂延迟,模拟真实场景 - - # 3. 检查缓冲的事件 - logger.info("步骤3: 检查缓冲的事件") - # 使用公共API获取缓冲事件,而不是直接访问内部属性 - buffered_events = await device_manager.get_buffered_events("test_check") - buffer_size = len(buffered_events) - logger.info(f"当前缓冲事件数量: {buffer_size}") - - if buffer_size > 0: - event_types = {} - for event in buffered_events: - event_type = event.get('type', 'unknown') - event_types[event_type] = event_types.get(event_type, 0) + 1 - logger.info(f"缓冲事件类型分布: {event_types}") - - # 4. 创建WebSocket客户端 - logger.info("步骤4: 创建WebSocket客户端") - client_name = f"test_client_{int(time.time())}" - test_url = "ws://localhost:8080/ws" # 使用一个测试URL - - try: - # 创建客户端(这里只是测试,不实际连接) - client = await websocket_manager.create_client(client_name, test_url, heartbeat_interval=30) - logger.info(f"WebSocket客户端创建成功: {client_name}") - - # 5. 模拟客户端连接(注册设备事件推送) - logger.info("步骤5: 模拟客户端连接,注册设备事件推送") - await device_manager.register_websocket_client(client_name) - logger.info(f"客户端 {client_name} 已注册设备事件推送") - - # 6. 获取缓冲的事件 - logger.info("步骤6: 获取缓冲的事件") - buffered_events = await device_manager.get_buffered_events(client_name) - logger.info(f"获取到 {len(buffered_events)} 个缓冲事件") - - # 7. 验证事件内容 - logger.info("步骤7: 验证事件内容") - for i, event in enumerate(buffered_events): - logger.info(f"事件 {i+1}: {event.get('type')} - {event.get('device_id')} - {event.get('status')}") - logger.info(f" 时间戳: {event.get('timestamp')}") - logger.info(f" 设备信息: {event.get('device_info')}") - - # 8. 模拟推送事件到WebSocket客户端 - logger.info("步骤8: 模拟推送事件到WebSocket客户端") - success_count = 0 - for event in buffered_events: - try: - # 这里只是模拟,实际推送需要真实的WebSocket连接 - logger.info(f"模拟推送事件: {event.get('type')} - {event.get('device_id')}") - success_count += 1 - except Exception as e: - logger.error(f"推送事件失败: {e}") - - logger.info(f"模拟推送完成: {success_count}/{len(buffered_events)} 个事件") - - # 9. 注销客户端 - logger.info("步骤9: 注销客户端") - await device_manager.unregister_websocket_client(client_name) - logger.info(f"客户端 {client_name} 已注销") - - except Exception as e: - logger.error(f"WebSocket客户端操作失败: {e}") - - # 10. 测试完成 - logger.info("步骤10: 测试完成") - logger.info("设备事件缓冲和WebSocket推送功能测试完成") - - except Exception as e: - logger.error(f"测试过程中发生异常: {e}") - import traceback - logger.error(f"异常堆栈: {traceback.format_exc()}") - - finally: - # 清理资源 - logger.info("清理测试资源") - try: - await device_manager.stop_event_pusher() - logger.info("设备事件推送服务已停止") - except Exception as e: - logger.error(f"停止设备事件推送服务失败: {e}") - -async def test_real_websocket_connection(): - """测试真实的WebSocket连接和事件推送""" - logger.info("开始真实WebSocket连接测试") - - try: - # 1. 启动设备事件推送服务 - await device_manager.start_event_pusher() - - # 2. 模拟一些设备事件 - logger.info("模拟设备事件...") - await device_manager.handle_auto_discovered_device_event("real_device_001", "device", {"model": "Real Device 1"}) - await device_manager.handle_auto_discovered_device_event("real_device_002", "device", {"model": "Real Device 2"}) - - # 3. 创建并连接WebSocket客户端(使用本地测试服务器) - client_name = f"real_test_client_{int(time.time())}" - test_url = "ws://localhost:8080/ws" - - logger.info(f"创建WebSocket客户端: {client_name}") - client = await websocket_manager.create_client(client_name, test_url) - - logger.info(f"连接WebSocket客户端: {client_name}") - success = await websocket_manager.connect_client(client_name) - - if success: - logger.info(f"WebSocket客户端连接成功: {client_name}") - - # 等待一段时间,让事件推送完成 - logger.info("等待事件推送完成...") - await asyncio.sleep(2) - - # 断开连接 - logger.info(f"断开WebSocket客户端: {client_name}") - await websocket_manager.disconnect_client(client_name) - else: - logger.error(f"WebSocket客户端连接失败: {client_name}") - - except Exception as e: - logger.error(f"真实WebSocket连接测试失败: {e}") - - finally: - await device_manager.stop_event_pusher() - -if __name__ == "__main__": - logger.info("=" * 60) - logger.info("设备事件缓冲和WebSocket推送功能测试") - logger.info("=" * 60) - - # 运行测试 - asyncio.run(test_device_event_buffer()) - - # 可选:运行真实WebSocket连接测试 - # asyncio.run(test_real_websocket_connection()) - - logger.info("=" * 60) - logger.info("测试完成") - logger.info("=" * 60) diff --git a/run_websocket_test.py b/tests/run_websocket_test.py similarity index 100% rename from run_websocket_test.py rename to tests/run_websocket_test.py diff --git a/test_app_startup.py b/tests/test_app_startup.py similarity index 100% rename from test_app_startup.py rename to tests/test_app_startup.py diff --git a/test_auto_logging.py b/tests/test_auto_logging.py similarity index 100% rename from test_auto_logging.py rename to tests/test_auto_logging.py diff --git a/test_compatibility.py b/tests/test_compatibility.py similarity index 100% rename from test_compatibility.py rename to tests/test_compatibility.py diff --git a/test_complete_event_push.py b/tests/test_complete_event_push.py similarity index 100% rename from test_complete_event_push.py rename to tests/test_complete_event_push.py diff --git a/test_device_disconnect_fix.py b/tests/test_device_disconnect_fix.py similarity index 100% rename from test_device_disconnect_fix.py rename to tests/test_device_disconnect_fix.py diff --git a/tests/test_device_event_buffer.py b/tests/test_device_event_buffer.py index 8ac84b4..83762a9 100644 --- a/tests/test_device_event_buffer.py +++ b/tests/test_device_event_buffer.py @@ -1,132 +1,175 @@ #!/usr/bin/env python3 """ -测试设备事件缓冲和推送功能 -验证:先启动监听,后开启WebSocket,之前监听到的数据也能推送 +设备事件缓冲和WebSocket推送功能测试 +验证在用户客户端连接WebSocket时能够推送之前的监听数据 """ - import asyncio +import json import time -import sys -import os - -# 添加项目根目录到Python路径 -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - +from datetime import datetime from app.core.device.manager import device_manager from app.core.websocket.manager import websocket_manager from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.DEBUG) -async def simulate_device_events(): - """模拟设备事件""" - logger.info("开始模拟设备事件") - - # 模拟设备连接事件 - await device_manager.handle_auto_discovered_device_event( - device_id="test_device_001", - status="device", - device_info={"model": "Test Device 1", "manufacturer": "Test Corp"} - ) - - await asyncio.sleep(1) - - # 模拟设备断开事件 - await device_manager.handle_auto_discovered_device_event( - device_id="test_device_001", - status="offline", - device_info={"model": "Test Device 1", "manufacturer": "Test Corp"} - ) - - await asyncio.sleep(1) - - # 模拟另一个设备连接 - await device_manager.handle_auto_discovered_device_event( - device_id="test_device_002", - status="device", - device_info={"model": "Test Device 2", "manufacturer": "Test Corp"} - ) - - logger.info("设备事件模拟完成") - -async def test_websocket_connection(): - """测试WebSocket连接和事件推送""" - logger.info("开始测试WebSocket连接") +async def test_device_event_buffer(): + """测试设备事件缓冲和WebSocket推送功能""" + logger.info("开始设备事件缓冲和WebSocket推送功能测试") try: - # 创建WebSocket客户端 - client_name = "test_client_001" - client = await websocket_manager.create_client( - name=client_name, - url="ws://localhost:8080/ws", # 模拟WebSocket服务器 - heartbeat_interval=30 - ) + # 1. 启动设备事件推送服务 + logger.info("步骤1: 启动设备事件推送服务") + await device_manager.start_event_pusher() + logger.info("设备事件推送服务启动成功") - logger.info(f"WebSocket客户端创建成功: {client_name}") + # 2. 模拟一些设备事件(在WebSocket客户端连接之前) + logger.info("步骤2: 模拟设备事件(在WebSocket客户端连接之前)") + test_events = [ + ("device_001", "device", {"model": "Test Device 1", "manufacturer": "Test Corp"}), + ("device_002", "device", {"model": "Test Device 2", "manufacturer": "Test Corp"}), + ("device_003", "offline", {"model": "Test Device 3", "manufacturer": "Test Corp"}), + ("device_001", "offline", {"model": "Test Device 1", "manufacturer": "Test Corp"}), + ] - # 连接客户端(这里会触发缓冲事件推送) - success = await websocket_manager.connect_client(client_name) - if success: - logger.info(f"WebSocket客户端连接成功: {client_name}") - else: - logger.error(f"WebSocket客户端连接失败: {client_name}") - return + for device_id, status, device_info in test_events: + logger.info(f"模拟设备事件: {device_id} - {status}") + await device_manager.handle_auto_discovered_device_event(device_id, status, device_info) + await asyncio.sleep(0.5) # 短暂延迟,模拟真实场景 + + # 3. 检查缓冲的事件 + logger.info("步骤3: 检查缓冲的事件") + # 使用公共API获取缓冲事件,而不是直接访问内部属性 + buffered_events = await device_manager.get_buffered_events("test_check") + buffer_size = len(buffered_events) + logger.info(f"当前缓冲事件数量: {buffer_size}") - # 等待一段时间,观察事件推送 - await asyncio.sleep(5) + if buffer_size > 0: + event_types = {} + for event in buffered_events: + event_type = event.get('type', 'unknown') + event_types[event_type] = event_types.get(event_type, 0) + 1 + logger.info(f"缓冲事件类型分布: {event_types}") + + # 4. 创建WebSocket客户端 + logger.info("步骤4: 创建WebSocket客户端") + client_name = f"test_client_{int(time.time())}" + test_url = "ws://localhost:8080/ws" # 使用一个测试URL + + try: + # 创建客户端(这里只是测试,不实际连接) + client = await websocket_manager.create_client(client_name, test_url, heartbeat_interval=30) + logger.info(f"WebSocket客户端创建成功: {client_name}") + + # 5. 模拟客户端连接(注册设备事件推送) + logger.info("步骤5: 模拟客户端连接,注册设备事件推送") + await device_manager.register_websocket_client(client_name) + logger.info(f"客户端 {client_name} 已注册设备事件推送") + + # 6. 获取缓冲的事件 + logger.info("步骤6: 获取缓冲的事件") + buffered_events = await device_manager.get_buffered_events(client_name) + logger.info(f"获取到 {len(buffered_events)} 个缓冲事件") + + # 7. 验证事件内容 + logger.info("步骤7: 验证事件内容") + for i, event in enumerate(buffered_events): + logger.info(f"事件 {i+1}: {event.get('type')} - {event.get('device_id')} - {event.get('status')}") + logger.info(f" 时间戳: {event.get('timestamp')}") + logger.info(f" 设备信息: {event.get('device_info')}") + + # 8. 模拟推送事件到WebSocket客户端 + logger.info("步骤8: 模拟推送事件到WebSocket客户端") + success_count = 0 + for event in buffered_events: + try: + # 这里只是模拟,实际推送需要真实的WebSocket连接 + logger.info(f"模拟推送事件: {event.get('type')} - {event.get('device_id')}") + success_count += 1 + except Exception as e: + logger.error(f"推送事件失败: {e}") + + logger.info(f"模拟推送完成: {success_count}/{len(buffered_events)} 个事件") + + # 9. 注销客户端 + logger.info("步骤9: 注销客户端") + await device_manager.unregister_websocket_client(client_name) + logger.info(f"客户端 {client_name} 已注销") + + except Exception as e: + logger.error(f"WebSocket客户端操作失败: {e}") - # 断开客户端 - await websocket_manager.disconnect_client(client_name) - logger.info(f"WebSocket客户端断开: {client_name}") + # 10. 测试完成 + logger.info("步骤10: 测试完成") + logger.info("设备事件缓冲和WebSocket推送功能测试完成") except Exception as e: - logger.error(f"WebSocket连接测试失败: {e}") + logger.error(f"测试过程中发生异常: {e}") + import traceback + logger.error(f"异常堆栈: {traceback.format_exc()}") + + finally: + # 清理资源 + logger.info("清理测试资源") + try: + await device_manager.stop_event_pusher() + logger.info("设备事件推送服务已停止") + except Exception as e: + logger.error(f"停止设备事件推送服务失败: {e}") -async def main(): - """主测试函数""" - logger.info("开始设备事件缓冲和推送功能测试") +async def test_real_websocket_connection(): + """测试真实的WebSocket连接和事件推送""" + logger.info("开始真实WebSocket连接测试") try: # 1. 启动设备事件推送服务 - logger.info("步骤1: 启动设备事件推送服务") await device_manager.start_event_pusher() - await asyncio.sleep(1) - - # 2. 模拟设备事件(此时没有WebSocket客户端连接) - logger.info("步骤2: 模拟设备事件(无WebSocket客户端)") - await simulate_device_events() - await asyncio.sleep(2) - - # 3. 检查缓冲队列状态 - logger.info("步骤3: 检查缓冲队列状态") - buffered_events = await device_manager.get_buffered_events("test_check") - logger.info(f"缓冲队列中有 {len(buffered_events)} 个事件") - # 4. 创建WebSocket客户端并连接(应该会收到缓冲的事件) - logger.info("步骤4: 创建WebSocket客户端并连接") - await test_websocket_connection() + # 2. 模拟一些设备事件 + logger.info("模拟设备事件...") + await device_manager.handle_auto_discovered_device_event("real_device_001", "device", {"model": "Real Device 1"}) + await device_manager.handle_auto_discovered_device_event("real_device_002", "device", {"model": "Real Device 2"}) - # 5. 再次检查缓冲队列状态(应该为空) - logger.info("步骤5: 再次检查缓冲队列状态") - buffered_events_after = await device_manager.get_buffered_events("test_check_after") - logger.info(f"连接后缓冲队列中有 {len(buffered_events_after)} 个事件") + # 3. 创建并连接WebSocket客户端(使用本地测试服务器) + client_name = f"real_test_client_{int(time.time())}" + test_url = "ws://localhost:8080/ws" - # 6. 清理资源 - logger.info("步骤6: 清理资源") - await device_manager.cleanup() + logger.info(f"创建WebSocket客户端: {client_name}") + client = await websocket_manager.create_client(client_name, test_url) - logger.info("测试完成!") + logger.info(f"连接WebSocket客户端: {client_name}") + success = await websocket_manager.connect_client(client_name) - # 输出测试结果 - if len(buffered_events) > 0 and len(buffered_events_after) == 0: - logger.info("✅ 测试成功:缓冲事件被正确推送给WebSocket客户端") - else: - logger.warning("⚠️ 测试结果异常:缓冲事件推送可能有问题") + if success: + logger.info(f"WebSocket客户端连接成功: {client_name}") + # 等待一段时间,让事件推送完成 + logger.info("等待事件推送完成...") + await asyncio.sleep(2) + + # 断开连接 + logger.info(f"断开WebSocket客户端: {client_name}") + await websocket_manager.disconnect_client(client_name) + else: + logger.error(f"WebSocket客户端连接失败: {client_name}") + except Exception as e: - logger.error(f"测试过程中发生异常: {e}") - import traceback - logger.error(f"异常堆栈: {traceback.format_exc()}") + logger.error(f"真实WebSocket连接测试失败: {e}") + + finally: + await device_manager.stop_event_pusher() if __name__ == "__main__": - asyncio.run(main()) + logger.info("=" * 60) + logger.info("设备事件缓冲和WebSocket推送功能测试") + logger.info("=" * 60) + + # 运行测试 + asyncio.run(test_device_event_buffer()) + + # 可选:运行真实WebSocket连接测试 + # asyncio.run(test_real_websocket_connection()) + + logger.info("=" * 60) + logger.info("测试完成") + logger.info("=" * 60) diff --git a/test_device_info_fix.py b/tests/test_device_info_fix.py similarity index 100% rename from test_device_info_fix.py rename to tests/test_device_info_fix.py diff --git a/test_device_properties_fix.py b/tests/test_device_properties_fix.py similarity index 100% rename from test_device_properties_fix.py rename to tests/test_device_properties_fix.py diff --git a/test_heartbeat_queue_fix.py b/tests/test_heartbeat_queue_fix.py similarity index 100% rename from test_heartbeat_queue_fix.py rename to tests/test_heartbeat_queue_fix.py diff --git a/test_message_flow.py b/tests/test_message_flow.py similarity index 100% rename from test_message_flow.py rename to tests/test_message_flow.py diff --git a/test_message_format_assembly.py b/tests/test_message_format_assembly.py similarity index 100% rename from test_message_format_assembly.py rename to tests/test_message_format_assembly.py diff --git a/test_message_format_simple.py b/tests/test_message_format_simple.py similarity index 100% rename from test_message_format_simple.py rename to tests/test_message_format_simple.py diff --git a/test_pretty_json_log.py b/tests/test_pretty_json_log.py similarity index 100% rename from test_pretty_json_log.py rename to tests/test_pretty_json_log.py diff --git a/test_refactored_device_manager.py b/tests/test_refactored_device_manager.py similarity index 100% rename from test_refactored_device_manager.py rename to tests/test_refactored_device_manager.py diff --git a/test_server.py b/tests/test_server.py similarity index 100% rename from test_server.py rename to tests/test_server.py diff --git a/test_simple_event_buffer.py b/tests/test_simple_event_buffer.py similarity index 100% rename from test_simple_event_buffer.py rename to tests/test_simple_event_buffer.py diff --git a/test_system_api.py b/tests/test_system_api.py similarity index 100% rename from test_system_api.py rename to tests/test_system_api.py diff --git a/test_system_api_endpoints.py b/tests/test_system_api_endpoints.py similarity index 100% rename from test_system_api_endpoints.py rename to tests/test_system_api_endpoints.py diff --git a/test_terminal_event_type.py b/tests/test_terminal_event_type.py similarity index 100% rename from test_terminal_event_type.py rename to tests/test_terminal_event_type.py diff --git a/test_websocket_connection_speed.py b/tests/test_websocket_connection_speed.py similarity index 100% rename from test_websocket_connection_speed.py rename to tests/test_websocket_connection_speed.py