You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

146 lines
4.8 KiB

#!/usr/bin/env python3
"""
WebSocket连接测试脚本
用于测试WebSocket客户端的连接功能
"""
import asyncio
import json
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.core.websocket.manager import websocket_manager
from app.schemas.websocket import CreateWebSocketClientRequest
from app.utils.log import get_logger
logger = get_logger(__name__)
async def test_websocket_connection():
"""测试WebSocket连接功能"""
print("=== WebSocket连接测试 ===")
# 测试配置
test_cases = [
{
"name": "test_ws_echo",
"url": "ws://echo.websocket.org",
"description": "WebSocket Echo服务器测试"
},
{
"name": "test_ws_local",
"url": "ws://localhost:8080/ws",
"description": "本地WebSocket服务器测试"
},
{
"name": "test_wss_echo",
"url": "wss://echo.websocket.org",
"description": "WSS Echo服务器测试"
}
]
for test_case in test_cases:
print(f"\n--- 测试: {test_case['description']} ---")
print(f"URL: {test_case['url']}")
try:
# 创建请求对象
request = CreateWebSocketClientRequest(
name=test_case["name"],
url=test_case["url"],
heartbeat_interval=120 # 2分钟心跳间隔
)
# 创建客户端
print("1. 创建WebSocket客户端...")
client = await websocket_manager.create_client(request.name, request.url)
print(f" 客户端创建成功: {client.name}")
# 连接客户端
print("2. 连接WebSocket客户端...")
success = await websocket_manager.connect_client(request.name)
if success:
print(f" ✅ 连接成功: {request.name}")
# 获取客户端状态
client = websocket_manager.get_client(request.name)
if client:
stats = client.get_stats()
print(f" 状态: {stats['state']}")
print(f" 已连接: {stats['is_connected']}")
# 等待一段时间观察连接状态
print("3. 等待5秒观察连接状态...")
await asyncio.sleep(5)
# 再次检查状态
client = websocket_manager.get_client(request.name)
if client:
stats = client.get_stats()
print(f" 5秒后状态: {stats['state']}")
print(f" 5秒后已连接: {stats['is_connected']}")
# 断开连接
print("4. 断开WebSocket客户端...")
await websocket_manager.disconnect_client(request.name)
print(f" ✅ 断开成功: {request.name}")
else:
print(f" ❌ 连接失败: {request.name}")
except Exception as e:
print(f" ❌ 测试异常: {e}")
logger.error(f"WebSocket连接测试异常: {e}")
# 清理
try:
await websocket_manager.remove_client(test_case["name"])
except:
pass
print("\n=== 测试完成 ===")
async def test_websocket_manager_stats():
"""测试WebSocket管理器统计信息"""
print("\n=== WebSocket管理器统计信息 ===")
try:
stats = websocket_manager.get_stats()
print(f"客户端数量: {stats['client_count']}")
print(f"Channel数量: {stats['channel_count']}")
print(f"适配器数量: {stats['adapter_count']}")
print(f"创建时间: {stats['created_at']}")
if stats['clients']:
print("\n客户端详情:")
for name, client_stats in stats['clients'].items():
print(f" - {name}: {client_stats['state']} (连接: {client_stats['is_connected']})")
except Exception as e:
print(f"获取统计信息失败: {e}")
async def main():
"""主函数"""
try:
# 测试WebSocket连接
await test_websocket_connection()
# 测试管理器统计
await test_websocket_manager_stats()
except KeyboardInterrupt:
print("\n测试被用户中断")
except Exception as e:
print(f"测试过程中发生异常: {e}")
logger.error(f"测试异常: {e}")
finally:
# 清理资源
print("\n清理WebSocket管理器资源...")
await websocket_manager.cleanup()
print("清理完成")
if __name__ == "__main__":
# 运行测试
asyncio.run(main())