You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

328 lines
23 KiB

#!/usr/bin/env python3
"""
WebSocket完整功能测试服务
验证所有WebSocket组件的正常工作
"""
import asyncio
import json
import time
from datetime import datetime
from app.core.websocket.manager import websocket_manager
from app.core.websocket.channel import ChannelMessage
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.DEBUG)
class WebSocketCompleteTest:
"""WebSocket完整功能测试类"""
def __init__(self):
self.test_results = []
self.test_client_name = "test_complete_client"
self.test_url = "ws://localhost:8080/ws"
async def run_all_tests(self):
"""运行所有测试"""
logger.info("开始WebSocket完整功能测试")
try:
await self.test_create_client_and_channels()
await self.test_connect_client()
await self.test_send_messages()
await self.test_receive_message_processing()
await self.test_heartbeat_functionality()
await self.test_priority_control()
await self.test_disconnect_client()
await self.test_cleanup()
self.print_test_results()
except Exception as e:
logger.error(f"测试过程中发生异常: {e}")
self.add_test_result("整体测试", False, f"异常: {e}")
self.print_test_results()
async def test_create_client_and_channels(self):
"""测试1:创建客户端和Channel"""
logger.info("测试1:创建客户端和Channel")
try:
client = await websocket_manager.create_client(
name=self.test_client_name,
url=self.test_url,
heartbeat_interval=5
)
if client:
self.add_test_result("创建客户端", True, "客户端创建成功")
else:
self.add_test_result("创建客户端", False, "客户端创建失败")
return
channels = websocket_manager.get_client_channels(self.test_client_name)
expected_channels = [
f"{self.test_client_name}_send",
f"{self.test_client_name}_heartbeat",
f"{self.test_client_name}_receive"
]
channel_names = list(channels.keys())
if all(name in channel_names for name in expected_channels):
self.add_test_result("创建Channel", True, f"所有Channel创建成功: {channel_names}")
else:
self.add_test_result("创建Channel", False, f"Channel创建不完整: {channel_names}")
except Exception as e:
self.add_test_result("创建客户端和Channel", False, f"异常: {e}")
async def test_connect_client(self):
"""测试2:连接客户端"""
logger.info("测试2:连接客户端")
try:
success = await websocket_manager.connect_client(self.test_client_name)
if success:
self.add_test_result("连接客户端", True, "客户端连接成功")
client = websocket_manager.get_client(self.test_client_name)
if client and client.is_connected:
self.add_test_result("客户端状态", True, "客户端已连接")
else:
self.add_test_result("客户端状态", False, "客户端未连接")
else:
self.add_test_result("连接客户端", False, "客户端连接失败")
except Exception as e:
self.add_test_result("连接客户端", False, f"异常: {e}")
async def test_send_messages(self):
"""测试3:发送消息"""
logger.info("测试3:发送消息")
try:
data_message = {
"command": "test",
"data": {"message": "Hello WebSocket"}
}
success = await websocket_manager.send_message(
client_name=self.test_client_name,
message_type="data",
data=data_message,
priority=0
)
if success:
self.add_test_result("发送数据消息", True, "数据消息发送成功")
else:
self.add_test_result("发送数据消息", False, "数据消息发送失败")
command_message = {
"action": "ping",
"timestamp": datetime.now().isoformat()
}
success = await websocket_manager.send_message(
client_name=self.test_client_name,
message_type="command",
data=command_message,
priority=1
)
if success:
self.add_test_result("发送命令消息", True, "命令消息发送成功")
else:
self.add_test_result("发送命令消息", False, "命令消息发送失败")
except Exception as e:
self.add_test_result("发送消息", False, f"异常: {e}")
async def test_receive_message_processing(self):
"""测试4:接收消息处理"""
logger.info("测试4:接收消息处理")
try:
receive_channel = websocket_manager.get_channel(f"{self.test_client_name}_receive")
if receive_channel:
test_message = ChannelMessage(
type="test_response",
data={
"status": "success",
"message": "Test response received"
},
priority=0
)
success = await receive_channel.send_message(test_message)
if success:
self.add_test_result("接收消息处理", True, "测试消息已添加到接收Channel")
await asyncio.sleep(0.5)
if receive_channel.queue_size == 0:
self.add_test_result("消息处理", True, "消息已被接收处理器处理")
else:
self.add_test_result("消息处理", False, f"消息未被处理,队列大小: {receive_channel.queue_size}")
else:
self.add_test_result("接收消息处理", False, "无法添加测试消息到接收Channel")
else:
self.add_test_result("接收消息处理", False, "接收Channel不存在")
except Exception as e:
self.add_test_result("接收消息处理", False, f"异常: {e}")
async def test_heartbeat_functionality(self):
"""测试5:心跳功能"""
logger.info("测试5:心跳功能")
try:
is_running = websocket_manager._heartbeat_manager.is_heartbeat_running(self.test_client_name)
if is_running:
self.add_test_result("心跳任务状态", True, "心跳任务正在运行")
else:
self.add_test_result("心跳任务状态", False, "心跳任务未运行")
success = await websocket_manager.send_heartbeat(self.test_client_name)
if success:
self.add_test_result("手动发送心跳", True, "心跳发送成功")
else:
self.add_test_result("手动发送心跳", False, "心跳发送失败")
except Exception as e:
self.add_test_result("心跳功能", False, f"异常: {e}")
async def test_priority_control(self):
"""测试6:优先级控制"""
logger.info("测试6:优先级控制")
try:
send_channel = websocket_manager.get_channel(f"{self.test_client_name}_send")
heartbeat_channel = websocket_manager.get_channel(f"{self.test_client_name}_heartbeat")
if send_channel and heartbeat_channel:
high_priority_message = ChannelMessage(
type="high_priority",
data={"priority": "high"},
priority=1
)
success = await send_channel.send_message(high_priority_message)
if success:
self.add_test_result("高优先级消息", True, "高优先级消息添加成功")
if send_channel.priority_queue_size > 0:
self.add_test_result("优先级队列", True, "消息正确进入优先级队列")
else:
self.add_test_result("优先级队列", False, "消息未进入优先级队列")
else:
self.add_test_result("高优先级消息", False, "高优先级消息添加失败")
else:
self.add_test_result("优先级控制", False, "发送Channel或心跳Channel不存在")
except Exception as e:
self.add_test_result("优先级控制", False, f"异常: {e}")
async def test_disconnect_client(self):
"""测试7:断开连接"""
logger.info("测试7:断开连接")
try:
success = await websocket_manager.disconnect_client(self.test_client_name)
if success:
self.add_test_result("断开连接", True, "客户端断开成功")
client = websocket_manager.get_client(self.test_client_name)
if client and not client.is_connected:
self.add_test_result("断开状态", True, "客户端已断开连接")
else:
self.add_test_result("断开状态", False, "客户端仍处于连接状态")
else:
self.add_test_result("断开连接", False, "客户端断开失败")
except Exception as e:
self.add_test_result("断开连接", False, f"异常: {e}")
async def test_cleanup(self):
"""测试8:清理资源"""
logger.info("测试8:清理资源")
try:
success = await websocket_manager.remove_client(self.test_client_name)
if success:
self.add_test_result("移除客户端", True, "客户端移除成功")
client = websocket_manager.get_client(self.test_client_name)
if client is None:
self.add_test_result("客户端清理", True, "客户端已完全移除")
else:
self.add_test_result("客户端清理", False, "客户端仍存在")
else:
self.add_test_result("移除客户端", False, "客户端移除失败")
except Exception as e:
self.add_test_result("清理资源", False, f"异常: {e}")
def add_test_result(self, test_name: str, success: bool, message: str):
"""添加测试结果"""
result = {
"test_name": test_name,
"success": success,
"message": message,
"timestamp": datetime.now().isoformat()
}
self.test_results.append(result)
status = "✅ 通过" if success else "❌ 失败"
logger.info(f"{status} {test_name}: {message}")
def print_test_results(self):
"""打印测试结果"""
print("\n" + "="*60)
print("WebSocket完整功能测试结果")
print("="*60)
passed = 0
failed = 0
for result in self.test_results:
status = "✅ 通过" if result["success"] else "❌ 失败"
print(f"{status} {result['test_name']}: {result['message']}")
if result["success"]:
passed += 1
else:
failed += 1
print("\n" + "-"*60)
print(f"总计: {len(self.test_results)} 个测试")
print(f"通过: {passed}")
print(f"失败: {failed}")
print(f"成功率: {passed/len(self.test_results)*100:.1f}%" if self.test_results else "0%")
if failed == 0:
print("\n🎉 所有测试通过!WebSocket架构工作正常。")
else:
print(f"\n⚠️ 有 {failed} 个测试失败,需要检查相关功能。")
print("="*60)
async def main():
"""主函数"""
print("WebSocket完整功能测试服务")
print("="*60)
test = WebSocketCompleteTest()
await test.run_all_tests()
if __name__ == "__main__":
asyncio.run(main())