#!/usr/bin/env python3 """ 简单的设备事件缓冲功能测试 验证设备事件缓冲功能是否正常工作 """ import asyncio import time from app.core.device.manager import device_manager from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.INFO) async def test_simple_event_buffer(): """简单的设备事件缓冲测试""" logger.info("开始简单的设备事件缓冲测试") try: # 1. 启动设备事件推送服务 logger.info("步骤1: 启动设备事件推送服务") await device_manager.start_event_pusher() logger.info("设备事件推送服务启动成功") # 2. 模拟设备事件 logger.info("步骤2: 模拟设备事件") test_events = [ ("device_001", "device", {"model": "Test Device 1"}), ("device_002", "device", {"model": "Test Device 2"}), ("device_003", "offline", {"model": "Test Device 3"}), ] for device_id, status, device_info in test_events: logger.info(f"模拟设备事件: {device_id} - {status}") await device_manager.handle_auto_discovered_device_event(device_id, status, device_info) await asyncio.sleep(0.1) # 3. 检查缓冲的事件 logger.info("步骤3: 检查缓冲的事件") buffer_size = len(device_manager._event_buffer) logger.info(f"当前缓冲事件数量: {buffer_size}") if buffer_size > 0: event_types = {} for event in device_manager._event_buffer: event_type = event.get('type', 'unknown') event_types[event_type] = event_types.get(event_type, 0) + 1 logger.info(f"缓冲事件类型分布: {event_types}") # 显示事件详情 for i, event in enumerate(device_manager._event_buffer): logger.info(f"缓冲事件 {i+1}: {event.get('type')} - {event.get('device_id')} - {event.get('status')}") # 4. 模拟WebSocket客户端注册 logger.info("步骤4: 模拟WebSocket客户端注册") client_name = f"test_client_{int(time.time())}" await device_manager.register_websocket_client(client_name) logger.info(f"客户端 {client_name} 已注册") # 5. 获取缓冲的事件 logger.info("步骤5: 获取缓冲的事件") buffered_events = await device_manager.get_buffered_events(client_name) logger.info(f"获取到 {len(buffered_events)} 个缓冲事件") # 6. 验证事件内容 logger.info("步骤6: 验证事件内容") for i, event in enumerate(buffered_events): logger.info(f"缓冲事件 {i+1}: {event.get('type')} - {event.get('device_id')} - {event.get('status')}") logger.info(f" 时间戳: {event.get('timestamp')}") logger.info(f" 设备信息: {event.get('device_info')}") # 7. 注销客户端 logger.info("步骤7: 注销客户端") await device_manager.unregister_websocket_client(client_name) logger.info(f"客户端 {client_name} 已注销") # 8. 测试完成 logger.info("步骤8: 测试完成") logger.info("简单的设备事件缓冲测试完成") # 9. 输出测试结果 if len(buffered_events) >= 3: # 期望至少3个事件 logger.info("✅ 测试成功:设备事件被正确缓冲") logger.info(f" 缓冲了 {len(buffered_events)} 个事件") return True else: logger.warning("⚠️ 测试失败:设备事件缓冲异常") logger.warning(f" 只缓冲了 {len(buffered_events)} 个事件,期望至少3个") return False except Exception as e: logger.error(f"测试过程中发生异常: {e}") import traceback logger.error(f"异常堆栈: {traceback.format_exc()}") return False finally: # 清理资源 logger.info("清理测试资源") try: await device_manager.stop_event_pusher() logger.info("设备事件推送服务已停止") except Exception as e: logger.error(f"停止设备事件推送服务失败: {e}") if __name__ == "__main__": logger.info("=" * 50) logger.info("简单的设备事件缓冲功能测试") logger.info("=" * 50) # 运行测试 success = asyncio.run(test_simple_event_buffer()) if success: logger.info("🎉 测试成功!") logger.info("✅ 设备事件缓冲功能正常工作") logger.info("✅ 用户客户端连接WebSocket时能够推送之前的监听数据") else: logger.error("❌ 测试失败!") logger.error("❌ 设备事件缓冲功能异常") logger.info("=" * 50) logger.info("测试完成") logger.info("=" * 50)