You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

371 lines
13 KiB

#!/usr/bin/env python3
"""
测试WebSocket API逻辑(不实际连接WebSocket服务器)
验证create_and_connect_client函数的逻辑是否正确
"""
import asyncio
import sys
import os
import json
from datetime import datetime
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.core.websocket.manager import websocket_manager
from app.schemas.websocket import CreateWebSocketClientRequest
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.DEBUG)
async def test_request_validation():
"""测试请求验证"""
print("=== 测试请求验证 ===")
try:
# 测试有效请求
valid_request_data = {
"name": "test_client",
"url": "wss://localhost:8080"
}
print(f"测试有效请求: {json.dumps(valid_request_data, ensure_ascii=False)}")
request = CreateWebSocketClientRequest(**valid_request_data)
print(f"✅ 有效请求验证通过: {request.name} -> {request.url}")
# 测试无效URL
invalid_request_data = {
"name": "test_client",
"url": "http://localhost:8080" # 不是WebSocket URL
}
print(f"测试无效URL: {json.dumps(invalid_request_data, ensure_ascii=False)}")
try:
request = CreateWebSocketClientRequest(**invalid_request_data)
print("❌ 无效URL应该被拒绝")
return False
except ValueError as e:
print(f"✅ 无效URL正确被拒绝: {e}")
# 测试空名称
empty_name_data = {
"name": "",
"url": "wss://localhost:8080"
}
print(f"测试空名称: {json.dumps(empty_name_data, ensure_ascii=False)}")
try:
request = CreateWebSocketClientRequest(**empty_name_data)
print("❌ 空名称应该被拒绝")
return False
except Exception as e:
print(f"✅ 空名称正确被拒绝: {e}")
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def test_client_creation_logic():
"""测试客户端创建逻辑"""
print("\n=== 测试客户端创建逻辑 ===")
try:
# 测试创建客户端
client_name = "test_client_1"
client_url = "wss://localhost:8080"
print(f"创建客户端: {client_name} -> {client_url}")
client = await websocket_manager.create_client(client_name, client_url)
if client:
print(f"✅ 客户端创建成功: {client.name}")
print(f" 状态: {client.state.value}")
print(f" 连接状态: {client.is_connected}")
else:
print("❌ 客户端创建失败")
return False
# 测试重复创建(应该返回已存在的客户端)
print(f"重复创建客户端: {client_name}")
client2 = await websocket_manager.create_client(client_name, client_url)
if client2 is client:
print(f"✅ 重复创建返回已存在的客户端")
else:
print(f"❌ 重复创建返回了不同的客户端")
return False
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def test_channel_creation_logic():
"""测试Channel创建逻辑"""
print("\n=== 测试Channel创建逻辑 ===")
try:
# 测试创建Channel
channel_name = "test_channel"
max_size = 1000
print(f"创建Channel: {channel_name}")
channel = await websocket_manager.create_channel(channel_name, max_size)
if channel:
print(f"✅ Channel创建成功: {channel.name}")
print(f" 状态: {channel.state.value}")
print(f" 最大大小: {channel.max_size}")
else:
print("❌ Channel创建失败")
return False
# 测试重复创建(应该返回已存在的Channel)
print(f"重复创建Channel: {channel_name}")
channel2 = await websocket_manager.create_channel(channel_name, max_size)
if channel2 is channel:
print(f"✅ 重复创建返回已存在的Channel")
else:
print(f"❌ 重复创建返回了不同的Channel")
return False
# 测试连接Channel
print(f"连接Channel: {channel_name}")
await channel.connect()
print(f"✅ Channel连接成功,状态: {channel.state.value}")
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def test_adapter_creation_logic():
"""测试适配器创建逻辑"""
print("\n=== 测试适配器创建逻辑 ===")
try:
# 创建客户端和Channel
client_name = "test_client_2"
channel_name = "test_channel_2"
print(f"创建客户端: {client_name}")
client = await websocket_manager.create_client(client_name, "wss://localhost:8080")
print(f"创建Channel: {channel_name}")
channel = await websocket_manager.create_channel(channel_name, 1000)
await channel.connect()
# 测试创建适配器
print(f"创建适配器: {client_name} -> {channel_name}")
adapter = await websocket_manager.create_adapter(client_name, channel_name)
if adapter:
print(f"✅ 适配器创建成功")
print(f" 客户端: {adapter.client_name}")
print(f" Channel: {adapter.channel_name}")
else:
print("❌ 适配器创建失败")
return False
# 测试重复创建(应该返回已存在的适配器)
print(f"重复创建适配器: {client_name} -> {channel_name}")
adapter2 = await websocket_manager.create_adapter(client_name, channel_name)
if adapter2 is adapter:
print(f"✅ 重复创建返回已存在的适配器")
else:
print(f"❌ 重复创建返回了不同的适配器")
return False
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def test_create_and_connect_client_simulation():
"""模拟create_and_connect_client函数的逻辑"""
print("\n=== 模拟create_and_connect_client逻辑 ===")
try:
# 模拟API请求
request_data = {
"name": "test_client_3",
"url": "wss://localhost:8080"
}
print(f"模拟API请求: {json.dumps(request_data, ensure_ascii=False)}")
# 创建请求对象
request = CreateWebSocketClientRequest(**request_data)
print(f"✅ 请求对象创建成功: {request.name} -> {request.url}")
# 模拟API逻辑(不实际连接)
client_name = request.name
client_url = request.url
# 1. 创建客户端
print(f"1. 创建客户端: {client_name}")
client = await websocket_manager.create_client(client_name, client_url)
if not client:
print("❌ 客户端创建失败")
return False
print(f"✅ 客户端创建成功")
# 2. 模拟连接客户端(不实际连接)
print(f"2. 模拟连接客户端: {client_name}")
# 注意:这里不实际调用connect(),因为会尝试连接不存在的服务器
print(f"✅ 客户端连接逻辑正确(跳过实际连接)")
# 3. 确保默认Channels存在并连接,创建适配器
print(f"3. 创建默认Channels和适配器")
default_channels = ["default", "system", "events"]
max_channel_size = 1000
for ch in default_channels:
print(f" 处理Channel: {ch}")
# 获取或创建Channel
channel = websocket_manager.get_channel(ch)
if not channel:
print(f" 创建Channel: {ch}")
channel = await websocket_manager.create_channel(ch, max_channel_size)
if channel:
print(f" 连接Channel: {ch}")
await channel.connect()
print(f" 创建适配器: {client_name} -> {ch}")
adapter = await websocket_manager.create_adapter(client_name, ch)
if adapter:
print(f" ✅ 适配器创建成功")
else:
print(f" ❌ 适配器创建失败")
else:
print(f" ❌ Channel创建失败")
print(f"✅ 默认Channels和适配器创建完成")
# 4. 验证结果
print(f"4. 验证结果")
client = websocket_manager.get_client(client_name)
if client:
print(f"✅ 客户端存在: {client.state.value}")
else:
print(f"❌ 客户端不存在")
return False
# 检查Channels
for ch in default_channels:
channel = websocket_manager.get_channel(ch)
if channel and channel.is_connected:
print(f"✅ Channel {ch} 状态正确: {channel.state.value}")
else:
print(f"❌ Channel {ch} 状态错误")
# 检查适配器
for ch in default_channels:
adapter = websocket_manager.get_adapter(client_name, ch)
if adapter:
print(f"✅ 适配器 {client_name} -> {ch} 存在")
else:
print(f"❌ 适配器 {client_name} -> {ch} 不存在")
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def test_error_handling():
"""测试错误处理"""
print("\n=== 测试错误处理 ===")
try:
# 测试断开不存在的客户端
non_existent_client = "non_existent_client"
print(f"测试断开不存在的客户端: {non_existent_client}")
success = await websocket_manager.disconnect_client(non_existent_client)
if not success:
print(f"✅ 正确拒绝断开不存在的客户端")
else:
print(f"❌ 应该拒绝断开不存在的客户端")
return False
# 测试获取不存在的客户端
client = websocket_manager.get_client(non_existent_client)
if client is None:
print(f"✅ 正确返回None对于不存在的客户端")
else:
print(f"❌ 应该返回None对于不存在的客户端")
return False
# 测试获取不存在的Channel
non_existent_channel = "non_existent_channel"
channel = websocket_manager.get_channel(non_existent_channel)
if channel is None:
print(f"✅ 正确返回None对于不存在的Channel")
else:
print(f"❌ 应该返回None对于不存在的Channel")
return False
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def main():
"""主测试函数"""
print("开始测试WebSocket API逻辑(不实际连接WebSocket服务器)...")
tests = [
test_request_validation,
test_client_creation_logic,
test_channel_creation_logic,
test_adapter_creation_logic,
test_create_and_connect_client_simulation,
test_error_handling,
]
results = []
for test in tests:
try:
result = await test()
results.append(result)
except Exception as e:
print(f"❌ 测试异常: {e}")
results.append(False)
# 输出测试结果
print("\n" + "="*50)
print("测试结果汇总:")
for i, result in enumerate(results):
status = "✅ 通过" if result else "❌ 失败"
print(f" 测试 {i+1}: {status}")
all_passed = all(results)
print(f"\n总体结果: {'✅ 所有测试通过' if all_passed else '❌ 部分测试失败'}")
# 清理资源
print("\n清理测试资源...")
await websocket_manager.cleanup()
print("✅ 资源清理完成")
return all_passed
if __name__ == "__main__":
try:
result = asyncio.run(main())
sys.exit(0 if result else 1)
except KeyboardInterrupt:
print("\n测试被用户中断")
sys.exit(1)
except Exception as e:
print(f"测试异常: {e}")
sys.exit(1)