#!/usr/bin/env python3 """ 测试WebSocket心跳间隔参数化功能 验证API接口create_and_connect_client能够正确传递心跳间隔参数 """ import asyncio import sys import os import json from datetime import datetime # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.core.websocket.manager import websocket_manager from app.schemas.websocket import CreateWebSocketClientRequest from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.DEBUG) async def test_heartbeat_interval_parameterization(): """测试心跳间隔参数化功能""" print("=== 测试WebSocket心跳间隔参数化功能 ===") try: # 测试1: 使用默认心跳间隔 print("\n1. 测试默认心跳间隔") request_data_1 = { "name": "test_client_default", "url": "wss://localhost:8080" } print(f"请求数据: {json.dumps(request_data_1, ensure_ascii=False)}") request_1 = CreateWebSocketClientRequest(**request_data_1) print(f"✅ 请求对象创建成功: heartbeat_interval = {request_1.heartbeat_interval}") # 模拟创建客户端和适配器 client_1 = await websocket_manager.create_client(request_1.name, request_1.url) if client_1: print(f"✅ 客户端创建成功: {client_1.name}") # 创建Channel和适配器 channel_1 = await websocket_manager.create_channel("test_channel_1", 1000) await channel_1.connect() adapter_1 = await websocket_manager.create_adapter(request_1.name, "test_channel_1", request_1.heartbeat_interval) if adapter_1: print(f"✅ 适配器创建成功,心跳间隔: {adapter_1.heartbeat_interval}秒") else: print("❌ 适配器创建失败") return False else: print("❌ 客户端创建失败") return False # 测试2: 使用自定义心跳间隔 print("\n2. 测试自定义心跳间隔") request_data_2 = { "name": "test_client_custom", "url": "wss://localhost:8080", "heartbeat_interval": 60 } print(f"请求数据: {json.dumps(request_data_2, ensure_ascii=False)}") request_2 = CreateWebSocketClientRequest(**request_data_2) print(f"✅ 请求对象创建成功: heartbeat_interval = {request_2.heartbeat_interval}") # 模拟创建客户端和适配器 client_2 = await websocket_manager.create_client(request_2.name, request_2.url) if client_2: print(f"✅ 客户端创建成功: {client_2.name}") # 创建Channel和适配器 channel_2 = await websocket_manager.create_channel("test_channel_2", 1000) await channel_2.connect() adapter_2 = await websocket_manager.create_adapter(request_2.name, "test_channel_2", request_2.heartbeat_interval) if adapter_2: print(f"✅ 适配器创建成功,心跳间隔: {adapter_2.heartbeat_interval}秒") else: print("❌ 适配器创建失败") return False else: print("❌ 客户端创建失败") return False # 测试3: 验证心跳间隔范围限制 print("\n3. 测试心跳间隔范围限制") # 测试最小值 try: request_data_min = { "name": "test_client_min", "url": "wss://localhost:8080", "heartbeat_interval": 4 # 小于最小值5 } request_min = CreateWebSocketClientRequest(**request_data_min) print("❌ 最小值限制应该被拒绝") return False except ValueError as e: print(f"✅ 最小值限制正确: {e}") # 测试最大值 try: request_data_max = { "name": "test_client_max", "url": "wss://localhost:8080", "heartbeat_interval": 301 # 大于最大值300 } request_max = CreateWebSocketClientRequest(**request_data_max) print("❌ 最大值限制应该被拒绝") return False except ValueError as e: print(f"✅ 最大值限制正确: {e}") # 测试边界值 try: request_data_boundary = { "name": "test_client_boundary", "url": "wss://localhost:8080", "heartbeat_interval": 5 # 最小值 } request_boundary = CreateWebSocketClientRequest(**request_data_boundary) print(f"✅ 边界值测试通过: heartbeat_interval = {request_boundary.heartbeat_interval}") except ValueError as e: print(f"❌ 边界值测试失败: {e}") return False # 测试4: 模拟API接口响应 print("\n4. 测试API接口响应") # 模拟API响应数据 api_response_data = { "name": "test_client_api", "url": "wss://localhost:8080", "status": "connected", "heartbeat_interval": 45 } print(f"API响应数据: {json.dumps(api_response_data, ensure_ascii=False)}") print(f"✅ API响应包含心跳间隔信息: {api_response_data['heartbeat_interval']}秒") # 验证响应数据格式 if "heartbeat_interval" in api_response_data: print("✅ 响应数据格式正确") else: print("❌ 响应数据格式错误") return False print("\n=== 所有测试通过 ===") return True except Exception as e: print(f"❌ 测试异常: {e}") return False async def test_adapter_heartbeat_loop(): """测试适配器心跳循环功能""" print("\n=== 测试适配器心跳循环功能 ===") try: # 创建测试客户端和Channel client_name = "test_heartbeat_client" channel_name = "test_heartbeat_channel" client = await websocket_manager.create_client(client_name, "wss://localhost:8080") channel = await websocket_manager.create_channel(channel_name, 1000) await channel.connect() # 测试不同心跳间隔的适配器 test_intervals = [15, 30, 60] for interval in test_intervals: print(f"\n测试心跳间隔: {interval}秒") adapter = await websocket_manager.create_adapter(client_name, channel_name, interval) if adapter: print(f"✅ 适配器创建成功") print(f" 心跳间隔: {adapter.heartbeat_interval}秒") print(f" 心跳间隔设置正确: {adapter.heartbeat_interval == interval}") # 验证心跳间隔属性 if adapter.heartbeat_interval == interval: print(f"✅ 心跳间隔设置正确") else: print(f"❌ 心跳间隔设置错误: 期望{interval},实际{adapter.heartbeat_interval}") return False else: print(f"❌ 适配器创建失败") return False print("\n=== 心跳循环测试通过 ===") return True except Exception as e: print(f"❌ 心跳循环测试异常: {e}") return False async def main(): """主测试函数""" print("开始测试WebSocket心跳间隔参数化功能") print("=" * 50) try: # 测试心跳间隔参数化 result_1 = await test_heartbeat_interval_parameterization() if not result_1: print("❌ 心跳间隔参数化测试失败") return False # 测试适配器心跳循环 result_2 = await test_adapter_heartbeat_loop() if not result_2: print("❌ 适配器心跳循环测试失败") return False print("\n" + "=" * 50) print("🎉 所有测试通过!WebSocket心跳间隔参数化功能正常工作") return True except Exception as e: print(f"❌ 测试过程中发生异常: {e}") return False finally: # 清理测试资源 print("\n清理测试资源...") await websocket_manager.disconnect_all_clients() if __name__ == "__main__": asyncio.run(main())