You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

230 lines
8.5 KiB

#!/usr/bin/env python3
"""
测试WebSocket心跳间隔参数化功能
验证API接口create_and_connect_client能够正确传递心跳间隔参数
"""
import asyncio
import sys
import os
import json
from datetime import datetime
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.core.websocket.manager import websocket_manager
from app.schemas.websocket import CreateWebSocketClientRequest
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.DEBUG)
async def test_heartbeat_interval_parameterization():
"""测试心跳间隔参数化功能"""
print("=== 测试WebSocket心跳间隔参数化功能 ===")
try:
# 测试1: 使用默认心跳间隔
print("\n1. 测试默认心跳间隔")
request_data_1 = {
"name": "test_client_default",
"url": "wss://localhost:8080"
}
print(f"请求数据: {json.dumps(request_data_1, ensure_ascii=False)}")
request_1 = CreateWebSocketClientRequest(**request_data_1)
print(f"✅ 请求对象创建成功: heartbeat_interval = {request_1.heartbeat_interval}")
# 模拟创建客户端和适配器
client_1 = await websocket_manager.create_client(request_1.name, request_1.url)
if client_1:
print(f"✅ 客户端创建成功: {client_1.name}")
# 创建Channel和适配器
channel_1 = await websocket_manager.create_channel("test_channel_1", 1000)
await channel_1.connect()
adapter_1 = await websocket_manager.create_adapter(request_1.name, "test_channel_1", request_1.heartbeat_interval)
if adapter_1:
print(f"✅ 适配器创建成功,心跳间隔: {adapter_1.heartbeat_interval}")
else:
print("❌ 适配器创建失败")
return False
else:
print("❌ 客户端创建失败")
return False
# 测试2: 使用自定义心跳间隔
print("\n2. 测试自定义心跳间隔")
request_data_2 = {
"name": "test_client_custom",
"url": "wss://localhost:8080",
"heartbeat_interval": 60
}
print(f"请求数据: {json.dumps(request_data_2, ensure_ascii=False)}")
request_2 = CreateWebSocketClientRequest(**request_data_2)
print(f"✅ 请求对象创建成功: heartbeat_interval = {request_2.heartbeat_interval}")
# 模拟创建客户端和适配器
client_2 = await websocket_manager.create_client(request_2.name, request_2.url)
if client_2:
print(f"✅ 客户端创建成功: {client_2.name}")
# 创建Channel和适配器
channel_2 = await websocket_manager.create_channel("test_channel_2", 1000)
await channel_2.connect()
adapter_2 = await websocket_manager.create_adapter(request_2.name, "test_channel_2", request_2.heartbeat_interval)
if adapter_2:
print(f"✅ 适配器创建成功,心跳间隔: {adapter_2.heartbeat_interval}")
else:
print("❌ 适配器创建失败")
return False
else:
print("❌ 客户端创建失败")
return False
# 测试3: 验证心跳间隔范围限制
print("\n3. 测试心跳间隔范围限制")
# 测试最小值
try:
request_data_min = {
"name": "test_client_min",
"url": "wss://localhost:8080",
"heartbeat_interval": 4 # 小于最小值5
}
request_min = CreateWebSocketClientRequest(**request_data_min)
print("❌ 最小值限制应该被拒绝")
return False
except ValueError as e:
print(f"✅ 最小值限制正确: {e}")
# 测试最大值
try:
request_data_max = {
"name": "test_client_max",
"url": "wss://localhost:8080",
"heartbeat_interval": 301 # 大于最大值300
}
request_max = CreateWebSocketClientRequest(**request_data_max)
print("❌ 最大值限制应该被拒绝")
return False
except ValueError as e:
print(f"✅ 最大值限制正确: {e}")
# 测试边界值
try:
request_data_boundary = {
"name": "test_client_boundary",
"url": "wss://localhost:8080",
"heartbeat_interval": 5 # 最小值
}
request_boundary = CreateWebSocketClientRequest(**request_data_boundary)
print(f"✅ 边界值测试通过: heartbeat_interval = {request_boundary.heartbeat_interval}")
except ValueError as e:
print(f"❌ 边界值测试失败: {e}")
return False
# 测试4: 模拟API接口响应
print("\n4. 测试API接口响应")
# 模拟API响应数据
api_response_data = {
"name": "test_client_api",
"url": "wss://localhost:8080",
"status": "connected",
"heartbeat_interval": 45
}
print(f"API响应数据: {json.dumps(api_response_data, ensure_ascii=False)}")
print(f"✅ API响应包含心跳间隔信息: {api_response_data['heartbeat_interval']}")
# 验证响应数据格式
if "heartbeat_interval" in api_response_data:
print("✅ 响应数据格式正确")
else:
print("❌ 响应数据格式错误")
return False
print("\n=== 所有测试通过 ===")
return True
except Exception as e:
print(f"❌ 测试异常: {e}")
return False
async def test_adapter_heartbeat_loop():
"""测试适配器心跳循环功能"""
print("\n=== 测试适配器心跳循环功能 ===")
try:
# 创建测试客户端和Channel
client_name = "test_heartbeat_client"
channel_name = "test_heartbeat_channel"
client = await websocket_manager.create_client(client_name, "wss://localhost:8080")
channel = await websocket_manager.create_channel(channel_name, 1000)
await channel.connect()
# 测试不同心跳间隔的适配器
test_intervals = [15, 30, 60]
for interval in test_intervals:
print(f"\n测试心跳间隔: {interval}")
adapter = await websocket_manager.create_adapter(client_name, channel_name, interval)
if adapter:
print(f"✅ 适配器创建成功")
print(f" 心跳间隔: {adapter.heartbeat_interval}")
print(f" 心跳间隔设置正确: {adapter.heartbeat_interval == interval}")
# 验证心跳间隔属性
if adapter.heartbeat_interval == interval:
print(f"✅ 心跳间隔设置正确")
else:
print(f"❌ 心跳间隔设置错误: 期望{interval},实际{adapter.heartbeat_interval}")
return False
else:
print(f"❌ 适配器创建失败")
return False
print("\n=== 心跳循环测试通过 ===")
return True
except Exception as e:
print(f"❌ 心跳循环测试异常: {e}")
return False
async def main():
"""主测试函数"""
print("开始测试WebSocket心跳间隔参数化功能")
print("=" * 50)
try:
# 测试心跳间隔参数化
result_1 = await test_heartbeat_interval_parameterization()
if not result_1:
print("❌ 心跳间隔参数化测试失败")
return False
# 测试适配器心跳循环
result_2 = await test_adapter_heartbeat_loop()
if not result_2:
print("❌ 适配器心跳循环测试失败")
return False
print("\n" + "=" * 50)
print("🎉 所有测试通过!WebSocket心跳间隔参数化功能正常工作")
return True
except Exception as e:
print(f"❌ 测试过程中发生异常: {e}")
return False
finally:
# 清理测试资源
print("\n清理测试资源...")
await websocket_manager.disconnect_all_clients()
if __name__ == "__main__":
asyncio.run(main())