You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
192 lines
7.1 KiB
192 lines
7.1 KiB
"""
|
|
WebSocket API集成测试
|
|
验证endpoints.create_and_connect_client是否能正常运行
|
|
"""
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime
|
|
from app.core.websocket.manager import websocket_manager
|
|
from app.schemas.websocket import CreateWebSocketClientRequest, SuccessResponse
|
|
from app.utils.structured_log import get_structured_logger, LogLevel
|
|
|
|
logger = get_structured_logger(__name__, LogLevel.INFO)
|
|
|
|
async def test_create_and_connect_client():
|
|
"""测试创建并连接客户端功能"""
|
|
logger.info("开始测试WebSocket API集成")
|
|
|
|
try:
|
|
# 测试数据
|
|
test_name = "test_client_001"
|
|
test_url = "ws://localhost:8080/ws"
|
|
test_heartbeat_interval = 30
|
|
|
|
logger.info(f"测试参数: name={test_name}, url={test_url}, heartbeat_interval={test_heartbeat_interval}")
|
|
|
|
# 1. 测试创建客户端
|
|
logger.info("步骤1: 测试创建客户端")
|
|
client = await websocket_manager.create_client(test_name, test_url, test_heartbeat_interval)
|
|
|
|
if client:
|
|
logger.info(f"✅ 客户端创建成功: {client.name} -> {client.url}")
|
|
else:
|
|
logger.error("❌ 客户端创建失败")
|
|
return False
|
|
|
|
# 2. 检查Channel是否创建
|
|
logger.info("步骤2: 检查Channel是否创建")
|
|
channels = websocket_manager.get_client_channels(test_name)
|
|
expected_channels = [f"{test_name}_heartbeat", f"{test_name}_send", f"{test_name}_receive"]
|
|
|
|
for expected_channel in expected_channels:
|
|
if expected_channel in channels:
|
|
logger.info(f"✅ Channel创建成功: {expected_channel}")
|
|
else:
|
|
logger.error(f"❌ Channel创建失败: {expected_channel}")
|
|
return False
|
|
|
|
# 3. 测试连接客户端(模拟,因为需要真实的WebSocket服务器)
|
|
logger.info("步骤3: 测试连接客户端(模拟)")
|
|
try:
|
|
# 这里会失败,因为没有真实的WebSocket服务器,但可以测试代码逻辑
|
|
success = await websocket_manager.connect_client(test_name)
|
|
if success:
|
|
logger.info("✅ 客户端连接成功")
|
|
else:
|
|
logger.warning("⚠️ 客户端连接失败(预期,因为没有真实服务器)")
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ 客户端连接异常(预期): {e}")
|
|
|
|
# 4. 测试发送消息
|
|
logger.info("步骤4: 测试发送消息")
|
|
test_message = {"key": "value", "timestamp": datetime.now().isoformat()}
|
|
success = await websocket_manager.send_message(test_name, "test", test_message, priority=0)
|
|
|
|
if success:
|
|
logger.info("✅ 消息发送成功")
|
|
else:
|
|
logger.error("❌ 消息发送失败")
|
|
return False
|
|
|
|
# 5. 测试发送心跳
|
|
logger.info("步骤5: 测试发送心跳")
|
|
heartbeat_success = await websocket_manager.send_heartbeat(test_name)
|
|
|
|
if heartbeat_success:
|
|
logger.info("✅ 心跳发送成功")
|
|
else:
|
|
logger.error("❌ 心跳发送失败")
|
|
return False
|
|
|
|
# 6. 获取统计信息
|
|
logger.info("步骤6: 获取统计信息")
|
|
stats = websocket_manager.get_stats()
|
|
logger.info(f"✅ 统计信息获取成功: {json.dumps(stats, indent=2, default=str)}")
|
|
|
|
# 7. 测试断开客户端
|
|
logger.info("步骤7: 测试断开客户端")
|
|
disconnect_success = await websocket_manager.disconnect_client(test_name)
|
|
|
|
if disconnect_success:
|
|
logger.info("✅ 客户端断开成功")
|
|
else:
|
|
logger.warning("⚠️ 客户端断开失败")
|
|
|
|
# 8. 清理资源
|
|
logger.info("步骤8: 清理资源")
|
|
remove_success = await websocket_manager.remove_client(test_name)
|
|
|
|
if remove_success:
|
|
logger.info("✅ 客户端移除成功")
|
|
else:
|
|
logger.warning("⚠️ 客户端移除失败")
|
|
|
|
logger.info("🎉 WebSocket API集成测试完成")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ WebSocket API集成测试失败: {e}")
|
|
return False
|
|
|
|
async def test_api_endpoint_simulation():
|
|
"""模拟API端点的调用流程"""
|
|
logger.info("开始模拟API端点调用流程")
|
|
|
|
try:
|
|
# 模拟API请求数据
|
|
request_data = {
|
|
"name": "test_api_client",
|
|
"url": "ws://localhost:8080/ws",
|
|
"heartbeat_interval": 60
|
|
}
|
|
|
|
# 模拟CreateWebSocketClientRequest
|
|
request = CreateWebSocketClientRequest(**request_data)
|
|
logger.info(f"✅ 请求模型验证成功: {request}")
|
|
|
|
# 模拟API端点逻辑
|
|
client = await websocket_manager.create_client(request.name, request.url, request.heartbeat_interval)
|
|
|
|
if client:
|
|
success = await websocket_manager.connect_client(request.name)
|
|
|
|
if success:
|
|
# 模拟SuccessResponse
|
|
response_data = {
|
|
"name": request.name,
|
|
"url": request.url,
|
|
"status": "connected",
|
|
"heartbeat_interval": request.heartbeat_interval,
|
|
"channels": ["heartbeat", "send", "receive"]
|
|
}
|
|
|
|
response = SuccessResponse(
|
|
success=True,
|
|
message=f"WebSocket客户端 {request.name} 创建并连接成功",
|
|
data=response_data,
|
|
timestamp=datetime.now().isoformat()
|
|
)
|
|
|
|
logger.info(f"✅ API端点模拟成功: {response}")
|
|
|
|
# 清理
|
|
await websocket_manager.remove_client(request.name)
|
|
return True
|
|
else:
|
|
logger.error("❌ 客户端连接失败")
|
|
return False
|
|
else:
|
|
logger.error("❌ 客户端创建失败")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ API端点模拟失败: {e}")
|
|
return False
|
|
|
|
async def main():
|
|
"""主测试函数"""
|
|
logger.info("开始WebSocket API集成测试")
|
|
|
|
try:
|
|
# 测试1: 基本功能测试
|
|
test1_result = await test_create_and_connect_client()
|
|
|
|
# 测试2: API端点模拟测试
|
|
test2_result = await test_api_endpoint_simulation()
|
|
|
|
# 总结
|
|
if test1_result and test2_result:
|
|
logger.info("🎉 所有测试通过!endpoints.create_and_connect_client 可以正常运行")
|
|
else:
|
|
logger.error("❌ 部分测试失败,需要检查代码")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 测试执行失败: {e}")
|
|
|
|
finally:
|
|
# 清理所有资源
|
|
await websocket_manager.cleanup()
|
|
logger.info("测试完成,资源已清理")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|