You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
7.1 KiB

"""
WebSocket API集成测试
验证endpoints.create_and_connect_client是否能正常运行
"""
import asyncio
import json
from datetime import datetime
from app.core.websocket.manager import websocket_manager
from app.schemas.websocket import CreateWebSocketClientRequest, SuccessResponse
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.INFO)
async def test_create_and_connect_client():
"""测试创建并连接客户端功能"""
logger.info("开始测试WebSocket API集成")
try:
# 测试数据
test_name = "test_client_001"
test_url = "ws://localhost:8080/ws"
test_heartbeat_interval = 30
logger.info(f"测试参数: name={test_name}, url={test_url}, heartbeat_interval={test_heartbeat_interval}")
# 1. 测试创建客户端
logger.info("步骤1: 测试创建客户端")
client = await websocket_manager.create_client(test_name, test_url, test_heartbeat_interval)
if client:
logger.info(f"✅ 客户端创建成功: {client.name} -> {client.url}")
else:
logger.error("❌ 客户端创建失败")
return False
# 2. 检查Channel是否创建
logger.info("步骤2: 检查Channel是否创建")
channels = websocket_manager.get_client_channels(test_name)
expected_channels = [f"{test_name}_heartbeat", f"{test_name}_send", f"{test_name}_receive"]
for expected_channel in expected_channels:
if expected_channel in channels:
logger.info(f"✅ Channel创建成功: {expected_channel}")
else:
logger.error(f"❌ Channel创建失败: {expected_channel}")
return False
# 3. 测试连接客户端(模拟,因为需要真实的WebSocket服务器)
logger.info("步骤3: 测试连接客户端(模拟)")
try:
# 这里会失败,因为没有真实的WebSocket服务器,但可以测试代码逻辑
success = await websocket_manager.connect_client(test_name)
if success:
logger.info("✅ 客户端连接成功")
else:
logger.warning("⚠️ 客户端连接失败(预期,因为没有真实服务器)")
except Exception as e:
logger.warning(f"⚠️ 客户端连接异常(预期): {e}")
# 4. 测试发送消息
logger.info("步骤4: 测试发送消息")
test_message = {"key": "value", "timestamp": datetime.now().isoformat()}
success = await websocket_manager.send_message(test_name, "test", test_message, priority=0)
if success:
logger.info("✅ 消息发送成功")
else:
logger.error("❌ 消息发送失败")
return False
# 5. 测试发送心跳
logger.info("步骤5: 测试发送心跳")
heartbeat_success = await websocket_manager.send_heartbeat(test_name)
if heartbeat_success:
logger.info("✅ 心跳发送成功")
else:
logger.error("❌ 心跳发送失败")
return False
# 6. 获取统计信息
logger.info("步骤6: 获取统计信息")
stats = websocket_manager.get_stats()
logger.info(f"✅ 统计信息获取成功: {json.dumps(stats, indent=2, default=str)}")
# 7. 测试断开客户端
logger.info("步骤7: 测试断开客户端")
disconnect_success = await websocket_manager.disconnect_client(test_name)
if disconnect_success:
logger.info("✅ 客户端断开成功")
else:
logger.warning("⚠️ 客户端断开失败")
# 8. 清理资源
logger.info("步骤8: 清理资源")
remove_success = await websocket_manager.remove_client(test_name)
if remove_success:
logger.info("✅ 客户端移除成功")
else:
logger.warning("⚠️ 客户端移除失败")
logger.info("🎉 WebSocket API集成测试完成")
return True
except Exception as e:
logger.error(f"❌ WebSocket API集成测试失败: {e}")
return False
async def test_api_endpoint_simulation():
"""模拟API端点的调用流程"""
logger.info("开始模拟API端点调用流程")
try:
# 模拟API请求数据
request_data = {
"name": "test_api_client",
"url": "ws://localhost:8080/ws",
"heartbeat_interval": 60
}
# 模拟CreateWebSocketClientRequest
request = CreateWebSocketClientRequest(**request_data)
logger.info(f"✅ 请求模型验证成功: {request}")
# 模拟API端点逻辑
client = await websocket_manager.create_client(request.name, request.url, request.heartbeat_interval)
if client:
success = await websocket_manager.connect_client(request.name)
if success:
# 模拟SuccessResponse
response_data = {
"name": request.name,
"url": request.url,
"status": "connected",
"heartbeat_interval": request.heartbeat_interval,
"channels": ["heartbeat", "send", "receive"]
}
response = SuccessResponse(
success=True,
message=f"WebSocket客户端 {request.name} 创建并连接成功",
data=response_data,
timestamp=datetime.now().isoformat()
)
logger.info(f"✅ API端点模拟成功: {response}")
# 清理
await websocket_manager.remove_client(request.name)
return True
else:
logger.error("❌ 客户端连接失败")
return False
else:
logger.error("❌ 客户端创建失败")
return False
except Exception as e:
logger.error(f"❌ API端点模拟失败: {e}")
return False
async def main():
"""主测试函数"""
logger.info("开始WebSocket API集成测试")
try:
# 测试1: 基本功能测试
test1_result = await test_create_and_connect_client()
# 测试2: API端点模拟测试
test2_result = await test_api_endpoint_simulation()
# 总结
if test1_result and test2_result:
logger.info("🎉 所有测试通过!endpoints.create_and_connect_client 可以正常运行")
else:
logger.error("❌ 部分测试失败,需要检查代码")
except Exception as e:
logger.error(f"❌ 测试执行失败: {e}")
finally:
# 清理所有资源
await websocket_manager.cleanup()
logger.info("测试完成,资源已清理")
if __name__ == "__main__":
asyncio.run(main())