You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
185 lines
7.2 KiB
185 lines
7.2 KiB
|
3 months ago
|
# WebSocket第二次连接事件推送问题修复方案
|
||
|
|
|
||
|
|
## 问题描述
|
||
|
|
|
||
|
|
WebSocket第一次连接后能正常推送数据,但第二次连接时需要等待 adb 设备接入或重启后才能推送数据。
|
||
|
|
|
||
|
|
## 问题根因分析
|
||
|
|
|
||
|
|
### 1. 事件缓冲队列被清空
|
||
|
|
在 `app/core/device/event_manager.py` 的 `_get_events_and_clients()` 方法中,第218行的 `self._event_buffer.clear()` 会在第一次推送后完全清空事件缓冲队列。
|
||
|
|
|
||
|
|
### 2. 设备监控机制是事件驱动的
|
||
|
|
系统使用 `track_devices()` 方法监控设备状态变化,只有当设备状态发生变化时才会触发事件。当执行 `adb reboot` 时:
|
||
|
|
- 设备断开:触发 `device_disconnected` 事件
|
||
|
|
- 设备重连:触发 `device_connected` 事件
|
||
|
|
- 新事件填充缓冲队列,WebSocket客户端就能获取到数据
|
||
|
|
|
||
|
|
### 3. 缺乏主动设备状态检查
|
||
|
|
当前设计是**被动监控**模式,只监听设备状态变化事件,不会主动查询当前已连接的设备。
|
||
|
|
|
||
|
|
## 修复方案
|
||
|
|
|
||
|
|
### 方案选择:在WebSocket客户端连接时主动检查设备状态
|
||
|
|
|
||
|
|
**优势:**
|
||
|
|
- 即时响应:客户端连接后立即能获取到当前设备状态
|
||
|
|
- 最小改动:只需要在连接逻辑中添加一个方法调用
|
||
|
|
- 性能友好:只在需要时检查,不会持续消耗资源
|
||
|
|
- 用户体验好:解决了第二次连接需要等待的问题
|
||
|
|
|
||
|
|
### 实现策略
|
||
|
|
|
||
|
|
1. **保持原有逻辑不变**:不修改现有的设备监控机制
|
||
|
|
2. **异步执行**:不阻塞API响应
|
||
|
|
3. **安全处理**:完整的错误处理和日志记录
|
||
|
|
4. **避免重复**:只处理自动发现的设备,避免重复处理已注册设备
|
||
|
|
|
||
|
|
## 具体实现
|
||
|
|
|
||
|
|
### 1. 修改 WebSocketManager.connect_client() 方法
|
||
|
|
|
||
|
|
在现有的异步任务中添加设备状态检查:
|
||
|
|
|
||
|
|
```python
|
||
|
|
async def connect_client(self, name: str) -> bool:
|
||
|
|
"""连接指定客户端"""
|
||
|
|
try:
|
||
|
|
# ... 现有连接逻辑保持不变 ...
|
||
|
|
|
||
|
|
# 新增:异步注册设备事件推送(不阻塞API响应)
|
||
|
|
logger.debug(f"为客户端 {name} 异步注册设备事件推送")
|
||
|
|
device_manager = await self._get_device_manager()
|
||
|
|
|
||
|
|
# 创建异步任务并添加错误处理
|
||
|
|
register_task = asyncio.create_task(self._async_register_device_events(name, device_manager))
|
||
|
|
register_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"注册设备事件推送-{name}"))
|
||
|
|
|
||
|
|
# 新增:异步推送缓冲的事件到新连接的客户端(不阻塞API响应)
|
||
|
|
logger.debug(f"为客户端 {name} 异步推送缓冲事件")
|
||
|
|
push_task = asyncio.create_task(self._async_push_buffered_events(name, device_manager))
|
||
|
|
push_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"推送缓冲事件-{name}"))
|
||
|
|
|
||
|
|
# 新增:异步检查当前设备状态(不阻塞API响应)
|
||
|
|
logger.debug(f"为客户端 {name} 异步检查当前设备状态")
|
||
|
|
check_task = asyncio.create_task(self._async_check_current_device_status(name, device_manager))
|
||
|
|
check_task.add_done_callback(lambda t: self._handle_async_task_result(t, f"检查设备状态-{name}"))
|
||
|
|
|
||
|
|
logger.info(f"WebSocket管理器客户端连接成功: {name}")
|
||
|
|
return True
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"WebSocket管理器连接客户端失败: {name} - {e}")
|
||
|
|
return False
|
||
|
|
```
|
||
|
|
|
||
|
|
### 2. 添加异步设备状态检查方法
|
||
|
|
|
||
|
|
```python
|
||
|
|
async def _async_check_current_device_status(self, client_name: str, device_manager):
|
||
|
|
"""异步检查当前设备状态(不阻塞API响应)"""
|
||
|
|
try:
|
||
|
|
logger.debug(f"开始为客户端 {client_name} 检查当前设备状态")
|
||
|
|
|
||
|
|
# 获取当前所有设备
|
||
|
|
all_devices = await device_manager.get_all_devices_unified()
|
||
|
|
logger.debug(f"当前系统中有 {len(all_devices)} 个设备")
|
||
|
|
|
||
|
|
# 检查是否有在线设备需要生成事件
|
||
|
|
online_devices = [d for d in all_devices if d.get("status") == "device"]
|
||
|
|
|
||
|
|
if not online_devices:
|
||
|
|
logger.debug(f"当前没有在线设备,无需生成事件")
|
||
|
|
return
|
||
|
|
|
||
|
|
# 为每个在线设备生成连接事件(如果还没有的话)
|
||
|
|
for device in online_devices:
|
||
|
|
device_id = device["device_id"]
|
||
|
|
device_source = device.get("source", "unknown")
|
||
|
|
|
||
|
|
# 只处理自动发现的设备,避免重复处理已注册设备
|
||
|
|
if device_source == "auto_discovered":
|
||
|
|
logger.debug(f"为在线设备生成连接事件: {device_id}")
|
||
|
|
await device_manager.handle_auto_discovered_device_event(
|
||
|
|
device_id=device_id,
|
||
|
|
status="device",
|
||
|
|
device_info=device.get("device_info", {})
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
logger.debug(f"跳过已注册设备: {device_id}")
|
||
|
|
|
||
|
|
logger.info(f"为客户端 {client_name} 设备状态检查完成,处理了 {len(online_devices)} 个在线设备")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"为客户端 {client_name} 检查设备状态失败: {e}")
|
||
|
|
```
|
||
|
|
|
||
|
|
## 修复效果
|
||
|
|
|
||
|
|
### 修复前
|
||
|
|
1. **第一次连接**:能正常推送数据(有历史事件)
|
||
|
|
2. **第二次连接**:需要等待 adb 设备接入或重启才能推送数据(缓冲队列被清空)
|
||
|
|
3. **adb reboot**:能激活推送(生成新事件)
|
||
|
|
|
||
|
|
### 修复后
|
||
|
|
1. **第一次连接**:正常推送历史事件 + 当前设备状态
|
||
|
|
2. **第二次连接**:正常推送历史事件 + 当前设备状态(无需等待adb重启)
|
||
|
|
3. **设备状态变化**:原有的自动监听机制继续工作
|
||
|
|
4. **向后兼容**:完全保持原有功能不变
|
||
|
|
|
||
|
|
## 安全性保证
|
||
|
|
|
||
|
|
### 1. 完全不影响原有逻辑
|
||
|
|
- ✅ 保持原有的异步任务结构
|
||
|
|
- ✅ 不改变现有的设备监控机制
|
||
|
|
- ✅ 不修改 `handle_auto_discovered_device_event` 方法
|
||
|
|
- ✅ 保持原有的错误处理机制
|
||
|
|
|
||
|
|
### 2. 保证自动监听不失效
|
||
|
|
- ✅ 原有的 `track_devices()` 监控继续工作
|
||
|
|
- ✅ 设备状态变化仍然会触发事件
|
||
|
|
- ✅ 只是额外添加了主动检查,不会替代原有机制
|
||
|
|
- ✅ 两个机制并行工作,互不干扰
|
||
|
|
|
||
|
|
### 3. 性能友好
|
||
|
|
- ✅ 异步执行,不阻塞API响应
|
||
|
|
- ✅ 只在客户端连接时执行一次
|
||
|
|
- ✅ 不会持续消耗资源
|
||
|
|
- ✅ 不影响原有的监控性能
|
||
|
|
|
||
|
|
## 测试建议
|
||
|
|
|
||
|
|
1. **第一次WebSocket连接测试**:验证能正常推送数据
|
||
|
|
2. **第二次WebSocket连接测试**:验证无需等待adb重启即可推送数据
|
||
|
|
3. **设备状态变化测试**:验证原有的自动监听机制仍然正常工作
|
||
|
|
4. **多次连接测试**:确保修复的稳定性
|
||
|
|
5. **性能测试**:确保不影响系统性能
|
||
|
|
|
||
|
|
## 文件修改清单
|
||
|
|
|
||
|
|
- `app/core/websocket/manager.py`:添加设备状态检查逻辑
|
||
|
|
- `docs/modify.md`:记录修改内容
|
||
|
|
|
||
|
|
## 风险评估
|
||
|
|
|
||
|
|
**风险等级:低**
|
||
|
|
|
||
|
|
**原因:**
|
||
|
|
1. 只添加新功能,不修改现有逻辑
|
||
|
|
2. 异步执行,不影响主流程
|
||
|
|
3. 有完整的错误处理
|
||
|
|
4. 向后兼容,不影响现有功能
|
||
|
|
|
||
|
|
## 实施计划
|
||
|
|
|
||
|
|
1. **第一步**:生成修复方案文档(已完成)
|
||
|
|
2. **第二步**:实施代码修复
|
||
|
|
3. **第三步**:测试验证
|
||
|
|
4. **第四步**:更新修改记录
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
**创建时间:** 2025-01-27
|
||
|
|
**创建人:** AI Assistant
|
||
|
|
**状态:** 待实施
|