Browse Source

第二次连接的时候推送设备

hotfix/redundant-cleanup
root 3 months ago
parent
commit
11bfdd1022
  1. 5
      .vs/VSWorkspaceState.json
  2. 42
      app/core/websocket/manager.py
  3. 200
      docs/WebSocket第二次连接事件推送修复功能测试说明.md
  4. 184
      docs/WebSocket第二次连接事件推送问题修复方案.md
  5. 88
      docs/modify.md
  6. 27
      tests/run_websocket_second_connection_test.py
  7. 229
      tests/test_websocket_device_status_check.py
  8. 385
      tests/test_websocket_second_connection_fix.py

5
.vs/VSWorkspaceState.json

@ -5,9 +5,10 @@
"\\app\\api",
"\\app\\api\\v1",
"\\app\\api\\v1\\endpoints",
"\\app\\core\\device",
"\\app\\core",
"\\app\\services",
"\\app\\utils"
],
"SelectedNode": "\\app\\api\\v1\\endpoints\\devices.py",
"SelectedNode": "\\app\\services\\device_service.py",
"PreviewInSolutionExplorer": false
}

42
app/core/websocket/manager.py

@ -223,6 +223,11 @@ class WebSocketManager:
push_task = asyncio.create_task(self._async_push_buffered_events(name, device_manager))
push_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"推送缓冲事件-{name}"))
# 新增:异步检查当前设备状态(不阻塞API响应)
logger.debug(f"为客户端 {name} 异步检查当前设备状态")
check_task = asyncio.create_task(self._async_check_current_device_status(name, device_manager))
check_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"检查设备状态-{name}"))
logger.info(f"WebSocket管理器客户端连接成功: {name}")
return True
@ -385,6 +390,43 @@ class WebSocketManager:
except Exception as e:
logger.error(f"异步推送缓冲事件到客户端 {client_name} 异常: {e}")
async def _async_check_current_device_status(self, client_name: str, device_manager):
"""异步检查当前设备状态(不阻塞API响应)"""
try:
logger.debug(f"开始为客户端 {client_name} 检查当前设备状态")
# 获取当前所有设备
all_devices = await device_manager.get_all_devices_unified()
logger.debug(f"当前系统中有 {len(all_devices)} 个设备")
# 检查是否有在线设备需要生成事件
online_devices = [d for d in all_devices if d.get("status") == "device"]
if not online_devices:
logger.debug(f"当前没有在线设备,无需生成事件")
return
# 为每个在线设备生成连接事件(如果还没有的话)
for device in online_devices:
device_id = device["device_id"]
device_source = device.get("source", "unknown")
# 只处理自动发现的设备,避免重复处理已注册设备
if device_source == "auto_discovered":
logger.debug(f"为在线设备生成连接事件: {device_id}")
await device_manager.handle_auto_discovered_device_event(
device_id=device_id,
status="device",
device_info=device.get("device_info", {})
)
else:
logger.debug(f"跳过已注册设备: {device_id}")
logger.info(f"为客户端 {client_name} 设备状态检查完成,处理了 {len(online_devices)} 个在线设备")
except Exception as e:
logger.error(f"为客户端 {client_name} 检查设备状态失败: {e}")
async def _create_and_start_receive_processor(self, client_name: str):
"""创建并启动接收消息处理器"""
try:

200
docs/WebSocket第二次连接事件推送修复功能测试说明.md

@ -0,0 +1,200 @@
# WebSocket第二次连接事件推送修复功能测试说明
## 测试文件说明
### 1. 完整功能测试
**文件:** `tests/test_websocket_second_connection_fix.py`
**功能:** 完整的WebSocket第二次连接事件推送修复功能测试
**测试内容:**
- ✅ 第一次连接时的事件推送
- ✅ 第二次连接时的事件推送(无新事件)
- ✅ 设备状态检查功能
- ✅ 错误处理
- ✅ 性能测试
**运行方式:**
```bash
cd tests
python test_websocket_second_connection_fix.py
```
### 2. 快速测试脚本
**文件:** `tests/run_websocket_second_connection_test.py`
**功能:** 快速运行完整功能测试的脚本
**运行方式:**
```bash
cd tests
python run_websocket_second_connection_test.py
```
### 3. 单元测试
**文件:** `tests/test_websocket_device_status_check.py`
**功能:** 专门测试 `_async_check_current_device_status` 方法的单元测试
**测试内容:**
- ✅ 设备状态检查基本功能
- ✅ 已注册设备跳过逻辑
- ✅ 空设备列表处理
- ✅ 错误处理
**运行方式:**
```bash
cd tests
python test_websocket_device_status_check.py
```
## 测试场景说明
### 场景1:第一次连接事件推送
**目的:** 验证第一次连接时能正常推送历史事件
**测试步骤:**
1. 模拟一些设备事件(在连接之前)
2. 检查事件缓冲队列
3. 模拟WebSocket客户端连接
4. 验证连接成功和事件推送
**预期结果:**
- 连接成功
- 设备状态检查被调用
- 历史事件被推送
### 场景2:第二次连接事件推送(无新事件)
**目的:** 验证第二次连接时能主动检查设备状态并推送数据
**测试步骤:**
1. 模拟缓冲队列被清空(第一次推送后的状态)
2. 模拟第二次WebSocket客户端连接
3. 验证连接成功和主动设备状态检查
**预期结果:**
- 连接成功
- 设备状态检查被调用
- 为在线设备生成连接事件
### 场景3:设备状态检查功能
**目的:** 验证设备状态检查方法的正确性
**测试步骤:**
1. 模拟不同状态的设备列表
2. 调用设备状态检查方法
3. 验证只处理自动发现的在线设备
**预期结果:**
- 只处理自动发现的在线设备
- 跳过已注册设备
- 跳过离线设备
### 场景4:错误处理
**目的:** 验证异常情况下的错误处理
**测试步骤:**
1. 模拟设备管理器异常
2. 模拟空设备列表
3. 验证异常被正确处理
**预期结果:**
- 异常被正确处理
- 不会影响其他功能
- 有适当的日志记录
### 场景5:性能测试
**目的:** 验证大量设备情况下的性能表现
**测试步骤:**
1. 生成100个设备数据
2. 调用设备状态检查方法
3. 测量执行时间
**预期结果:**
- 执行时间合理
- 只处理在线设备
- 内存使用稳定
## 测试数据说明
### 设备数据结构
```python
{
"device_id": "device_001",
"status": "device", # "device" 表示在线,"offline" 表示离线
"source": "auto_discovered", # "auto_discovered" 或 "registered"
"device_info": {
"model": "Test Device 1",
"manufacturer": "Test Corp"
}
}
```
### 事件数据结构
```python
{
"type": "device_status_update",
"device_id": "device_001",
"status": "device",
"timestamp": "2025-01-27T10:00:00",
"device_info": {
"model": "Test Device 1",
"manufacturer": "Test Corp"
}
}
```
## Mock说明
### 使用的Mock对象
1. **WebSocket客户端管理器**:模拟客户端连接
2. **Channel管理器**:模拟Channel启动
3. **设备管理器**:模拟设备数据获取
4. **事件处理**:模拟事件生成和处理
### Mock策略
- 使用 `unittest.mock.patch` 进行方法Mock
- 使用 `AsyncMock` 处理异步方法
- 设置合理的返回值模拟真实场景
## 测试结果验证
### 成功标准
1. **功能正确性**:所有测试用例通过
2. **性能表现**:执行时间在合理范围内
3. **错误处理**:异常情况被正确处理
4. **日志记录**:有完整的日志输出
### 失败处理
1. **测试失败**:检查Mock设置和测试逻辑
2. **功能异常**:检查修复代码实现
3. **性能问题**:优化设备状态检查逻辑
## 运行建议
### 开发阶段
- 使用单元测试快速验证核心功能
- 使用完整功能测试验证整体流程
### 部署前
- 运行所有测试确保功能正常
- 检查测试日志确认无异常
### 问题排查
- 查看详细日志定位问题
- 使用Mock数据隔离问题
## 注意事项
1. **测试环境**:确保测试环境与生产环境一致
2. **Mock数据**:使用真实的设备数据结构
3. **异步处理**:正确处理异步方法的测试
4. **资源清理**:测试完成后清理资源
5. **日志级别**:使用DEBUG级别查看详细日志
---
**创建时间:** 2025-01-27
**创建人:** AI Assistant
**状态:** 已完成

184
docs/WebSocket第二次连接事件推送问题修复方案.md

@ -0,0 +1,184 @@
# WebSocket第二次连接事件推送问题修复方案
## 问题描述
WebSocket第一次连接后能正常推送数据,但第二次连接时需要等待 adb 设备接入或重启后才能推送数据。
## 问题根因分析
### 1. 事件缓冲队列被清空
`app/core/device/event_manager.py``_get_events_and_clients()` 方法中,第218行的 `self._event_buffer.clear()` 会在第一次推送后完全清空事件缓冲队列。
### 2. 设备监控机制是事件驱动的
系统使用 `track_devices()` 方法监控设备状态变化,只有当设备状态发生变化时才会触发事件。当执行 `adb reboot` 时:
- 设备断开:触发 `device_disconnected` 事件
- 设备重连:触发 `device_connected` 事件
- 新事件填充缓冲队列,WebSocket客户端就能获取到数据
### 3. 缺乏主动设备状态检查
当前设计是**被动监控**模式,只监听设备状态变化事件,不会主动查询当前已连接的设备。
## 修复方案
### 方案选择:在WebSocket客户端连接时主动检查设备状态
**优势:**
- 即时响应:客户端连接后立即能获取到当前设备状态
- 最小改动:只需要在连接逻辑中添加一个方法调用
- 性能友好:只在需要时检查,不会持续消耗资源
- 用户体验好:解决了第二次连接需要等待的问题
### 实现策略
1. **保持原有逻辑不变**:不修改现有的设备监控机制
2. **异步执行**:不阻塞API响应
3. **安全处理**:完整的错误处理和日志记录
4. **避免重复**:只处理自动发现的设备,避免重复处理已注册设备
## 具体实现
### 1. 修改 WebSocketManager.connect_client() 方法
在现有的异步任务中添加设备状态检查:
```python
async def connect_client(self, name: str) -> bool:
"""连接指定客户端"""
try:
# ... 现有连接逻辑保持不变 ...
# 新增:异步注册设备事件推送(不阻塞API响应)
logger.debug(f"为客户端 {name} 异步注册设备事件推送")
device_manager = await self._get_device_manager()
# 创建异步任务并添加错误处理
register_task = asyncio.create_task(self._async_register_device_events(name, device_manager))
register_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"注册设备事件推送-{name}"))
# 新增:异步推送缓冲的事件到新连接的客户端(不阻塞API响应)
logger.debug(f"为客户端 {name} 异步推送缓冲事件")
push_task = asyncio.create_task(self._async_push_buffered_events(name, device_manager))
push_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"推送缓冲事件-{name}"))
# 新增:异步检查当前设备状态(不阻塞API响应)
logger.debug(f"为客户端 {name} 异步检查当前设备状态")
check_task = asyncio.create_task(self._async_check_current_device_status(name, device_manager))
check_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"检查设备状态-{name}"))
logger.info(f"WebSocket管理器客户端连接成功: {name}")
return True
except Exception as e:
logger.error(f"WebSocket管理器连接客户端失败: {name} - {e}")
return False
```
### 2. 添加异步设备状态检查方法
```python
async def _async_check_current_device_status(self, client_name: str, device_manager):
"""异步检查当前设备状态(不阻塞API响应)"""
try:
logger.debug(f"开始为客户端 {client_name} 检查当前设备状态")
# 获取当前所有设备
all_devices = await device_manager.get_all_devices_unified()
logger.debug(f"当前系统中有 {len(all_devices)} 个设备")
# 检查是否有在线设备需要生成事件
online_devices = [d for d in all_devices if d.get("status") == "device"]
if not online_devices:
logger.debug(f"当前没有在线设备,无需生成事件")
return
# 为每个在线设备生成连接事件(如果还没有的话)
for device in online_devices:
device_id = device["device_id"]
device_source = device.get("source", "unknown")
# 只处理自动发现的设备,避免重复处理已注册设备
if device_source == "auto_discovered":
logger.debug(f"为在线设备生成连接事件: {device_id}")
await device_manager.handle_auto_discovered_device_event(
device_id=device_id,
status="device",
device_info=device.get("device_info", {})
)
else:
logger.debug(f"跳过已注册设备: {device_id}")
logger.info(f"为客户端 {client_name} 设备状态检查完成,处理了 {len(online_devices)} 个在线设备")
except Exception as e:
logger.error(f"为客户端 {client_name} 检查设备状态失败: {e}")
```
## 修复效果
### 修复前
1. **第一次连接**:能正常推送数据(有历史事件)
2. **第二次连接**:需要等待 adb 设备接入或重启才能推送数据(缓冲队列被清空)
3. **adb reboot**:能激活推送(生成新事件)
### 修复后
1. **第一次连接**:正常推送历史事件 + 当前设备状态
2. **第二次连接**:正常推送历史事件 + 当前设备状态(无需等待adb重启)
3. **设备状态变化**:原有的自动监听机制继续工作
4. **向后兼容**:完全保持原有功能不变
## 安全性保证
### 1. 完全不影响原有逻辑
- ✅ 保持原有的异步任务结构
- ✅ 不改变现有的设备监控机制
- ✅ 不修改 `handle_auto_discovered_device_event` 方法
- ✅ 保持原有的错误处理机制
### 2. 保证自动监听不失效
- ✅ 原有的 `track_devices()` 监控继续工作
- ✅ 设备状态变化仍然会触发事件
- ✅ 只是额外添加了主动检查,不会替代原有机制
- ✅ 两个机制并行工作,互不干扰
### 3. 性能友好
- ✅ 异步执行,不阻塞API响应
- ✅ 只在客户端连接时执行一次
- ✅ 不会持续消耗资源
- ✅ 不影响原有的监控性能
## 测试建议
1. **第一次WebSocket连接测试**:验证能正常推送数据
2. **第二次WebSocket连接测试**:验证无需等待adb重启即可推送数据
3. **设备状态变化测试**:验证原有的自动监听机制仍然正常工作
4. **多次连接测试**:确保修复的稳定性
5. **性能测试**:确保不影响系统性能
## 文件修改清单
- `app/core/websocket/manager.py`:添加设备状态检查逻辑
- `docs/modify.md`:记录修改内容
## 风险评估
**风险等级:低**
**原因:**
1. 只添加新功能,不修改现有逻辑
2. 异步执行,不影响主流程
3. 有完整的错误处理
4. 向后兼容,不影响现有功能
## 实施计划
1. **第一步**:生成修复方案文档(已完成)
2. **第二步**:实施代码修复
3. **第三步**:测试验证
4. **第四步**:更新修改记录
---
**创建时间:** 2025-01-27
**创建人:** AI Assistant
**状态:** 待实施

88
docs/modify.md

@ -1,5 +1,93 @@
# 修改记录
## 2025-01-27 修复WebSocket第二次连接事件推送问题
**问题描述:**
WebSocket第一次连接后能正常推送数据,但第二次连接时需要等待 adb 设备接入或重启后才能推送数据。
**问题根因:**
1. **事件缓冲队列被清空**:在 `app/core/device/event_manager.py``_get_events_and_clients()` 方法中,第218行的 `self._event_buffer.clear()` 会在第一次推送后完全清空事件缓冲队列
2. **设备监控机制是事件驱动的**:系统使用 `track_devices()` 方法监控设备状态变化,只有当设备状态发生变化时才会触发事件
3. **缺乏主动设备状态检查**:当前设计是**被动监控**模式,只监听设备状态变化事件,不会主动查询当前已连接的设备
**修复方案:**
在WebSocket客户端连接时主动检查设备状态,确保新连接的客户端能立即获取到当前设备状态。
**修改内容:**
1. **修改 `app/core/websocket/manager.py` 的 `connect_client()` 方法**
```python
# 新增:异步检查当前设备状态(不阻塞API响应)
logger.debug(f"为客户端 {name} 异步检查当前设备状态")
check_task = asyncio.create_task(self._async_check_current_device_status(name, device_manager))
check_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"检查设备状态-{name}"))
```
2. **添加 `_async_check_current_device_status()` 方法**
```python
async def _async_check_current_device_status(self, client_name: str, device_manager):
"""异步检查当前设备状态(不阻塞API响应)"""
try:
logger.debug(f"开始为客户端 {client_name} 检查当前设备状态")
# 获取当前所有设备
all_devices = await device_manager.get_all_devices_unified()
logger.debug(f"当前系统中有 {len(all_devices)} 个设备")
# 检查是否有在线设备需要生成事件
online_devices = [d for d in all_devices if d.get("status") == "device"]
if not online_devices:
logger.debug(f"当前没有在线设备,无需生成事件")
return
# 为每个在线设备生成连接事件(如果还没有的话)
for device in online_devices:
device_id = device["device_id"]
device_source = device.get("source", "unknown")
# 只处理自动发现的设备,避免重复处理已注册设备
if device_source == "auto_discovered":
logger.debug(f"为在线设备生成连接事件: {device_id}")
await device_manager.handle_auto_discovered_device_event(
device_id=device_id,
status="device",
device_info=device.get("device_info", {})
)
else:
logger.debug(f"跳过已注册设备: {device_id}")
logger.info(f"为客户端 {client_name} 设备状态检查完成,处理了 {len(online_devices)} 个在线设备")
except Exception as e:
logger.error(f"为客户端 {client_name} 检查设备状态失败: {e}")
```
**修复效果:**
- ✅ **第一次连接**:正常推送历史事件 + 当前设备状态
- ✅ **第二次连接**:正常推送历史事件 + 当前设备状态(无需等待adb重启)
- ✅ **设备状态变化**:原有的自动监听机制继续工作
- ✅ **向后兼容**:完全保持原有功能不变
**安全性保证:**
- ✅ 保持原有的异步任务结构
- ✅ 不改变现有的设备监控机制
- ✅ 异步执行,不阻塞API响应
- ✅ 有完整的错误处理和日志记录
- ✅ 只处理自动发现的设备,避免重复处理已注册设备
**技术说明:**
- 修复方案采用在WebSocket客户端连接时主动检查设备状态的策略
- 通过异步任务执行,不影响API响应性能
- 保持原有的 `track_devices()` 监控机制继续工作
- 两个机制并行工作,互不干扰
**测试文件:**
- `tests/test_websocket_second_connection_fix.py`:完整功能测试
- `tests/test_websocket_device_status_check.py`:单元测试
- `tests/run_websocket_second_connection_test.py`:快速测试脚本
- `docs/WebSocket第二次连接事件推送修复功能测试说明.md`:测试说明文档
## 2025-08-26 修复CommandExecutor设备获取逻辑错误并增强日志记录
**问题描述:**

27
tests/run_websocket_second_connection_test.py

@ -0,0 +1,27 @@
#!/usr/bin/env python3
"""
WebSocket第二次连接事件推送修复功能快速测试脚本
"""
import asyncio
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test_websocket_second_connection_fix import main
if __name__ == "__main__":
print("🚀 启动WebSocket第二次连接事件推送修复功能测试...")
print("=" * 60)
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n⏹️ 测试被用户中断")
except Exception as e:
print(f"\n❌ 测试执行失败: {e}")
sys.exit(1)
print("=" * 60)
print("🏁 测试完成")

229
tests/test_websocket_device_status_check.py

@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""
WebSocket设备状态检查功能单元测试
专门测试_async_check_current_device_status方法
"""
import asyncio
import sys
import os
from unittest.mock import AsyncMock, MagicMock, patch
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.core.websocket.manager import WebSocketManager
from app.core.device.manager import DeviceManager
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.INFO)
async def test_device_status_check_basic():
"""测试设备状态检查基本功能"""
logger.info("=== 测试设备状态检查基本功能 ===")
websocket_manager = WebSocketManager()
device_manager = DeviceManager()
try:
# Mock设备管理器方法
with patch.object(device_manager, 'get_all_devices_unified') as mock_get_devices, \
patch.object(device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
# 设置Mock返回值
mock_get_devices.return_value = [
{
"device_id": "device_001",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 1"}
},
{
"device_id": "device_002",
"status": "offline",
"source": "auto_discovered",
"device_info": {"model": "Test Device 2"}
}
]
# 调用设备状态检查方法
await websocket_manager._async_check_current_device_status(
"test_client", device_manager
)
# 验证结果
mock_get_devices.assert_called_once()
mock_handle_event.assert_called_once_with(
device_id="device_001",
status="device",
device_info={"model": "Test Device 1"}
)
logger.info("✅ 设备状态检查基本功能测试通过")
return True
except Exception as e:
logger.error(f"❌ 设备状态检查基本功能测试失败: {e}")
return False
finally:
await websocket_manager.cleanup()
await device_manager.cleanup()
async def test_device_status_check_registered_devices():
"""测试已注册设备的跳过逻辑"""
logger.info("=== 测试已注册设备的跳过逻辑 ===")
websocket_manager = WebSocketManager()
device_manager = DeviceManager()
try:
# Mock设备管理器方法
with patch.object(device_manager, 'get_all_devices_unified') as mock_get_devices, \
patch.object(device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
# 设置Mock返回值 - 包含已注册设备
mock_get_devices.return_value = [
{
"device_id": "device_001",
"status": "device",
"source": "auto_discovered", # 自动发现设备
"device_info": {"model": "Test Device 1"}
},
{
"device_id": "device_002",
"status": "device",
"source": "registered", # 已注册设备,应该被跳过
"device_info": {"model": "Test Device 2"}
}
]
# 调用设备状态检查方法
await websocket_manager._async_check_current_device_status(
"test_client", device_manager
)
# 验证结果 - 只处理自动发现的设备
mock_get_devices.assert_called_once()
mock_handle_event.assert_called_once_with(
device_id="device_001",
status="device",
device_info={"model": "Test Device 1"}
)
logger.info("✅ 已注册设备跳过逻辑测试通过")
return True
except Exception as e:
logger.error(f"❌ 已注册设备跳过逻辑测试失败: {e}")
return False
finally:
await websocket_manager.cleanup()
await device_manager.cleanup()
async def test_device_status_check_empty_list():
"""测试空设备列表处理"""
logger.info("=== 测试空设备列表处理 ===")
websocket_manager = WebSocketManager()
device_manager = DeviceManager()
try:
# Mock设备管理器方法
with patch.object(device_manager, 'get_all_devices_unified') as mock_get_devices, \
patch.object(device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
# 设置Mock返回值 - 空列表
mock_get_devices.return_value = []
# 调用设备状态检查方法
await websocket_manager._async_check_current_device_status(
"test_client", device_manager
)
# 验证结果
mock_get_devices.assert_called_once()
mock_handle_event.assert_not_called()
logger.info("✅ 空设备列表处理测试通过")
return True
except Exception as e:
logger.error(f"❌ 空设备列表处理测试失败: {e}")
return False
finally:
await websocket_manager.cleanup()
await device_manager.cleanup()
async def test_device_status_check_error_handling():
"""测试错误处理"""
logger.info("=== 测试错误处理 ===")
websocket_manager = WebSocketManager()
device_manager = DeviceManager()
try:
# Mock设备管理器方法 - 抛出异常
with patch.object(device_manager, 'get_all_devices_unified', side_effect=Exception("测试异常")):
# 调用设备状态检查方法 - 应该能处理异常
await websocket_manager._async_check_current_device_status(
"test_client", device_manager
)
logger.info("✅ 错误处理测试通过")
return True
except Exception as e:
logger.error(f"❌ 错误处理测试失败: {e}")
return False
finally:
await websocket_manager.cleanup()
await device_manager.cleanup()
async def main():
"""主测试函数"""
logger.info("开始WebSocket设备状态检查功能单元测试")
logger.info("=" * 50)
test_cases = [
("设备状态检查基本功能", test_device_status_check_basic),
("已注册设备跳过逻辑", test_device_status_check_registered_devices),
("空设备列表处理", test_device_status_check_empty_list),
("错误处理", test_device_status_check_error_handling),
]
passed = 0
total = len(test_cases)
for test_name, test_func in test_cases:
logger.info(f"\n🧪 运行测试: {test_name}")
try:
result = await test_func()
if result:
passed += 1
logger.info(f"{test_name} - 通过")
else:
logger.error(f"{test_name} - 失败")
except Exception as e:
logger.error(f"{test_name} - 异常: {e}")
logger.info("\n" + "=" * 50)
logger.info(f"测试结果: {passed}/{total} 通过")
logger.info(f"成功率: {(passed/total*100):.1f}%")
if passed == total:
logger.info("🎉 所有测试通过!设备状态检查功能正常")
return True
else:
logger.error("❌ 部分测试失败,请检查实现")
return False
if __name__ == "__main__":
try:
success = asyncio.run(main())
sys.exit(0 if success else 1)
except KeyboardInterrupt:
logger.info("\n⏹️ 测试被用户中断")
sys.exit(1)
except Exception as e:
logger.error(f"\n❌ 测试执行失败: {e}")
sys.exit(1)

385
tests/test_websocket_second_connection_fix.py

@ -0,0 +1,385 @@
#!/usr/bin/env python3
"""
WebSocket第二次连接事件推送修复功能测试
验证修复后的WebSocket连接能够主动检查设备状态并推送数据
"""
import asyncio
import sys
import os
import time
from unittest.mock import AsyncMock, MagicMock, patch
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.core.websocket.manager import WebSocketManager
from app.core.device.manager import DeviceManager
from app.core.device.event_manager import EventManager
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.DEBUG)
class TestWebSocketSecondConnectionFix:
"""WebSocket第二次连接事件推送修复功能测试类"""
def __init__(self):
self.websocket_manager = None
self.device_manager = None
self.test_results = []
async def setup(self):
"""测试环境初始化"""
logger.info("初始化测试环境...")
# 创建WebSocket管理器实例
self.websocket_manager = WebSocketManager()
# 创建设备管理器实例
self.device_manager = DeviceManager()
# 设置WebSocket推送回调
self.device_manager.set_websocket_push_callback(
self.websocket_manager._websocket_push_adapter()
)
logger.info("测试环境初始化完成")
async def cleanup(self):
"""测试环境清理"""
logger.info("清理测试环境...")
if self.websocket_manager:
await self.websocket_manager.cleanup()
if self.device_manager:
await self.device_manager.cleanup()
logger.info("测试环境清理完成")
async def test_first_connection_with_events(self):
"""测试第一次连接时的事件推送"""
logger.info("=== 测试第一次连接时的事件推送 ===")
try:
# 1. 模拟一些设备事件(在连接之前)
logger.info("步骤1: 模拟设备事件")
test_events = [
("device_001", "device", {"model": "Test Device 1", "manufacturer": "Test Corp"}),
("device_002", "device", {"model": "Test Device 2", "manufacturer": "Test Corp"}),
("device_003", "offline", {"model": "Test Device 3", "manufacturer": "Test Corp"}),
]
for device_id, status, device_info in test_events:
await self.device_manager.handle_auto_discovered_device_event(
device_id, status, device_info
)
await asyncio.sleep(0.1)
# 2. 检查事件缓冲队列
buffered_events = await self.device_manager.get_buffered_events("test_client")
logger.info(f"缓冲事件数量: {len(buffered_events)}")
# 3. 模拟WebSocket客户端连接
logger.info("步骤2: 模拟WebSocket客户端连接")
client_name = "test_client_1"
# Mock WebSocket客户端连接
with patch.object(self.websocket_manager._client_manager, 'connect_client', return_value=True), \
patch.object(self.websocket_manager._channel_manager, 'start_client_channels'), \
patch.object(self.websocket_manager, '_wait_for_channels_ready', return_value=True), \
patch.object(self.websocket_manager, '_create_and_start_send_controller'), \
patch.object(self.websocket_manager, '_create_and_start_receive_processor'):
# 设置设备管理器Mock
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices:
mock_get_devices.return_value = [
{
"device_id": "device_001",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 1"}
},
{
"device_id": "device_002",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 2"}
}
]
# 执行连接
success = await self.websocket_manager.connect_client(client_name)
# 等待异步任务完成
await asyncio.sleep(1)
# 验证连接成功
assert success, "第一次连接应该成功"
logger.info("第一次连接成功")
# 验证设备状态检查被调用
mock_get_devices.assert_called()
logger.info("设备状态检查被正确调用")
self.test_results.append(("第一次连接事件推送", "PASS"))
logger.info("✅ 第一次连接事件推送测试通过")
except Exception as e:
logger.error(f"第一次连接事件推送测试失败: {e}")
self.test_results.append(("第一次连接事件推送", f"FAIL: {e}"))
raise
async def test_second_connection_without_new_events(self):
"""测试第二次连接时的事件推送(无新事件)"""
logger.info("=== 测试第二次连接时的事件推送(无新事件) ===")
try:
# 1. 模拟第一次连接后缓冲队列被清空的情况
logger.info("步骤1: 模拟缓冲队列被清空")
# 清空事件缓冲队列(模拟第一次推送后的状态)
self.device_manager._event_manager._event_buffer.clear()
logger.info("事件缓冲队列已清空")
# 2. 模拟第二次WebSocket客户端连接
logger.info("步骤2: 模拟第二次WebSocket客户端连接")
client_name = "test_client_2"
# Mock WebSocket客户端连接
with patch.object(self.websocket_manager._client_manager, 'connect_client', return_value=True), \
patch.object(self.websocket_manager._channel_manager, 'start_client_channels'), \
patch.object(self.websocket_manager, '_wait_for_channels_ready', return_value=True), \
patch.object(self.websocket_manager, '_create_and_start_send_controller'), \
patch.object(self.websocket_manager, '_create_and_start_receive_processor'):
# 设置设备管理器Mock - 返回当前在线的设备
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices, \
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
mock_get_devices.return_value = [
{
"device_id": "device_001",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 1"}
},
{
"device_id": "device_002",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 2"}
}
]
# 执行连接
success = await self.websocket_manager.connect_client(client_name)
# 等待异步任务完成
await asyncio.sleep(1)
# 验证连接成功
assert success, "第二次连接应该成功"
logger.info("第二次连接成功")
# 验证设备状态检查被调用
mock_get_devices.assert_called()
logger.info("设备状态检查被正确调用")
# 验证为在线设备生成了连接事件
assert mock_handle_event.call_count >= 2, "应该为在线设备生成连接事件"
logger.info(f"{mock_handle_event.call_count} 个设备生成了连接事件")
self.test_results.append(("第二次连接事件推送", "PASS"))
logger.info("✅ 第二次连接事件推送测试通过")
except Exception as e:
logger.error(f"第二次连接事件推送测试失败: {e}")
self.test_results.append(("第二次连接事件推送", f"FAIL: {e}"))
raise
async def test_device_status_check_functionality(self):
"""测试设备状态检查功能"""
logger.info("=== 测试设备状态检查功能 ===")
try:
# 1. 测试设备状态检查方法
logger.info("步骤1: 测试设备状态检查方法")
# Mock设备管理器
with patch.object(self.device_manager, 'get_all_devices_unified') as mock_get_devices, \
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
# 设置Mock返回值
mock_get_devices.return_value = [
{
"device_id": "device_001",
"status": "device",
"source": "auto_discovered",
"device_info": {"model": "Test Device 1"}
},
{
"device_id": "device_002",
"status": "offline",
"source": "auto_discovered",
"device_info": {"model": "Test Device 2"}
},
{
"device_id": "device_003",
"status": "device",
"source": "registered", # 已注册设备,应该被跳过
"device_info": {"model": "Test Device 3"}
}
]
# 调用设备状态检查方法
await self.websocket_manager._async_check_current_device_status(
"test_client", self.device_manager
)
# 验证只处理了自动发现的在线设备
mock_handle_event.assert_called_once_with(
device_id="device_001",
status="device",
device_info={"model": "Test Device 1"}
)
logger.info("设备状态检查功能正确,只处理了自动发现的在线设备")
self.test_results.append(("设备状态检查功能", "PASS"))
logger.info("✅ 设备状态检查功能测试通过")
except Exception as e:
logger.error(f"设备状态检查功能测试失败: {e}")
self.test_results.append(("设备状态检查功能", f"FAIL: {e}"))
raise
async def test_error_handling(self):
"""测试错误处理"""
logger.info("=== 测试错误处理 ===")
try:
# 1. 测试设备管理器异常
logger.info("步骤1: 测试设备管理器异常")
with patch.object(self.device_manager, 'get_all_devices_unified', side_effect=Exception("设备管理器异常")):
# 调用设备状态检查方法,应该能处理异常
await self.websocket_manager._async_check_current_device_status(
"test_client", self.device_manager
)
logger.info("设备管理器异常被正确处理")
# 2. 测试空设备列表
logger.info("步骤2: 测试空设备列表")
with patch.object(self.device_manager, 'get_all_devices_unified', return_value=[]):
await self.websocket_manager._async_check_current_device_status(
"test_client", self.device_manager
)
logger.info("空设备列表被正确处理")
self.test_results.append(("错误处理", "PASS"))
logger.info("✅ 错误处理测试通过")
except Exception as e:
logger.error(f"错误处理测试失败: {e}")
self.test_results.append(("错误处理", f"FAIL: {e}"))
raise
async def test_performance(self):
"""测试性能影响"""
logger.info("=== 测试性能影响 ===")
try:
# 1. 测试大量设备的情况
logger.info("步骤1: 测试大量设备的情况")
# 生成大量设备数据
large_device_list = []
for i in range(100):
large_device_list.append({
"device_id": f"device_{i:03d}",
"status": "device" if i % 2 == 0 else "offline",
"source": "auto_discovered",
"device_info": {"model": f"Test Device {i}"}
})
with patch.object(self.device_manager, 'get_all_devices_unified', return_value=large_device_list), \
patch.object(self.device_manager, 'handle_auto_discovered_device_event') as mock_handle_event:
start_time = time.time()
await self.websocket_manager._async_check_current_device_status(
"test_client", self.device_manager
)
end_time = time.time()
duration = end_time - start_time
logger.info(f"处理100个设备耗时: {duration:.3f}")
# 验证只处理了在线设备
expected_calls = len([d for d in large_device_list if d["status"] == "device"])
assert mock_handle_event.call_count == expected_calls, f"应该处理 {expected_calls} 个在线设备"
logger.info(f"正确处理了 {mock_handle_event.call_count} 个在线设备")
self.test_results.append(("性能测试", "PASS"))
logger.info("✅ 性能测试通过")
except Exception as e:
logger.error(f"性能测试失败: {e}")
self.test_results.append(("性能测试", f"FAIL: {e}"))
raise
def print_test_results(self):
"""打印测试结果"""
logger.info("=== 测试结果汇总 ===")
total_tests = len(self.test_results)
passed_tests = len([r for r in self.test_results if r[1] == "PASS"])
failed_tests = total_tests - passed_tests
logger.info(f"总测试数: {total_tests}")
logger.info(f"通过: {passed_tests}")
logger.info(f"失败: {failed_tests}")
logger.info(f"成功率: {(passed_tests/total_tests*100):.1f}%")
logger.info("\n详细结果:")
for test_name, result in self.test_results:
status_icon = "" if result == "PASS" else ""
logger.info(f"{status_icon} {test_name}: {result}")
return passed_tests == total_tests
async def main():
"""主测试函数"""
logger.info("开始WebSocket第二次连接事件推送修复功能测试")
test_suite = TestWebSocketSecondConnectionFix()
try:
# 初始化测试环境
await test_suite.setup()
# 运行测试用例
await test_suite.test_first_connection_with_events()
await test_suite.test_second_connection_without_new_events()
await test_suite.test_device_status_check_functionality()
await test_suite.test_error_handling()
await test_suite.test_performance()
# 打印测试结果
success = test_suite.print_test_results()
if success:
logger.info("🎉 所有测试通过!WebSocket第二次连接事件推送修复功能正常")
else:
logger.error("❌ 部分测试失败,请检查修复实现")
except Exception as e:
logger.error(f"测试执行异常: {e}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
finally:
# 清理测试环境
await test_suite.cleanup()
if __name__ == "__main__":
asyncio.run(main())
Loading…
Cancel
Save