""" 测试消息传递流程:manager -> channel_manager -> channel -> send_controller """ import asyncio import sys import os # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.core.websocket.manager import WebSocketManager from app.core.websocket.channel import WebSocketChannel, ChannelMessage from app.core.websocket.client import WebSocketClient from app.core.websocket.send_controller import WebSocketSendController from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.DEBUG) class MockWebSocketClient(WebSocketClient): """模拟WebSocket客户端""" def __init__(self, name: str, url: str, heartbeat_interval: int = 120): super().__init__(name, url) self.heartbeat_interval = heartbeat_interval self._connected = False self._sent_messages = [] async def connect(self) -> bool: self._connected = True logger.info(f"MockWebSocketClient 连接成功: {self.name}") return True async def disconnect(self) -> bool: self._connected = False logger.info(f"MockWebSocketClient 断开连接: {self.name}") return True @property def is_connected(self) -> bool: return self._connected async def send_raw(self, data: bytes) -> bool: self._sent_messages.append(data) logger.info(f"MockWebSocketClient 发送消息: {self.name} -> {data}") return True async def test_message_flow(): """测试消息传递流程""" print("=== 测试消息传递流程 ===") try: # 1. 创建WebSocket管理器 manager = WebSocketManager() # 2. 创建客户端 client_name = "test_message_flow" client_url = "ws://localhost:8080" heartbeat_interval = 30 print(f"1. 创建客户端: {client_name}") client = await manager.create_client(client_name, client_url, heartbeat_interval) print(f" ✅ 客户端创建成功: {client.name}") # 3. 连接客户端 print(f"2. 连接客户端: {client_name}") success = await manager.connect_client(client_name) if not success: print(f" ❌ 客户端连接失败") return False print(f" ✅ 客户端连接成功") # 4. 检查Channel状态 print(f"3. 检查Channel状态") client_channels = manager.get_client_channels(client_name) for channel_name, channel in client_channels.items(): print(f" Channel: {channel_name} -> 连接状态: {channel.is_connected}, 队列大小: {channel.total_queue_size}") # 5. 检查发送控制器状态 print(f"4. 检查发送控制器状态") if client_name in manager._send_controllers: send_controller = manager._send_controllers[client_name] stats = send_controller.get_stats() print(f" ✅ 发送控制器存在: {stats}") else: print(f" ❌ 发送控制器不存在") return False # 6. 发送测试消息 print(f"5. 发送测试消息") test_data = {"test": "message", "timestamp": "2024-01-01"} success = await manager.send_message(client_name, "test_message", test_data, priority=0) if not success: print(f" ❌ 消息发送失败") return False print(f" ✅ 消息发送成功") # 7. 检查Channel队列状态 print(f"6. 检查Channel队列状态") send_channel = manager.get_channel(f"{client_name}_send") if send_channel: print(f" send_channel 队列状态:") print(f" 普通队列大小: {send_channel.queue_size}") print(f" 优先级队列大小: {send_channel.priority_queue_size}") print(f" 总队列大小: {send_channel.total_queue_size}") else: print(f" ❌ send_channel 不存在") return False # 8. 等待消息被处理 print(f"7. 等待消息被处理 (3秒)") await asyncio.sleep(3) # 9. 检查发送控制器的客户端是否收到消息 print(f"8. 检查发送控制器的客户端消息") if client_name in manager._send_controllers: send_controller = manager._send_controllers[client_name] mock_client = send_controller.client if hasattr(mock_client, '_sent_messages'): print(f" 客户端发送的消息数量: {len(mock_client._sent_messages)}") for i, msg in enumerate(mock_client._sent_messages): print(f" 消息 {i+1}: {msg}") else: print(f" ❌ 客户端没有_sent_messages属性") else: print(f" ❌ 发送控制器不存在") # 10. 再次检查Channel队列状态 print(f"9. 再次检查Channel队列状态") if send_channel: print(f" send_channel 队列状态:") print(f" 普通队列大小: {send_channel.queue_size}") print(f" 优先级队列大小: {send_channel.priority_queue_size}") print(f" 总队列大小: {send_channel.total_queue_size}") # 11. 清理资源 print(f"10. 清理资源") await manager.disconnect_client(client_name) await manager.cleanup() print(f" ✅ 资源清理完成") return True except Exception as e: print(f"❌ 测试异常: {e}") import traceback traceback.print_exc() return False async def test_direct_channel_communication(): """测试直接Channel通信""" print("\n=== 测试直接Channel通信 ===") try: # 1. 创建Channel channel_name = "test_direct_channel" channel = WebSocketChannel(channel_name, max_size=100) # 2. 连接Channel await channel.connect() print(f"1. Channel连接成功: {channel.name}") # 3. 发送消息 test_message = ChannelMessage( type="test_direct", data={"direct": "test"}, priority=0 ) success = await channel.send_message(test_message) print(f"2. 消息发送: {'成功' if success else '失败'}") print(f" 队列大小: {channel.total_queue_size}") # 4. 接收消息 received_message = await channel.receive_message(timeout=1.0) if received_message: print(f"3. 消息接收成功: {received_message.type} -> {received_message.data}") else: print(f"3. 消息接收失败") # 5. 清理 await channel.disconnect() print(f"4. Channel断开连接") return True except Exception as e: print(f"❌ 直接Channel通信测试异常: {e}") import traceback traceback.print_exc() return False async def main(): """主函数""" print("开始测试消息传递流程...") # 测试直接Channel通信 success1 = await test_direct_channel_communication() # 测试完整消息流程 success2 = await test_message_flow() print(f"\n=== 测试结果 ===") print(f"直接Channel通信: {'✅ 成功' if success1 else '❌ 失败'}") print(f"完整消息流程: {'✅ 成功' if success2 else '❌ 失败'}") if success1 and success2: print("🎉 所有测试通过!") else: print("💥 存在测试失败!") if __name__ == "__main__": asyncio.run(main())