#!/usr/bin/env python3 """ WebSocket连接测试脚本 用于测试WebSocket客户端的连接功能 """ import asyncio import json import sys import os # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.core.websocket.manager import websocket_manager from app.schemas.websocket import CreateWebSocketClientRequest from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.DEBUG) async def test_websocket_connection(): """测试WebSocket连接功能""" print("=== WebSocket连接测试 ===") # 测试配置 - 使用更可靠的测试服务器 test_cases = [ { "name": "test_ws_demo", "url": "ws://demos.kaazing.com/echo", "description": "Kaazing Echo服务器测试" }, { "name": "test_ws_local", "url": "ws://localhost:8080/ws", "description": "本地WebSocket服务器测试" }, { "name": "test_ws_invalid", "url": "http://example.com", "description": "无效URL测试(应该失败)" } ] for test_case in test_cases: print(f"\n--- 测试: {test_case['description']} ---") print(f"URL: {test_case['url']}") try: # 创建请求对象 request = CreateWebSocketClientRequest( name=test_case["name"], url=test_case["url"], heartbeat_interval=120 # 2分钟心跳间隔 ) # 创建客户端 print("1. 创建WebSocket客户端...") client = await websocket_manager.create_client(request.name, request.url) print(f" 客户端创建成功: {client.name}") # 连接客户端 print("2. 连接WebSocket客户端...") success = await websocket_manager.connect_client(request.name) if success: print(f" ✅ 连接成功: {request.name}") # 获取客户端状态 client = websocket_manager.get_client(request.name) if client: stats = client.get_stats() print(f" 状态: {stats['state']}") print(f" 已连接: {stats['is_connected']}") # 等待一段时间观察连接状态 print("3. 等待3秒观察连接状态...") await asyncio.sleep(3) # 再次检查状态 client = websocket_manager.get_client(request.name) if client: stats = client.get_stats() print(f" 3秒后状态: {stats['state']}") print(f" 3秒后已连接: {stats['is_connected']}") # 断开连接 print("4. 断开WebSocket客户端...") await websocket_manager.disconnect_client(request.name) print(f" ✅ 断开成功: {request.name}") else: print(f" ❌ 连接失败: {request.name}") except Exception as e: print(f" ❌ 测试异常: {e}") logger.error(f"WebSocket连接测试异常: {e}") # 清理 try: await websocket_manager.remove_client(test_case["name"]) except: pass print("\n=== 测试完成 ===") async def test_websocket_manager_stats(): """测试WebSocket管理器统计信息""" print("\n=== WebSocket管理器统计信息 ===") try: stats = websocket_manager.get_stats() print(f"客户端数量: {stats['client_count']}") print(f"Channel数量: {stats['channel_count']}") print(f"适配器数量: {stats['adapter_count']}") print(f"创建时间: {stats['created_at']}") if stats['clients']: print("\n客户端详情:") for name, client_stats in stats['clients'].items(): print(f" - {name}: {client_stats['state']} (连接: {client_stats['is_connected']})") except Exception as e: print(f"获取统计信息失败: {e}") async def test_api_endpoint(): """测试API端点""" print("\n=== API端点测试 ===") # 模拟API请求 test_request = { "name": "test_api_client", "url": "ws://demos.kaazing.com/echo", "heartbeat_interval": 120 } print(f"测试请求: {json.dumps(test_request, indent=2)}") try: # 创建请求对象 request = CreateWebSocketClientRequest(**test_request) # 创建客户端 print("1. 创建WebSocket客户端...") client = await websocket_manager.create_client(request.name, request.url) print(f" 客户端创建成功: {client.name}") # 连接客户端 print("2. 连接WebSocket客户端...") success = await websocket_manager.connect_client(request.name) if success: print(f" ✅ 连接成功: {request.name}") # 确保默认Channels存在并连接,创建适配器 default_channels = ["default", "system", "events"] max_channel_size = 1000 print("3. 创建默认Channels和适配器...") for ch in default_channels: channel = websocket_manager.get_channel(ch) if not channel: channel = await websocket_manager.create_channel(ch, max_channel_size) await channel.connect() await websocket_manager.create_adapter(request.name, ch, request.heartbeat_interval) print(f" ✅ Channel {ch} 和适配器创建成功") print("4. 等待2秒观察连接状态...") await asyncio.sleep(2) # 检查状态 client = websocket_manager.get_client(request.name) if client: stats = client.get_stats() print(f" 最终状态: {stats['state']}") print(f" 最终连接: {stats['is_connected']}") # 断开连接 print("5. 断开WebSocket客户端...") await websocket_manager.disconnect_client(request.name) print(f" ✅ 断开成功: {request.name}") else: print(f" ❌ 连接失败: {request.name}") except Exception as e: print(f" ❌ API测试异常: {e}") logger.error(f"API测试异常: {e}") # 清理 try: await websocket_manager.remove_client("test_api_client") except: pass async def main(): """主函数""" try: # 测试WebSocket连接 await test_websocket_connection() # 测试API端点 await test_api_endpoint() # 测试管理器统计 await test_websocket_manager_stats() except KeyboardInterrupt: print("\n测试被用户中断") except Exception as e: print(f"测试过程中发生异常: {e}") logger.error(f"测试异常: {e}") finally: # 清理资源 print("\n清理WebSocket管理器资源...") await websocket_manager.cleanup() print("清理完成") if __name__ == "__main__": # 运行测试 asyncio.run(main())