You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
230 lines
8.5 KiB
230 lines
8.5 KiB
|
4 months ago
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
测试WebSocket心跳间隔参数化功能
|
||
|
|
验证API接口create_and_connect_client能够正确传递心跳间隔参数
|
||
|
|
"""
|
||
|
|
import asyncio
|
||
|
|
import sys
|
||
|
|
import os
|
||
|
|
import json
|
||
|
|
from datetime import datetime
|
||
|
|
|
||
|
|
# 添加项目路径
|
||
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
|
|
||
|
|
from app.core.websocket.manager import websocket_manager
|
||
|
|
from app.schemas.websocket import CreateWebSocketClientRequest
|
||
|
4 months ago
|
from app.utils.structured_log import get_structured_logger, LogLevel
|
||
|
4 months ago
|
|
||
|
4 months ago
|
logger = get_structured_logger(__name__, LogLevel.DEBUG)
|
||
|
4 months ago
|
|
||
|
|
async def test_heartbeat_interval_parameterization():
|
||
|
|
"""测试心跳间隔参数化功能"""
|
||
|
|
print("=== 测试WebSocket心跳间隔参数化功能 ===")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 测试1: 使用默认心跳间隔
|
||
|
|
print("\n1. 测试默认心跳间隔")
|
||
|
|
request_data_1 = {
|
||
|
|
"name": "test_client_default",
|
||
|
|
"url": "wss://localhost:8080"
|
||
|
|
}
|
||
|
|
|
||
|
|
print(f"请求数据: {json.dumps(request_data_1, ensure_ascii=False)}")
|
||
|
|
request_1 = CreateWebSocketClientRequest(**request_data_1)
|
||
|
|
print(f"✅ 请求对象创建成功: heartbeat_interval = {request_1.heartbeat_interval}")
|
||
|
|
|
||
|
|
# 模拟创建客户端和适配器
|
||
|
|
client_1 = await websocket_manager.create_client(request_1.name, request_1.url)
|
||
|
|
if client_1:
|
||
|
|
print(f"✅ 客户端创建成功: {client_1.name}")
|
||
|
|
|
||
|
|
# 创建Channel和适配器
|
||
|
|
channel_1 = await websocket_manager.create_channel("test_channel_1", 1000)
|
||
|
|
await channel_1.connect()
|
||
|
|
adapter_1 = await websocket_manager.create_adapter(request_1.name, "test_channel_1", request_1.heartbeat_interval)
|
||
|
|
|
||
|
|
if adapter_1:
|
||
|
|
print(f"✅ 适配器创建成功,心跳间隔: {adapter_1.heartbeat_interval}秒")
|
||
|
|
else:
|
||
|
|
print("❌ 适配器创建失败")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
print("❌ 客户端创建失败")
|
||
|
|
return False
|
||
|
|
|
||
|
|
# 测试2: 使用自定义心跳间隔
|
||
|
|
print("\n2. 测试自定义心跳间隔")
|
||
|
|
request_data_2 = {
|
||
|
|
"name": "test_client_custom",
|
||
|
|
"url": "wss://localhost:8080",
|
||
|
|
"heartbeat_interval": 60
|
||
|
|
}
|
||
|
|
|
||
|
|
print(f"请求数据: {json.dumps(request_data_2, ensure_ascii=False)}")
|
||
|
|
request_2 = CreateWebSocketClientRequest(**request_data_2)
|
||
|
|
print(f"✅ 请求对象创建成功: heartbeat_interval = {request_2.heartbeat_interval}")
|
||
|
|
|
||
|
|
# 模拟创建客户端和适配器
|
||
|
|
client_2 = await websocket_manager.create_client(request_2.name, request_2.url)
|
||
|
|
if client_2:
|
||
|
|
print(f"✅ 客户端创建成功: {client_2.name}")
|
||
|
|
|
||
|
|
# 创建Channel和适配器
|
||
|
|
channel_2 = await websocket_manager.create_channel("test_channel_2", 1000)
|
||
|
|
await channel_2.connect()
|
||
|
|
adapter_2 = await websocket_manager.create_adapter(request_2.name, "test_channel_2", request_2.heartbeat_interval)
|
||
|
|
|
||
|
|
if adapter_2:
|
||
|
|
print(f"✅ 适配器创建成功,心跳间隔: {adapter_2.heartbeat_interval}秒")
|
||
|
|
else:
|
||
|
|
print("❌ 适配器创建失败")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
print("❌ 客户端创建失败")
|
||
|
|
return False
|
||
|
|
|
||
|
|
# 测试3: 验证心跳间隔范围限制
|
||
|
|
print("\n3. 测试心跳间隔范围限制")
|
||
|
|
|
||
|
|
# 测试最小值
|
||
|
|
try:
|
||
|
|
request_data_min = {
|
||
|
|
"name": "test_client_min",
|
||
|
|
"url": "wss://localhost:8080",
|
||
|
|
"heartbeat_interval": 4 # 小于最小值5
|
||
|
|
}
|
||
|
|
request_min = CreateWebSocketClientRequest(**request_data_min)
|
||
|
|
print("❌ 最小值限制应该被拒绝")
|
||
|
|
return False
|
||
|
|
except ValueError as e:
|
||
|
|
print(f"✅ 最小值限制正确: {e}")
|
||
|
|
|
||
|
|
# 测试最大值
|
||
|
|
try:
|
||
|
|
request_data_max = {
|
||
|
|
"name": "test_client_max",
|
||
|
|
"url": "wss://localhost:8080",
|
||
|
|
"heartbeat_interval": 301 # 大于最大值300
|
||
|
|
}
|
||
|
|
request_max = CreateWebSocketClientRequest(**request_data_max)
|
||
|
|
print("❌ 最大值限制应该被拒绝")
|
||
|
|
return False
|
||
|
|
except ValueError as e:
|
||
|
|
print(f"✅ 最大值限制正确: {e}")
|
||
|
|
|
||
|
|
# 测试边界值
|
||
|
|
try:
|
||
|
|
request_data_boundary = {
|
||
|
|
"name": "test_client_boundary",
|
||
|
|
"url": "wss://localhost:8080",
|
||
|
|
"heartbeat_interval": 5 # 最小值
|
||
|
|
}
|
||
|
|
request_boundary = CreateWebSocketClientRequest(**request_data_boundary)
|
||
|
|
print(f"✅ 边界值测试通过: heartbeat_interval = {request_boundary.heartbeat_interval}")
|
||
|
|
except ValueError as e:
|
||
|
|
print(f"❌ 边界值测试失败: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
# 测试4: 模拟API接口响应
|
||
|
|
print("\n4. 测试API接口响应")
|
||
|
|
|
||
|
|
# 模拟API响应数据
|
||
|
|
api_response_data = {
|
||
|
|
"name": "test_client_api",
|
||
|
|
"url": "wss://localhost:8080",
|
||
|
|
"status": "connected",
|
||
|
|
"heartbeat_interval": 45
|
||
|
|
}
|
||
|
|
|
||
|
|
print(f"API响应数据: {json.dumps(api_response_data, ensure_ascii=False)}")
|
||
|
|
print(f"✅ API响应包含心跳间隔信息: {api_response_data['heartbeat_interval']}秒")
|
||
|
|
|
||
|
|
# 验证响应数据格式
|
||
|
|
if "heartbeat_interval" in api_response_data:
|
||
|
|
print("✅ 响应数据格式正确")
|
||
|
|
else:
|
||
|
|
print("❌ 响应数据格式错误")
|
||
|
|
return False
|
||
|
|
|
||
|
|
print("\n=== 所有测试通过 ===")
|
||
|
|
return True
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"❌ 测试异常: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
async def test_adapter_heartbeat_loop():
|
||
|
|
"""测试适配器心跳循环功能"""
|
||
|
|
print("\n=== 测试适配器心跳循环功能 ===")
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 创建测试客户端和Channel
|
||
|
|
client_name = "test_heartbeat_client"
|
||
|
|
channel_name = "test_heartbeat_channel"
|
||
|
|
|
||
|
|
client = await websocket_manager.create_client(client_name, "wss://localhost:8080")
|
||
|
|
channel = await websocket_manager.create_channel(channel_name, 1000)
|
||
|
|
await channel.connect()
|
||
|
|
|
||
|
|
# 测试不同心跳间隔的适配器
|
||
|
|
test_intervals = [15, 30, 60]
|
||
|
|
|
||
|
|
for interval in test_intervals:
|
||
|
|
print(f"\n测试心跳间隔: {interval}秒")
|
||
|
|
|
||
|
|
adapter = await websocket_manager.create_adapter(client_name, channel_name, interval)
|
||
|
|
if adapter:
|
||
|
|
print(f"✅ 适配器创建成功")
|
||
|
|
print(f" 心跳间隔: {adapter.heartbeat_interval}秒")
|
||
|
|
print(f" 心跳间隔设置正确: {adapter.heartbeat_interval == interval}")
|
||
|
|
|
||
|
|
# 验证心跳间隔属性
|
||
|
|
if adapter.heartbeat_interval == interval:
|
||
|
|
print(f"✅ 心跳间隔设置正确")
|
||
|
|
else:
|
||
|
|
print(f"❌ 心跳间隔设置错误: 期望{interval},实际{adapter.heartbeat_interval}")
|
||
|
|
return False
|
||
|
|
else:
|
||
|
|
print(f"❌ 适配器创建失败")
|
||
|
|
return False
|
||
|
|
|
||
|
|
print("\n=== 心跳循环测试通过 ===")
|
||
|
|
return True
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"❌ 心跳循环测试异常: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
async def main():
|
||
|
|
"""主测试函数"""
|
||
|
|
print("开始测试WebSocket心跳间隔参数化功能")
|
||
|
|
print("=" * 50)
|
||
|
|
|
||
|
|
try:
|
||
|
|
# 测试心跳间隔参数化
|
||
|
|
result_1 = await test_heartbeat_interval_parameterization()
|
||
|
|
if not result_1:
|
||
|
|
print("❌ 心跳间隔参数化测试失败")
|
||
|
|
return False
|
||
|
|
|
||
|
|
# 测试适配器心跳循环
|
||
|
|
result_2 = await test_adapter_heartbeat_loop()
|
||
|
|
if not result_2:
|
||
|
|
print("❌ 适配器心跳循环测试失败")
|
||
|
|
return False
|
||
|
|
|
||
|
|
print("\n" + "=" * 50)
|
||
|
|
print("🎉 所有测试通过!WebSocket心跳间隔参数化功能正常工作")
|
||
|
|
return True
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"❌ 测试过程中发生异常: {e}")
|
||
|
|
return False
|
||
|
|
finally:
|
||
|
|
# 清理测试资源
|
||
|
|
print("\n清理测试资源...")
|
||
|
|
await websocket_manager.disconnect_all_clients()
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
asyncio.run(main())
|