""" 测试WebSocket的stop/start逻辑 """ import asyncio import json from app.core.websocket.manager import websocket_manager from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.INFO) async def test_stop_start_logic(): """测试stop/start逻辑""" logger.info("开始测试WebSocket stop/start逻辑") try: # 1. 创建客户端 logger.info("1. 创建客户端") client = await websocket_manager.create_client("test_stop_start", "ws://localhost:8080") logger.info(f"客户端创建成功: {client.name}") # 2. 检查初始状态 logger.info("2. 检查初始状态") stats = websocket_manager.get_stats() logger.info(f"客户端数量: {stats['client_count']}") logger.info(f"Channel数量: {stats['channel_count']}") logger.info(f"适配器数量: {stats['adapter_count']}") # 3. 连接客户端 logger.info("3. 连接客户端") success = await websocket_manager.connect_client("test_stop_start") logger.info(f"客户端连接成功: {success}") # 4. 发送测试消息 logger.info("4. 发送测试消息") await websocket_manager.send_message("test_stop_start", "test", {"message": "hello"}) await websocket_manager.send_heartbeat("test_stop_start") # 5. 检查连接后状态 logger.info("5. 检查连接后状态") stats = websocket_manager.get_stats() logger.info(f"客户端数量: {stats['client_count']}") logger.info(f"Channel数量: {stats['channel_count']}") logger.info(f"适配器数量: {stats['adapter_count']}") # 6. 断开客户端 logger.info("6. 断开客户端") success = await websocket_manager.disconnect_client("test_stop_start") logger.info(f"客户端断开成功: {success}") # 7. 检查断开后状态 logger.info("7. 检查断开后状态") stats = websocket_manager.get_stats() logger.info(f"客户端数量: {stats['client_count']}") logger.info(f"Channel数量: {stats['channel_count']}") logger.info(f"适配器数量: {stats['adapter_count']}") # 8. 重新连接客户端 logger.info("8. 重新连接客户端") success = await websocket_manager.connect_client("test_stop_start") logger.info(f"客户端重新连接成功: {success}") # 9. 再次发送消息 logger.info("9. 再次发送消息") await websocket_manager.send_message("test_stop_start", "test", {"message": "hello again"}) await websocket_manager.send_heartbeat("test_stop_start") # 10. 移除客户端 logger.info("10. 移除客户端") success = await websocket_manager.remove_client("test_stop_start") logger.info(f"客户端移除成功: {success}") # 11. 检查最终状态 logger.info("11. 检查最终状态") stats = websocket_manager.get_stats() logger.info(f"客户端数量: {stats['client_count']}") logger.info(f"Channel数量: {stats['channel_count']}") logger.info(f"适配器数量: {stats['adapter_count']}") logger.info("✅ WebSocket stop/start逻辑测试完成") except Exception as e: logger.error(f"❌ WebSocket stop/start逻辑测试失败: {e}") raise async def test_cleanup_logic(): """测试cleanup逻辑""" logger.info("开始测试WebSocket cleanup逻辑") try: # 1. 创建多个客户端 logger.info("1. 创建多个客户端") clients = [] for i in range(3): client = await websocket_manager.create_client(f"test_cleanup_{i}", "ws://localhost:8080") clients.append(client) await websocket_manager.connect_client(f"test_cleanup_{i}") # 2. 检查创建后状态 logger.info("2. 检查创建后状态") stats = websocket_manager.get_stats() logger.info(f"客户端数量: {stats['client_count']}") logger.info(f"Channel数量: {stats['channel_count']}") logger.info(f"适配器数量: {stats['adapter_count']}") # 3. 执行cleanup logger.info("3. 执行cleanup") await websocket_manager.cleanup() # 4. 检查cleanup后状态 logger.info("4. 检查cleanup后状态") stats = websocket_manager.get_stats() logger.info(f"客户端数量: {stats['client_count']}") logger.info(f"Channel数量: {stats['channel_count']}") logger.info(f"适配器数量: {stats['adapter_count']}") logger.info("✅ WebSocket cleanup逻辑测试完成") except Exception as e: logger.error(f"❌ WebSocket cleanup逻辑测试失败: {e}") raise async def main(): """主测试函数""" logger.info("开始WebSocket stop/start测试") try: # 测试stop/start逻辑 await test_stop_start_logic() # 测试cleanup逻辑 await test_cleanup_logic() logger.info("🎉 所有测试完成") except Exception as e: logger.error(f"❌ 测试失败: {e}") raise if __name__ == "__main__": asyncio.run(main())