You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

140 lines
5.3 KiB

"""
测试WebSocket的stop/start逻辑
"""
import asyncio
import json
from app.core.websocket.manager import websocket_manager
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.INFO)
async def test_stop_start_logic():
"""测试stop/start逻辑"""
logger.info("开始测试WebSocket stop/start逻辑")
try:
# 1. 创建客户端
logger.info("1. 创建客户端")
client = await websocket_manager.create_client("test_stop_start", "ws://localhost:8080")
logger.info(f"客户端创建成功: {client.name}")
# 2. 检查初始状态
logger.info("2. 检查初始状态")
stats = websocket_manager.get_stats()
logger.info(f"客户端数量: {stats['client_count']}")
logger.info(f"Channel数量: {stats['channel_count']}")
logger.info(f"适配器数量: {stats['adapter_count']}")
# 3. 连接客户端
logger.info("3. 连接客户端")
success = await websocket_manager.connect_client("test_stop_start")
logger.info(f"客户端连接成功: {success}")
# 4. 发送测试消息
logger.info("4. 发送测试消息")
await websocket_manager.send_message("test_stop_start", "test", {"message": "hello"})
await websocket_manager.send_heartbeat("test_stop_start")
# 5. 检查连接后状态
logger.info("5. 检查连接后状态")
stats = websocket_manager.get_stats()
logger.info(f"客户端数量: {stats['client_count']}")
logger.info(f"Channel数量: {stats['channel_count']}")
logger.info(f"适配器数量: {stats['adapter_count']}")
# 6. 断开客户端
logger.info("6. 断开客户端")
success = await websocket_manager.disconnect_client("test_stop_start")
logger.info(f"客户端断开成功: {success}")
# 7. 检查断开后状态
logger.info("7. 检查断开后状态")
stats = websocket_manager.get_stats()
logger.info(f"客户端数量: {stats['client_count']}")
logger.info(f"Channel数量: {stats['channel_count']}")
logger.info(f"适配器数量: {stats['adapter_count']}")
# 8. 重新连接客户端
logger.info("8. 重新连接客户端")
success = await websocket_manager.connect_client("test_stop_start")
logger.info(f"客户端重新连接成功: {success}")
# 9. 再次发送消息
logger.info("9. 再次发送消息")
await websocket_manager.send_message("test_stop_start", "test", {"message": "hello again"})
await websocket_manager.send_heartbeat("test_stop_start")
# 10. 移除客户端
logger.info("10. 移除客户端")
success = await websocket_manager.remove_client("test_stop_start")
logger.info(f"客户端移除成功: {success}")
# 11. 检查最终状态
logger.info("11. 检查最终状态")
stats = websocket_manager.get_stats()
logger.info(f"客户端数量: {stats['client_count']}")
logger.info(f"Channel数量: {stats['channel_count']}")
logger.info(f"适配器数量: {stats['adapter_count']}")
logger.info("✅ WebSocket stop/start逻辑测试完成")
except Exception as e:
logger.error(f"❌ WebSocket stop/start逻辑测试失败: {e}")
raise
async def test_cleanup_logic():
"""测试cleanup逻辑"""
logger.info("开始测试WebSocket cleanup逻辑")
try:
# 1. 创建多个客户端
logger.info("1. 创建多个客户端")
clients = []
for i in range(3):
client = await websocket_manager.create_client(f"test_cleanup_{i}", "ws://localhost:8080")
clients.append(client)
await websocket_manager.connect_client(f"test_cleanup_{i}")
# 2. 检查创建后状态
logger.info("2. 检查创建后状态")
stats = websocket_manager.get_stats()
logger.info(f"客户端数量: {stats['client_count']}")
logger.info(f"Channel数量: {stats['channel_count']}")
logger.info(f"适配器数量: {stats['adapter_count']}")
# 3. 执行cleanup
logger.info("3. 执行cleanup")
await websocket_manager.cleanup()
# 4. 检查cleanup后状态
logger.info("4. 检查cleanup后状态")
stats = websocket_manager.get_stats()
logger.info(f"客户端数量: {stats['client_count']}")
logger.info(f"Channel数量: {stats['channel_count']}")
logger.info(f"适配器数量: {stats['adapter_count']}")
logger.info("✅ WebSocket cleanup逻辑测试完成")
except Exception as e:
logger.error(f"❌ WebSocket cleanup逻辑测试失败: {e}")
raise
async def main():
"""主测试函数"""
logger.info("开始WebSocket stop/start测试")
try:
# 测试stop/start逻辑
await test_stop_start_logic()
# 测试cleanup逻辑
await test_cleanup_logic()
logger.info("🎉 所有测试完成")
except Exception as e:
logger.error(f"❌ 测试失败: {e}")
raise
if __name__ == "__main__":
asyncio.run(main())