#!/usr/bin/env python3 """ 简单测试WebSocket消息格式组装功能 """ import asyncio import sys import os # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.core.websocket.manager import WebSocketManager from app.core.device.event_types import EventType async def test_message_format_assembly(): """测试消息格式组装功能""" print("=== 简单测试WebSocket消息格式组装功能 ===") try: # 1. 创建WebSocket管理器 websocket_manager = WebSocketManager() # 2. 创建测试客户端和Channel(不连接WebSocket) await websocket_manager.create_client("test_client", "ws://localhost:8080") # 手动启动Channel(不连接WebSocket客户端) await websocket_manager._channel_manager.start_client_channels("test_client") print("1. WebSocket客户端和Channel创建启动成功") # 3. 测试发送消息到send_channel test_data = { "type": EventType.DEVICE_STATUS_UPDATE.value, "device_id": "AMFU6R1813008221", "status": "device", "device_info": { "brand": "HONOR", "model": "JLH-AN00" }, "event_type": EventType.DEVICE_CONNECTED.value } success = await websocket_manager.send_message( "test_client", "terminal", test_data, priority=1 ) print(f"2. 发送消息到send_channel: {'成功' if success else '失败'}") # 4. 验证消息格式 send_channel = websocket_manager.get_channel("test_client_send") if send_channel and send_channel.total_queue_size > 0: message = await send_channel.receive_message(timeout=1.0) if message: print(f"3. 接收到的消息类型: {message.type}") print(f" 消息数据: {message.data}") # 检查是否包含Type字段 if "Type" in message.data: print(f" ✅ 消息格式组装成功,包含Type字段: {message.data['Type']}") else: print(f" ❌ 消息格式组装失败,缺少Type字段") # 检查原始数据是否保留 if "type" in message.data and "device_id" in message.data: print(f" ✅ 原始数据保留完整") else: print(f" ❌ 原始数据丢失") else: print("3. ❌ 无法接收消息") else: print("3. ❌ send_channel中没有消息") # 5. 测试发送消息到其他Channel(不应该组装格式) heartbeat_channel = websocket_manager.get_channel("test_client_heartbeat") if heartbeat_channel: # 直接通过channel_manager发送,绕过manager的组装逻辑 success = await websocket_manager._channel_manager.send_message_to_channel( "test_client_heartbeat", "heartbeat", {"message": "ping"}, priority=1 ) print(f"4. 发送消息到heartbeat_channel: {'成功' if success else '失败'}") if heartbeat_channel.total_queue_size > 0: message = await heartbeat_channel.receive_message(timeout=1.0) if message: print(f"5. heartbeat_channel消息类型: {message.type}") print(f" 消息数据: {message.data}") # 检查是否不包含Type字段(不应该组装) if "Type" not in message.data: print(f" ✅ heartbeat_channel消息格式正确,不包含Type字段") else: print(f" ❌ heartbeat_channel消息格式错误,不应该包含Type字段") else: print("5. ❌ 无法接收heartbeat_channel消息") else: print("5. ❌ heartbeat_channel中没有消息") # 6. 清理资源 await websocket_manager.cleanup() print("6. 资源清理完成") return True except Exception as e: print(f"❌ 测试异常: {e}") import traceback traceback.print_exc() return False async def main(): """主函数""" print("开始简单测试WebSocket消息格式组装功能...") success = await test_message_format_assembly() if success: print("\n✅ 所有测试通过!") else: print("\n❌ 测试失败!") return success if __name__ == "__main__": asyncio.run(main())