#!/usr/bin/env python3 """ WebSocket完整功能测试服务 验证所有WebSocket组件的正常工作 """ import asyncio import json import time from datetime import datetime from app.core.websocket.manager import websocket_manager from app.core.websocket.channel import ChannelMessage from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.DEBUG) class WebSocketCompleteTest: """WebSocket完整功能测试类""" def __init__(self): self.test_results = [] self.test_client_name = "test_complete_client" self.test_url = "ws://localhost:8080/ws" async def run_all_tests(self): """运行所有测试""" logger.info("开始WebSocket完整功能测试") try: await self.test_create_client_and_channels() await self.test_connect_client() await self.test_send_messages() await self.test_receive_message_processing() await self.test_heartbeat_functionality() await self.test_priority_control() await self.test_disconnect_client() await self.test_cleanup() self.print_test_results() except Exception as e: logger.error(f"测试过程中发生异常: {e}") self.add_test_result("整体测试", False, f"异常: {e}") self.print_test_results() async def test_create_client_and_channels(self): """测试1:创建客户端和Channel""" logger.info("测试1:创建客户端和Channel") try: client = await websocket_manager.create_client( name=self.test_client_name, url=self.test_url, heartbeat_interval=5 ) if client: self.add_test_result("创建客户端", True, "客户端创建成功") else: self.add_test_result("创建客户端", False, "客户端创建失败") return channels = websocket_manager.get_client_channels(self.test_client_name) expected_channels = [ f"{self.test_client_name}_send", f"{self.test_client_name}_heartbeat", f"{self.test_client_name}_receive" ] channel_names = list(channels.keys()) if all(name in channel_names for name in expected_channels): self.add_test_result("创建Channel", True, f"所有Channel创建成功: {channel_names}") else: self.add_test_result("创建Channel", False, f"Channel创建不完整: {channel_names}") except Exception as e: self.add_test_result("创建客户端和Channel", False, f"异常: {e}") async def test_connect_client(self): """测试2:连接客户端""" logger.info("测试2:连接客户端") try: success = await websocket_manager.connect_client(self.test_client_name) if success: self.add_test_result("连接客户端", True, "客户端连接成功") client = websocket_manager.get_client(self.test_client_name) if client and client.is_connected: self.add_test_result("客户端状态", True, "客户端已连接") else: self.add_test_result("客户端状态", False, "客户端未连接") else: self.add_test_result("连接客户端", False, "客户端连接失败") except Exception as e: self.add_test_result("连接客户端", False, f"异常: {e}") async def test_send_messages(self): """测试3:发送消息""" logger.info("测试3:发送消息") try: data_message = { "command": "test", "data": {"message": "Hello WebSocket"} } success = await websocket_manager.send_message( client_name=self.test_client_name, message_type="data", data=data_message, priority=0 ) if success: self.add_test_result("发送数据消息", True, "数据消息发送成功") else: self.add_test_result("发送数据消息", False, "数据消息发送失败") command_message = { "action": "ping", "timestamp": datetime.now().isoformat() } success = await websocket_manager.send_message( client_name=self.test_client_name, message_type="command", data=command_message, priority=1 ) if success: self.add_test_result("发送命令消息", True, "命令消息发送成功") else: self.add_test_result("发送命令消息", False, "命令消息发送失败") except Exception as e: self.add_test_result("发送消息", False, f"异常: {e}") async def test_receive_message_processing(self): """测试4:接收消息处理""" logger.info("测试4:接收消息处理") try: receive_channel = websocket_manager.get_channel(f"{self.test_client_name}_receive") if receive_channel: test_message = ChannelMessage( type="test_response", data={ "status": "success", "message": "Test response received" }, priority=0 ) success = await receive_channel.send_message(test_message) if success: self.add_test_result("接收消息处理", True, "测试消息已添加到接收Channel") await asyncio.sleep(0.5) if receive_channel.queue_size == 0: self.add_test_result("消息处理", True, "消息已被接收处理器处理") else: self.add_test_result("消息处理", False, f"消息未被处理,队列大小: {receive_channel.queue_size}") else: self.add_test_result("接收消息处理", False, "无法添加测试消息到接收Channel") else: self.add_test_result("接收消息处理", False, "接收Channel不存在") except Exception as e: self.add_test_result("接收消息处理", False, f"异常: {e}") async def test_heartbeat_functionality(self): """测试5:心跳功能""" logger.info("测试5:心跳功能") try: is_running = websocket_manager._heartbeat_manager.is_heartbeat_running(self.test_client_name) if is_running: self.add_test_result("心跳任务状态", True, "心跳任务正在运行") else: self.add_test_result("心跳任务状态", False, "心跳任务未运行") success = await websocket_manager.send_heartbeat(self.test_client_name) if success: self.add_test_result("手动发送心跳", True, "心跳发送成功") else: self.add_test_result("手动发送心跳", False, "心跳发送失败") except Exception as e: self.add_test_result("心跳功能", False, f"异常: {e}") async def test_priority_control(self): """测试6:优先级控制""" logger.info("测试6:优先级控制") try: send_channel = websocket_manager.get_channel(f"{self.test_client_name}_send") heartbeat_channel = websocket_manager.get_channel(f"{self.test_client_name}_heartbeat") if send_channel and heartbeat_channel: high_priority_message = ChannelMessage( type="high_priority", data={"priority": "high"}, priority=1 ) success = await send_channel.send_message(high_priority_message) if success: self.add_test_result("高优先级消息", True, "高优先级消息添加成功") if send_channel.priority_queue_size > 0: self.add_test_result("优先级队列", True, "消息正确进入优先级队列") else: self.add_test_result("优先级队列", False, "消息未进入优先级队列") else: self.add_test_result("高优先级消息", False, "高优先级消息添加失败") else: self.add_test_result("优先级控制", False, "发送Channel或心跳Channel不存在") except Exception as e: self.add_test_result("优先级控制", False, f"异常: {e}") async def test_disconnect_client(self): """测试7:断开连接""" logger.info("测试7:断开连接") try: success = await websocket_manager.disconnect_client(self.test_client_name) if success: self.add_test_result("断开连接", True, "客户端断开成功") client = websocket_manager.get_client(self.test_client_name) if client and not client.is_connected: self.add_test_result("断开状态", True, "客户端已断开连接") else: self.add_test_result("断开状态", False, "客户端仍处于连接状态") else: self.add_test_result("断开连接", False, "客户端断开失败") except Exception as e: self.add_test_result("断开连接", False, f"异常: {e}") async def test_cleanup(self): """测试8:清理资源""" logger.info("测试8:清理资源") try: success = await websocket_manager.remove_client(self.test_client_name) if success: self.add_test_result("移除客户端", True, "客户端移除成功") client = websocket_manager.get_client(self.test_client_name) if client is None: self.add_test_result("客户端清理", True, "客户端已完全移除") else: self.add_test_result("客户端清理", False, "客户端仍存在") else: self.add_test_result("移除客户端", False, "客户端移除失败") except Exception as e: self.add_test_result("清理资源", False, f"异常: {e}") def add_test_result(self, test_name: str, success: bool, message: str): """添加测试结果""" result = { "test_name": test_name, "success": success, "message": message, "timestamp": datetime.now().isoformat() } self.test_results.append(result) status = "✅ 通过" if success else "❌ 失败" logger.info(f"{status} {test_name}: {message}") def print_test_results(self): """打印测试结果""" print("\n" + "="*60) print("WebSocket完整功能测试结果") print("="*60) passed = 0 failed = 0 for result in self.test_results: status = "✅ 通过" if result["success"] else "❌ 失败" print(f"{status} {result['test_name']}: {result['message']}") if result["success"]: passed += 1 else: failed += 1 print("\n" + "-"*60) print(f"总计: {len(self.test_results)} 个测试") print(f"通过: {passed} 个") print(f"失败: {failed} 个") print(f"成功率: {passed/len(self.test_results)*100:.1f}%" if self.test_results else "0%") if failed == 0: print("\n🎉 所有测试通过!WebSocket架构工作正常。") else: print(f"\n⚠️ 有 {failed} 个测试失败,需要检查相关功能。") print("="*60) async def main(): """主函数""" print("WebSocket完整功能测试服务") print("="*60) test = WebSocketCompleteTest() await test.run_all_tests() if __name__ == "__main__": asyncio.run(main())