#!/usr/bin/env python3 """ 测试WebSocket心跳任务启动功能 验证心跳任务是否正确启动并发送心跳消息 """ import asyncio import sys import os import json from datetime import datetime # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.core.websocket.manager import websocket_manager from app.schemas.websocket import CreateWebSocketClientRequest from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.DEBUG) async def test_heartbeat_task_startup(): """测试心跳任务启动""" print("=== 测试心跳任务启动 ===") try: # 测试创建客户端(不实际连接) client_name = "test_heartbeat_client" client_url = "wss://localhost:8080" heartbeat_interval = 10 # 10秒心跳间隔,便于测试 print(f"创建客户端: {client_name} -> {client_url}") print(f"心跳间隔: {heartbeat_interval}秒") # 创建客户端 client = await websocket_manager.create_client(client_name, client_url, heartbeat_interval) if client: print(f"✅ 客户端创建成功: {client.name}") print(f" 状态: {client.state.value}") print(f" 连接状态: {client.is_connected}") else: print("❌ 客户端创建失败") return False # 检查心跳任务是否存在 if client_name in websocket_manager._heartbeat_tasks: heartbeat_task = websocket_manager._heartbeat_tasks[client_name] print(f"✅ 心跳任务存在: {heartbeat_task}") print(f" 任务状态: {heartbeat_task.done()}") print(f" 任务取消状态: {heartbeat_task.cancelled()}") else: print("❌ 心跳任务不存在") return False # 检查心跳Channel是否存在 heartbeat_channel_name = f"{client_name}_heartbeat" heartbeat_channel = websocket_manager.get_channel(heartbeat_channel_name) if heartbeat_channel: print(f"✅ 心跳Channel存在: {heartbeat_channel.name}") print(f" 连接状态: {heartbeat_channel.is_connected}") else: print(f"❌ 心跳Channel不存在: {heartbeat_channel_name}") return False # 等待一段时间,观察心跳消息 print(f"等待 {heartbeat_interval + 2} 秒观察心跳消息...") await asyncio.sleep(heartbeat_interval + 2) # 检查心跳任务是否仍在运行 if client_name in websocket_manager._heartbeat_tasks: heartbeat_task = websocket_manager._heartbeat_tasks[client_name] if not heartbeat_task.done(): print("✅ 心跳任务仍在运行") else: print("❌ 心跳任务已结束") if heartbeat_task.exception(): print(f" 异常: {heartbeat_task.exception()}") else: print("❌ 心跳任务不存在") # 清理资源 print("清理资源...") await websocket_manager.remove_client(client_name) print("✅ 资源清理完成") return True except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() return False async def test_heartbeat_with_connection(): """测试带连接的心跳功能""" print("\n=== 测试带连接的心跳功能 ===") try: # 测试创建并连接客户端 client_name = "test_heartbeat_connected" client_url = "wss://192.168.13.15:7268/ws" # 使用实际的WebSocket服务器 heartbeat_interval = 15 # 15秒心跳间隔 print(f"创建并连接客户端: {client_name} -> {client_url}") print(f"心跳间隔: {heartbeat_interval}秒") # 创建客户端 client = await websocket_manager.create_client(client_name, client_url, heartbeat_interval) if not client: print("❌ 客户端创建失败") return False # 连接客户端 success = await websocket_manager.connect_client(client_name) if not success: print("❌ 客户端连接失败") return False print("✅ 客户端连接成功") # 等待一段时间观察心跳 print(f"等待 {heartbeat_interval + 5} 秒观察心跳...") await asyncio.sleep(heartbeat_interval + 5) # 检查客户端状态 client = websocket_manager.get_client(client_name) if client and client.is_connected: print("✅ 客户端仍保持连接") else: print("❌ 客户端连接已断开") # 清理资源 print("清理资源...") await websocket_manager.remove_client(client_name) print("✅ 资源清理完成") return True except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() return False async def main(): """主测试函数""" print("开始测试WebSocket心跳任务启动功能") print("=" * 50) # 测试1:心跳任务启动 success1 = await test_heartbeat_task_startup() # 测试2:带连接的心跳功能 success2 = await test_heartbeat_with_connection() print("\n" + "=" * 50) print("测试结果汇总:") print(f"心跳任务启动测试: {'✅ 通过' if success1 else '❌ 失败'}") print(f"带连接的心跳测试: {'✅ 通过' if success2 else '❌ 失败'}") if success1 and success2: print("🎉 所有测试通过!") else: print("⚠️ 部分测试失败,请检查日志") if __name__ == "__main__": asyncio.run(main())