You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

275 lines
9.1 KiB

#!/usr/bin/env python3
"""
测试WebSocket API的create_and_connect_client函数
验证是否正确集成了core.websocket模块
"""
import asyncio
import sys
import os
import json
from datetime import datetime
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.core.websocket.manager import websocket_manager
from app.schemas.websocket import CreateWebSocketClientRequest
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.DEBUG)
async def test_websocket_manager_integration():
"""测试WebSocket管理器集成"""
print("=== 测试WebSocket管理器集成 ===")
try:
# 测试创建客户端
client_name = "test_client_1"
client_url = "wss://localhost:8080"
print(f"创建客户端: {client_name} -> {client_url}")
client = await websocket_manager.create_client(client_name, client_url)
if client:
print(f"✅ 客户端创建成功: {client.name}")
print(f" 状态: {client.state.value}")
print(f" 连接状态: {client.is_connected}")
else:
print("❌ 客户端创建失败")
return False
# 测试连接客户端
print(f"连接客户端: {client_name}")
success = await websocket_manager.connect_client(client_name)
if success:
print(f"✅ 客户端连接成功")
else:
print(f"❌ 客户端连接失败")
return False
# 测试创建Channel
channel_name = "test_channel"
print(f"创建Channel: {channel_name}")
channel = await websocket_manager.create_channel(channel_name, 1000)
if channel:
print(f"✅ Channel创建成功: {channel.name}")
print(f" 状态: {channel.state.value}")
else:
print("❌ Channel创建失败")
return False
# 测试连接Channel
print(f"连接Channel: {channel_name}")
await channel.connect()
print(f"✅ Channel连接成功")
# 测试创建适配器
print(f"创建适配器: {client_name} -> {channel_name}")
adapter = await websocket_manager.create_adapter(client_name, channel_name)
if adapter:
print(f"✅ 适配器创建成功")
print(f" 客户端: {adapter.client_name}")
print(f" Channel: {adapter.channel_name}")
else:
print("❌ 适配器创建失败")
return False
# 测试获取统计信息
stats = websocket_manager.get_stats()
print(f"✅ 获取统计信息成功")
print(f" 客户端数量: {stats['client_count']}")
print(f" Channel数量: {stats['channel_count']}")
print(f" 适配器数量: {stats['adapter_count']}")
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def test_create_and_connect_client_logic():
"""测试create_and_connect_client的逻辑"""
print("\n=== 测试create_and_connect_client逻辑 ===")
try:
# 模拟API请求
request_data = {
"name": "test_client_2",
"url": "wss://localhost:8080"
}
print(f"模拟API请求: {json.dumps(request_data, ensure_ascii=False)}")
# 创建请求对象
request = CreateWebSocketClientRequest(**request_data)
print(f"✅ 请求对象创建成功: {request.name} -> {request.url}")
# 模拟API逻辑
client_name = request.name
client_url = request.url
# 1. 创建客户端
print(f"1. 创建客户端: {client_name}")
client = await websocket_manager.create_client(client_name, client_url)
if not client:
print("❌ 客户端创建失败")
return False
print(f"✅ 客户端创建成功")
# 2. 连接客户端
print(f"2. 连接客户端: {client_name}")
success = await websocket_manager.connect_client(client_name)
if not success:
print("❌ 客户端连接失败")
return False
print(f"✅ 客户端连接成功")
# 3. 确保默认Channels存在并连接,创建适配器
print(f"3. 创建默认Channels和适配器")
default_channels = ["default", "system", "events"]
max_channel_size = 1000
for ch in default_channels:
print(f" 处理Channel: {ch}")
# 获取或创建Channel
channel = websocket_manager.get_channel(ch)
if not channel:
print(f" 创建Channel: {ch}")
channel = await websocket_manager.create_channel(ch, max_channel_size)
if channel:
print(f" 连接Channel: {ch}")
await channel.connect()
print(f" 创建适配器: {client_name} -> {ch}")
adapter = await websocket_manager.create_adapter(client_name, ch)
if adapter:
print(f" ✅ 适配器创建成功")
else:
print(f" ❌ 适配器创建失败")
else:
print(f" ❌ Channel创建失败")
print(f"✅ 默认Channels和适配器创建完成")
# 4. 验证结果
print(f"4. 验证结果")
client = websocket_manager.get_client(client_name)
if client and client.is_connected:
print(f"✅ 客户端状态正确: {client.state.value}")
else:
print(f"❌ 客户端状态错误")
return False
# 检查Channels
for ch in default_channels:
channel = websocket_manager.get_channel(ch)
if channel and channel.is_connected:
print(f"✅ Channel {ch} 状态正确: {channel.state.value}")
else:
print(f"❌ Channel {ch} 状态错误")
# 检查适配器
for ch in default_channels:
adapter = websocket_manager.get_adapter(client_name, ch)
if adapter:
print(f"✅ 适配器 {client_name} -> {ch} 存在")
else:
print(f"❌ 适配器 {client_name} -> {ch} 不存在")
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def test_error_handling():
"""测试错误处理"""
print("\n=== 测试错误处理 ===")
try:
# 测试重复创建客户端
client_name = "test_client_3"
client_url = "wss://localhost:8080"
print(f"测试重复创建客户端: {client_name}")
# 第一次创建
client1 = await websocket_manager.create_client(client_name, client_url)
print(f"✅ 第一次创建成功")
# 第二次创建(应该返回已存在的客户端)
client2 = await websocket_manager.create_client(client_name, client_url)
print(f"✅ 第二次创建成功(返回已存在的客户端)")
if client1 is client2:
print(f"✅ 返回的是同一个客户端对象")
else:
print(f"❌ 返回的不是同一个客户端对象")
return False
# 测试断开客户端
print(f"测试断开客户端: {client_name}")
success = await websocket_manager.disconnect_client(client_name)
if success:
print(f"✅ 客户端断开成功")
else:
print(f"❌ 客户端断开失败")
return False
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def main():
"""主测试函数"""
print("开始测试WebSocket API集成...")
tests = [
test_websocket_manager_integration,
test_create_and_connect_client_logic,
test_error_handling,
]
results = []
for test in tests:
try:
result = await test()
results.append(result)
except Exception as e:
print(f"❌ 测试异常: {e}")
results.append(False)
# 输出测试结果
print("\n" + "="*50)
print("测试结果汇总:")
for i, result in enumerate(results):
status = "✅ 通过" if result else "❌ 失败"
print(f" 测试 {i+1}: {status}")
all_passed = all(results)
print(f"\n总体结果: {'✅ 所有测试通过' if all_passed else '❌ 部分测试失败'}")
# 清理资源
print("\n清理测试资源...")
await websocket_manager.cleanup()
print("✅ 资源清理完成")
return all_passed
if __name__ == "__main__":
try:
result = asyncio.run(main())
sys.exit(0 if result else 1)
except KeyboardInterrupt:
print("\n测试被用户中断")
sys.exit(1)
except Exception as e:
print(f"测试异常: {e}")
sys.exit(1)