#!/usr/bin/env python3 """ 心跳消息队列大小检查修复验证测试 验证心跳消息生成后队列大小检查是否正确 """ import asyncio import sys import os from datetime import datetime # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.core.websocket.manager import websocket_manager from app.core.websocket.channel import WebSocketChannel, ChannelMessage from app.core.websocket.send_controller import WebSocketSendController from app.core.websocket.client import WebSocketClient from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.DEBUG) class MockWebSocketClient(WebSocketClient): """模拟WebSocket客户端,用于测试""" def __init__(self, name: str, url: str, heartbeat_interval: int = 120): super().__init__(name, url) self.heartbeat_interval = heartbeat_interval self._state = self._state.__class__.CONNECTED # 模拟已连接状态 async def send_raw(self, data: str) -> bool: """模拟发送数据""" logger.debug(f"模拟发送数据: {self.name} -> {data[:100]}...") return True async def test_heartbeat_queue_size_check(): """测试心跳消息队列大小检查""" print("=== 心跳消息队列大小检查测试 ===") try: # 创建模拟客户端 client_name = "test_heartbeat_queue" client_url = "wss://localhost:8080" heartbeat_interval = 3 # 3秒心跳间隔,便于测试 print(f"创建模拟客户端: {client_name}") print(f"心跳间隔: {heartbeat_interval}秒") # 创建模拟客户端 client = MockWebSocketClient(client_name, client_url, heartbeat_interval) print(f"✅ 模拟客户端创建成功: {client.name}") # 创建Channel heartbeat_channel = WebSocketChannel(f"{client_name}_heartbeat") send_channel = WebSocketChannel(f"{client_name}_send") # 连接Channel await heartbeat_channel.connect() await send_channel.connect() print(f"✅ Channel连接成功") # 创建发送控制器 send_controller = WebSocketSendController(client, heartbeat_channel, send_channel) print(f"✅ 发送控制器创建成功") # 启动发送控制器 success = await send_controller.start() if not success: print("❌ 发送控制器启动失败") return False print(f"✅ 发送控制器启动成功") # 等待心跳生成 print(f"等待 {heartbeat_interval + 1} 秒观察心跳消息生成...") await asyncio.sleep(heartbeat_interval + 1) # 检查心跳Channel的队列状态 heartbeat_stats = heartbeat_channel.get_stats() print(f"心跳Channel统计信息:") print(f" 队列大小: {heartbeat_stats['queue_size']}") print(f" 优先级队列大小: {heartbeat_stats['priority_queue_size']}") print(f" 总队列大小: {heartbeat_stats['total_queue_size']}") print(f" 消息总数: {heartbeat_stats['message_count']}") # 检查发送控制器状态 controller_stats = send_controller.get_stats() print(f"发送控制器统计信息:") print(f" 运行状态: {controller_stats['is_running']}") print(f" 心跳任务运行: {controller_stats['heartbeat_task_running']}") print(f" 发送任务运行: {controller_stats['send_task_running']}") # 等待一段时间观察消息发送 print(f"等待 5 秒观察消息发送...") await asyncio.sleep(5) # 再次检查队列状态 heartbeat_stats_after = heartbeat_channel.get_stats() print(f"发送后心跳Channel统计信息:") print(f" 队列大小: {heartbeat_stats_after['queue_size']}") print(f" 优先级队列大小: {heartbeat_stats_after['priority_queue_size']}") print(f" 总队列大小: {heartbeat_stats_after['total_queue_size']}") print(f" 消息总数: {heartbeat_stats_after['message_count']}") # 停止发送控制器 await send_controller.stop() print(f"✅ 发送控制器停止成功") # 断开Channel await heartbeat_channel.disconnect() await send_channel.disconnect() print(f"✅ Channel断开成功") return True except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() return False async def test_manual_heartbeat_message(): """测试手动创建心跳消息""" print("\n=== 手动心跳消息测试 ===") try: # 创建Channel channel_name = "test_manual_heartbeat" channel = WebSocketChannel(channel_name) # 连接Channel await channel.connect() print(f"✅ Channel连接成功: {channel.name}") # 创建心跳消息 heartbeat_message = ChannelMessage( type="heartbeat", data={ "Payload": { "Message": "ping" } }, priority=1 # 高优先级 ) print(f"✅ 心跳消息创建成功: {heartbeat_message.type}") # 发送消息到Channel success = await channel.send_message(heartbeat_message) if success: print(f"✅ 心跳消息发送到Channel成功") else: print(f"❌ 心跳消息发送到Channel失败") return False # 检查队列状态 stats = channel.get_stats() print(f"Channel统计信息:") print(f" 队列大小: {stats['queue_size']}") print(f" 优先级队列大小: {stats['priority_queue_size']}") print(f" 总队列大小: {stats['total_queue_size']}") print(f" 消息总数: {stats['message_count']}") # 接收消息 received_message = await channel.receive_message() if received_message: print(f"✅ 消息接收成功: {received_message.type}") print(f" 优先级: {received_message.priority}") print(f" 数据: {received_message.data}") else: print(f"❌ 消息接收失败") return False # 再次检查队列状态 stats_after = channel.get_stats() print(f"接收后Channel统计信息:") print(f" 队列大小: {stats_after['queue_size']}") print(f" 优先级队列大小: {stats_after['priority_queue_size']}") print(f" 总队列大小: {stats_after['total_queue_size']}") # 断开Channel await channel.disconnect() print(f"✅ Channel断开成功") return True except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() return False async def main(): """主测试函数""" print("开始心跳消息队列大小检查修复验证测试") print("=" * 60) # 测试1:心跳消息队列大小检查 success1 = await test_heartbeat_queue_size_check() # 测试2:手动心跳消息测试 success2 = await test_manual_heartbeat_message() print("\n" + "=" * 60) print("测试结果汇总:") print(f"心跳消息队列大小检查测试: {'✅ 通过' if success1 else '❌ 失败'}") print(f"手动心跳消息测试: {'✅ 通过' if success2 else '❌ 失败'}") if success1 and success2: print("🎉 所有测试通过!心跳消息队列大小检查修复成功!") else: print("⚠️ 部分测试失败,请检查日志") if __name__ == "__main__": asyncio.run(main())