diff --git a/app/core/app/factory.py b/app/core/app/factory.py index 4873a88..10ac7d1 100644 --- a/app/core/app/factory.py +++ b/app/core/app/factory.py @@ -65,15 +65,6 @@ def create_app( """应用启动事件""" logger.info("应用启动,开始初始化服务") try: - # 初始化默认WebSocket Channels - from app.schemas.websocket import WebSocketConfig - from app.core.websocket.manager import websocket_manager - cfg = WebSocketConfig() - for ch in cfg.default_channels: - channel = await websocket_manager.create_channel(ch, cfg.max_channel_size) - await channel.connect() - logger.info("WebSocket默认Channels初始化成功") - # 启动ADB设备监控 from app.services.auto_discovery_adb_service import AutoDiscoveryAdbService adb_service = AutoDiscoveryAdbService() diff --git a/app/core/websocket/client.py b/app/core/websocket/client.py index d943e62..7af9ad8 100644 --- a/app/core/websocket/client.py +++ b/app/core/websocket/client.py @@ -59,7 +59,11 @@ class WebSocketClient: return True self._state = WebSocketClientState.CONNECTING - logger.info(f"WebSocket客户端 {self.name} 正在连接...") + logger.info(f"WebSocket客户端 {self.name} 正在连接... URL: {self.url}") + + # 验证URL格式 + if not self.url.startswith(('ws://', 'wss://')): + raise ValueError(f"无效的WebSocket URL: {self.url},必须以ws://或wss://开头") # 建立WebSocket连接 # 根据配置决定是否跳过SSL证书验证 @@ -81,6 +85,8 @@ class WebSocketClient: from app.core.config.settings import config connection_timeout = config.websocket.connection_timeout + logger.info(f"WebSocket客户端 {self.name} 开始连接,超时时间: {connection_timeout}秒") + # 使用websockets.connect的超时参数 self._websocket = await asyncio.wait_for( websockets.connect( @@ -105,11 +111,15 @@ class WebSocketClient: except asyncio.TimeoutError: self._state = WebSocketClientState.ERROR - logger.error(f"WebSocket客户端 {self.name} 连接超时: {connection_timeout}秒") + logger.error(f"WebSocket客户端 {self.name} 连接超时: {connection_timeout}秒,URL: {self.url}") + return False + except ValueError as e: + self._state = WebSocketClientState.ERROR + logger.error(f"WebSocket客户端 {self.name} URL格式错误: {e}") return False except Exception as e: self._state = WebSocketClientState.ERROR - logger.error(f"WebSocket客户端 {self.name} 连接失败: {e}") + logger.error(f"WebSocket客户端 {self.name} 连接失败: {e},URL: {self.url}") return False async def disconnect(self): diff --git a/modify.md b/modify.md index 5971ef4..37c7264 100644 --- a/modify.md +++ b/modify.md @@ -2,6 +2,41 @@ ## 2025-08-11 +### 应用启动优化 - 移除不必要的WebSocket Channel初始化 +**问题**:`startup_event` 中初始化默认WebSocket Channels是不必要的,因为: +1. Channels是在需要时才创建的(按需创建) +2. 真正的客户端连接是在API调用 `create_and_connect_client` 时才进行的 +3. 启动时创建空的Channels没有实际意义 + +**解决方案**:移除 `startup_event` 中的WebSocket Channel初始化代码 + +**文件变更**: +- 更新 `app/core/app/factory.py` - 移除启动时的WebSocket Channel初始化 + +**修改内容**: +```python +# 移除的代码: +# 初始化默认WebSocket Channels +from app.schemas.websocket import WebSocketConfig +from app.core.websocket.manager import websocket_manager +cfg = WebSocketConfig() +for ch in cfg.default_channels: + channel = await websocket_manager.create_channel(ch, cfg.max_channel_size) + await channel.connect() +logger.info("WebSocket默认Channels初始化成功") +``` + +**优化效果**: +- ✅ 减少不必要的启动时间 +- ✅ 避免创建无用的空Channels +- ✅ 保持按需创建的设计原则 +- ✅ 真正的Channel创建和连接在API调用时进行 + +**设计原则**: +- Channels按需创建,避免预创建空Channels +- 客户端连接时自动确保所需Channels存在 +- 启动时只初始化必要的服务(如ADB设备监控) + ### WebSocket连接超时问题修复 **问题**:WebSocket客户端连接时出现"timed out during opening handshake"错误 ``` @@ -79,8 +114,20 @@ heartbeat_interval: int = 120 # 心跳间隔2分钟 ```bash # 运行WebSocket连接测试 python test_websocket_connection.py + +# 启动本地WebSocket服务器进行测试 +python test_websocket_server.py ``` +**改进内容**: +1. **URL验证增强**:添加URL格式验证,确保以ws://或wss://开头 +2. **错误处理改进**:区分不同类型的错误(超时、URL格式错误、连接失败) +3. **日志增强**:添加更详细的连接日志,包括URL和超时时间 +4. **测试工具**: + - 更新测试脚本使用更可靠的WebSocket服务器 + - 添加API端点测试 + - 创建本地WebSocket服务器用于测试 + ### WebSocket SSL证书验证冲突修复 ### WebSocket SSL证书验证冲突修复 @@ -592,4 +639,38 @@ WebSocket服务器 ↔ WebSocketClient ↔ WebSocketAdapter ↔ WebSocketChannel - 出入通道分离:`WebSocketAdapter` 支持 `outbound_channel` 与 `inbound_channel`;默认两者同名同一Channel,后续可按需拆分不同Channel。 - 适配器幂等恢复:`WebSocketManager.create_adapter` 若存在且任务未运行则自动重启,并确保 Channel 连接。 - 创建客户端后自动为默认 Channels 建立适配器并连接,保证 Channel→Adapter→Client 链路即刻可用。 - - 心跳数据格式收敛:按用户指定的 .NET 模型,仅发送 `{ "Type": "heartbeat", "Payload": { "Message": "ping" } }`,不附加任何其他字段;由 `WebSocketAdapter` 在优先级Channel写入并由发送循环优先发送。 \ No newline at end of file + - 心跳数据格式收敛:按用户指定的 .NET 模型,仅发送 `{ "Type": "heartbeat", "Payload": { "Message": "ping" } }`,不附加任何其他字段;由 `WebSocketAdapter` 在优先级Channel写入并由发送循环优先发送。 + +### WebSocket适配器心跳循环触发机制分析 + +**问题**: 创建并连接客户端后,`adapter.py` 中的 `_heartbeat_loop` 什么时候会触发? + +**分析结果**: + +1. **触发时机**: `_heartbeat_loop` 在适配器启动时自动触发 + +2. **触发流程**: + ``` + 创建客户端 → 连接客户端 → 创建适配器 → 调用 adapter.start() → 启动 _heartbeat_loop + ``` + +3. **具体代码路径**: + - `websocket.py` 第23行: 创建并连接客户端 + - 第40-42行: 为每个默认Channel创建适配器 + - `manager.py` 第133-170行: `create_adapter` 方法 + - 第160行: 调用 `await adapter.start()` + - `adapter.py` 第35-50行: `start()` 方法 + - 第47行: 启动心跳任务 `self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())` + +4. **心跳循环机制**: + - 心跳间隔: 由 `heartbeat_interval` 参数控制(默认120秒) + - 心跳消息: 发送 `{"Message": "ping"}` 格式的JSON + - 优先级处理: 心跳消息通过优先级Channel发送,确保优先处理 + - 循环条件: 当客户端连接状态为 `self.client.is_connected` 时持续运行 + +5. **停止条件**: + - 客户端断开连接 + - 适配器被停止(调用 `adapter.stop()`) + - 任务被取消(`asyncio.CancelledError`) + +**结论**: `_heartbeat_loop` 在适配器创建并启动后立即开始运行,按照指定的心跳间隔定期发送心跳消息,直到客户端断开或适配器停止。 \ No newline at end of file diff --git a/test_api_only.py b/test_api_only.py new file mode 100644 index 0000000..b36ae00 --- /dev/null +++ b/test_api_only.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +""" +专门测试API端点的脚本 +""" +import asyncio +import json +import sys +import os + +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from app.core.websocket.manager import websocket_manager +from app.schemas.websocket import CreateWebSocketClientRequest +from app.utils.log import get_logger + +logger = get_logger(__name__) + +async def test_api_endpoint_simulation(): + """模拟API端点的完整流程""" + print("=== API端点模拟测试 ===") + + # 模拟API请求 + test_request = { + "name": "test_1", + "url": "ws://localhost:8080/ws", + "heartbeat_interval": 120 + } + + print(f"模拟API请求: {json.dumps(test_request, indent=2)}") + + try: + # 1. 创建请求对象(模拟API验证) + request = CreateWebSocketClientRequest(**test_request) + print("✅ 请求验证通过") + + # 2. 创建客户端 + print("1. 创建WebSocket客户端...") + client = await websocket_manager.create_client(request.name, request.url) + print(f" ✅ 客户端创建成功: {client.name}") + + # 3. 连接客户端 + print("2. 连接WebSocket客户端...") + success = await websocket_manager.connect_client(request.name) + + if success: + print(f" ✅ 连接成功: {request.name}") + + # 4. 确保默认Channels存在并连接,创建适配器 + default_channels = ["default", "system", "events"] + max_channel_size = 1000 + + print("3. 创建默认Channels和适配器...") + for ch in default_channels: + channel = websocket_manager.get_channel(ch) + if not channel: + channel = await websocket_manager.create_channel(ch, max_channel_size) + await channel.connect() + await websocket_manager.create_adapter(request.name, ch, request.heartbeat_interval) + print(f" ✅ Channel {ch} 和适配器创建成功") + + # 5. 检查最终状态 + print("4. 检查最终状态...") + client = websocket_manager.get_client(request.name) + if client: + stats = client.get_stats() + print(f" 客户端状态: {stats['state']}") + print(f" 连接状态: {stats['is_connected']}") + print(f" 重连次数: {stats['reconnect_attempts']}") + + # 6. 检查适配器状态 + print("5. 检查适配器状态...") + for ch in default_channels: + adapter = websocket_manager.get_adapter(request.name, ch) + if adapter: + adapter_stats = adapter.get_stats() + print(f" 适配器 {ch}: 发送任务运行={adapter_stats['send_task_running']}") + + # 7. 模拟成功响应 + response_data = { + "name": request.name, + "url": request.url, + "status": "connected", + "heartbeat_interval": request.heartbeat_interval + } + print(f"✅ API响应: {json.dumps(response_data, indent=2)}") + + # 8. 等待一段时间观察稳定性 + print("6. 等待5秒观察连接稳定性...") + await asyncio.sleep(5) + + # 9. 再次检查状态 + client = websocket_manager.get_client(request.name) + if client: + stats = client.get_stats() + print(f" 5秒后状态: {stats['state']}") + print(f" 5秒后连接: {stats['is_connected']}") + + # 10. 断开连接 + print("7. 断开WebSocket客户端...") + await websocket_manager.disconnect_client(request.name) + print(f" ✅ 断开成功: {request.name}") + + else: + print(f" ❌ 连接失败: {request.name}") + # 模拟API错误响应 + error_response = { + "success": False, + "message": f"WebSocket客户端 {request.name} 创建或连接失败", + "error_code": "CONNECTION_FAILED" + } + print(f"❌ API错误响应: {json.dumps(error_response, indent=2)}") + + except Exception as e: + print(f" ❌ API测试异常: {e}") + logger.error(f"API测试异常: {e}") + # 模拟API异常响应 + error_response = { + "success": False, + "message": str(e), + "error_code": "INTERNAL_ERROR" + } + print(f"❌ API异常响应: {json.dumps(error_response, indent=2)}") + + # 清理 + try: + await websocket_manager.remove_client("test_1") + except: + pass + +async def main(): + """主函数""" + try: + await test_api_endpoint_simulation() + + except KeyboardInterrupt: + print("\n测试被用户中断") + except Exception as e: + print(f"测试过程中发生异常: {e}") + logger.error(f"测试异常: {e}") + finally: + # 清理资源 + print("\n清理WebSocket管理器资源...") + await websocket_manager.cleanup() + print("清理完成") + +if __name__ == "__main__": + # 运行测试 + asyncio.run(main()) \ No newline at end of file diff --git a/test_websocket_connection.py b/test_websocket_connection.py index e6d07fd..ea7ce88 100644 --- a/test_websocket_connection.py +++ b/test_websocket_connection.py @@ -21,12 +21,12 @@ async def test_websocket_connection(): """测试WebSocket连接功能""" print("=== WebSocket连接测试 ===") - # 测试配置 + # 测试配置 - 使用更可靠的测试服务器 test_cases = [ { - "name": "test_ws_echo", - "url": "ws://echo.websocket.org", - "description": "WebSocket Echo服务器测试" + "name": "test_ws_demo", + "url": "ws://demos.kaazing.com/echo", + "description": "Kaazing Echo服务器测试" }, { "name": "test_ws_local", @@ -34,9 +34,9 @@ async def test_websocket_connection(): "description": "本地WebSocket服务器测试" }, { - "name": "test_wss_echo", - "url": "wss://echo.websocket.org", - "description": "WSS Echo服务器测试" + "name": "test_ws_invalid", + "url": "http://example.com", + "description": "无效URL测试(应该失败)" } ] @@ -72,15 +72,15 @@ async def test_websocket_connection(): print(f" 已连接: {stats['is_connected']}") # 等待一段时间观察连接状态 - print("3. 等待5秒观察连接状态...") - await asyncio.sleep(5) + print("3. 等待3秒观察连接状态...") + await asyncio.sleep(3) # 再次检查状态 client = websocket_manager.get_client(request.name) if client: stats = client.get_stats() - print(f" 5秒后状态: {stats['state']}") - print(f" 5秒后已连接: {stats['is_connected']}") + print(f" 3秒后状态: {stats['state']}") + print(f" 3秒后已连接: {stats['is_connected']}") # 断开连接 print("4. 断开WebSocket客户端...") @@ -121,12 +121,85 @@ async def test_websocket_manager_stats(): except Exception as e: print(f"获取统计信息失败: {e}") +async def test_api_endpoint(): + """测试API端点""" + print("\n=== API端点测试 ===") + + # 模拟API请求 + test_request = { + "name": "test_api_client", + "url": "ws://demos.kaazing.com/echo", + "heartbeat_interval": 120 + } + + print(f"测试请求: {json.dumps(test_request, indent=2)}") + + try: + # 创建请求对象 + request = CreateWebSocketClientRequest(**test_request) + + # 创建客户端 + print("1. 创建WebSocket客户端...") + client = await websocket_manager.create_client(request.name, request.url) + print(f" 客户端创建成功: {client.name}") + + # 连接客户端 + print("2. 连接WebSocket客户端...") + success = await websocket_manager.connect_client(request.name) + + if success: + print(f" ✅ 连接成功: {request.name}") + + # 确保默认Channels存在并连接,创建适配器 + default_channels = ["default", "system", "events"] + max_channel_size = 1000 + + print("3. 创建默认Channels和适配器...") + for ch in default_channels: + channel = websocket_manager.get_channel(ch) + if not channel: + channel = await websocket_manager.create_channel(ch, max_channel_size) + await channel.connect() + await websocket_manager.create_adapter(request.name, ch, request.heartbeat_interval) + print(f" ✅ Channel {ch} 和适配器创建成功") + + print("4. 等待2秒观察连接状态...") + await asyncio.sleep(2) + + # 检查状态 + client = websocket_manager.get_client(request.name) + if client: + stats = client.get_stats() + print(f" 最终状态: {stats['state']}") + print(f" 最终连接: {stats['is_connected']}") + + # 断开连接 + print("5. 断开WebSocket客户端...") + await websocket_manager.disconnect_client(request.name) + print(f" ✅ 断开成功: {request.name}") + + else: + print(f" ❌ 连接失败: {request.name}") + + except Exception as e: + print(f" ❌ API测试异常: {e}") + logger.error(f"API测试异常: {e}") + + # 清理 + try: + await websocket_manager.remove_client("test_api_client") + except: + pass + async def main(): """主函数""" try: # 测试WebSocket连接 await test_websocket_connection() + # 测试API端点 + await test_api_endpoint() + # 测试管理器统计 await test_websocket_manager_stats() diff --git a/test_websocket_server.py b/test_websocket_server.py new file mode 100644 index 0000000..9e308e7 --- /dev/null +++ b/test_websocket_server.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +""" +简单的WebSocket服务器用于测试 +""" +import asyncio +import websockets +import json +from datetime import datetime + +async def echo_handler(websocket, path): + """简单的echo处理器""" + print(f"客户端连接: {websocket.remote_address}") + + try: + async for message in websocket: + print(f"收到消息: {message}") + + # 解析消息 + try: + data = json.loads(message) + message_type = data.get("type", "unknown") + + # 处理心跳 + if message_type == "heartbeat": + response = { + "type": "heartbeat", + "data": {"Message": "pong"}, + "timestamp": datetime.now().isoformat() + } + else: + # Echo消息 + response = { + "type": "echo", + "data": data.get("data", message), + "timestamp": datetime.now().isoformat() + } + + # 发送响应 + await websocket.send(json.dumps(response)) + print(f"发送响应: {response}") + + except json.JSONDecodeError: + # 如果不是JSON,直接echo + await websocket.send(message) + print(f"Echo: {message}") + + except websockets.exceptions.ConnectionClosed: + print(f"客户端断开: {websocket.remote_address}") + except Exception as e: + print(f"处理消息时出错: {e}") + # 不要抛出异常,避免服务器崩溃 + try: + await websocket.send(json.dumps({ + "type": "error", + "data": {"message": str(e)}, + "timestamp": datetime.now().isoformat() + })) + except: + pass + +async def main(): + """启动WebSocket服务器""" + host = "localhost" + port = 8080 + + print(f"启动WebSocket服务器: ws://{host}:{port}") + print("按 Ctrl+C 停止服务器") + + try: + # 使用更稳定的服务器配置 + server = await websockets.serve( + echo_handler, + host, + port, + ping_interval=30, # 30秒ping间隔 + ping_timeout=10, # 10秒ping超时 + close_timeout=10 # 10秒关闭超时 + ) + + print(f"WebSocket服务器已启动: ws://{host}:{port}") + await server.wait_closed() + + except KeyboardInterrupt: + print("\n服务器已停止") + except Exception as e: + print(f"服务器启动失败: {e}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file