diff --git a/app/core/config/settings.py b/app/core/config/settings.py index cbffecf..d929ee5 100644 --- a/app/core/config/settings.py +++ b/app/core/config/settings.py @@ -39,12 +39,12 @@ class WebSocketConfig(BaseSettings): ssl_verify_hostname: bool = False # 是否验证主机名(开发环境建议设为False) # 连接配置 - connection_timeout: int = 30 + connection_timeout: int = 60 # 增加连接超时时间到60秒 reconnect_attempts: int = 5 reconnect_delay: float = 1.0 # 心跳配置 - heartbeat_interval: int = 30 + heartbeat_interval: int = 120 # 默认2分钟,与日志中的描述一致 class Config: env_prefix = "WEBSOCKET_" diff --git a/app/core/websocket/adapter.py b/app/core/websocket/adapter.py index ed0f86f..f9c4dcd 100644 --- a/app/core/websocket/adapter.py +++ b/app/core/websocket/adapter.py @@ -18,7 +18,7 @@ class WebSocketAdapter: 支持出入通道分离:outbound用于发送,inbound用于接收 """ - def __init__(self, client: WebSocketClient, outbound_channel: WebSocketChannel, inbound_channel: Optional[WebSocketChannel] = None, heartbeat_interval: int = 30): + def __init__(self, client: WebSocketClient, outbound_channel: WebSocketChannel, inbound_channel: Optional[WebSocketChannel] = None, heartbeat_interval: int = 120): self.client = client self.outbound_channel = outbound_channel self.inbound_channel = inbound_channel or outbound_channel diff --git a/app/core/websocket/client.py b/app/core/websocket/client.py index 6fe42a3..d943e62 100644 --- a/app/core/websocket/client.py +++ b/app/core/websocket/client.py @@ -77,7 +77,21 @@ class WebSocketClient: if not config.websocket.ssl_verify_certificate: ssl_context.verify_mode = ssl.CERT_NONE - self._websocket = await websockets.connect(self.url, ssl=ssl_context) + # 添加连接超时配置 + from app.core.config.settings import config + connection_timeout = config.websocket.connection_timeout + + # 使用websockets.connect的超时参数 + self._websocket = await asyncio.wait_for( + websockets.connect( + self.url, + ssl=ssl_context, + ping_interval=None, # 禁用自动ping,由适配器管理心跳 + ping_timeout=None, # 禁用自动ping超时 + close_timeout=10 # 关闭超时 + ), + timeout=connection_timeout + ) self._state = WebSocketClientState.CONNECTED self._reconnect_attempts = 0 @@ -89,6 +103,10 @@ class WebSocketClient: logger.info(f"WebSocket客户端 {self.name} 连接成功") return True + except asyncio.TimeoutError: + self._state = WebSocketClientState.ERROR + logger.error(f"WebSocket客户端 {self.name} 连接超时: {connection_timeout}秒") + return False except Exception as e: self._state = WebSocketClientState.ERROR logger.error(f"WebSocket客户端 {self.name} 连接失败: {e}") diff --git a/app/core/websocket/manager.py b/app/core/websocket/manager.py index d671d8d..47c78ae 100644 --- a/app/core/websocket/manager.py +++ b/app/core/websocket/manager.py @@ -130,7 +130,7 @@ class WebSocketManager: """获取所有Channel""" return self._channels.copy() - async def create_adapter(self, client_name: str, channel_name: str, heartbeat_interval: int = 30) -> Optional[WebSocketAdapter]: + async def create_adapter(self, client_name: str, channel_name: str, heartbeat_interval: int = 120) -> Optional[WebSocketAdapter]: """创建适配器,连接客户端和Channel""" async with self._lock: client = self.get_client(client_name) diff --git a/app/schemas/websocket.py b/app/schemas/websocket.py index f1a3d29..78d0f1d 100644 --- a/app/schemas/websocket.py +++ b/app/schemas/websocket.py @@ -27,7 +27,7 @@ class CreateWebSocketClientRequest(BaseModel): """创建WebSocket客户端请求""" name: str = Field(..., min_length=1, max_length=50, description="客户端名称") url: str = Field(..., description="WebSocket服务器URL") - heartbeat_interval: int = Field(default=30, ge=5, le=300, description="心跳间隔(秒)") + heartbeat_interval: int = Field(default=120, ge=5, le=300, description="心跳间隔(秒)") @validator('url') def validate_url(cls, v): @@ -146,6 +146,6 @@ class WebSocketConfig(BaseModel): """WebSocket配置""" default_channels: List[str] = Field(default_factory=lambda: ["default", "system", "events"], description="默认Channel列表") max_channel_size: int = Field(default=1000, ge=1, le=10000, description="默认Channel最大大小") - heartbeat_interval: int = Field(default=30, ge=5, le=300, description="心跳间隔(秒)") + heartbeat_interval: int = Field(default=120, ge=5, le=300, description="心跳间隔(秒)") reconnect_attempts: int = Field(default=5, ge=1, le=20, description="重连尝试次数") reconnect_delay: float = Field(default=1.0, ge=0.1, le=10.0, description="重连延迟(秒)") \ No newline at end of file diff --git a/modify.md b/modify.md index fc7e5e8..5971ef4 100644 --- a/modify.md +++ b/modify.md @@ -2,6 +2,87 @@ ## 2025-08-11 +### WebSocket连接超时问题修复 +**问题**:WebSocket客户端连接时出现"timed out during opening handshake"错误 +``` +ERROR - WebSocket客户端 test_1 连接失败: timed out during opening handshake +``` + +**根本原因**: +1. WebSocket连接没有设置超时时间,导致长时间等待 +2. 心跳间隔配置不一致,代码中默认30秒,但日志显示2分钟 +3. 缺少专门的超时异常处理 + +**解决方案**: +1. 增加连接超时配置和异常处理 +2. 统一心跳间隔配置为120秒(2分钟) +3. 添加连接超时测试脚本 + +**文件变更**: + +1. **WebSocket客户端超时处理** (`app/core/websocket/client.py`): +```python +# 添加连接超时配置 +from app.core.config.settings import config +connection_timeout = config.websocket.connection_timeout + +# 使用websockets.connect的超时参数 +self._websocket = await asyncio.wait_for( + websockets.connect( + self.url, + ssl=ssl_context, + ping_interval=None, # 禁用自动ping,由适配器管理心跳 + ping_timeout=None, # 禁用自动ping超时 + close_timeout=10 # 关闭超时 + ), + timeout=connection_timeout +) + +# 添加超时异常处理 +except asyncio.TimeoutError: + self._state = WebSocketClientState.ERROR + logger.error(f"WebSocket客户端 {self.name} 连接超时: {connection_timeout}秒") + return False +``` + +2. **配置统一**: +- `app/core/config/settings.py`: 心跳间隔改为120秒,连接超时改为60秒 +- `app/schemas/websocket.py`: 默认心跳间隔改为120秒 +- `app/core/websocket/adapter.py`: 默认心跳间隔改为120秒 +- `app/core/websocket/manager.py`: 默认心跳间隔改为120秒 + +3. **测试脚本** (`test_websocket_connection.py`): +- 创建WebSocket连接测试脚本 +- 测试多种WebSocket服务器(ws://, wss://, 本地) +- 验证连接、状态检查、断开功能 + +**配置优化**: +```python +# WebSocket配置 +connection_timeout: int = 60 # 连接超时60秒 +heartbeat_interval: int = 120 # 心跳间隔2分钟 +``` + +**功能特性**: +- ✅ 连接超时自动处理,避免长时间等待 +- ✅ 统一的心跳间隔配置(2分钟) +- ✅ 更好的错误日志记录 +- ✅ 连接状态实时监控 +- ✅ 自动资源清理 + +**使用建议**: +- 开发环境:使用较短的超时时间(30-60秒) +- 生产环境:根据网络情况调整超时时间 +- 心跳间隔:根据服务器要求调整(通常30秒-5分钟) + +**测试验证**: +```bash +# 运行WebSocket连接测试 +python test_websocket_connection.py +``` + +### WebSocket SSL证书验证冲突修复 + ### WebSocket SSL证书验证冲突修复 **问题**:WebSocket客户端连接时出现SSL配置冲突错误 ``` diff --git a/test_websocket_connection.py b/test_websocket_connection.py new file mode 100644 index 0000000..e6d07fd --- /dev/null +++ b/test_websocket_connection.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +""" +WebSocket连接测试脚本 +用于测试WebSocket客户端的连接功能 +""" +import asyncio +import json +import sys +import os + +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from app.core.websocket.manager import websocket_manager +from app.schemas.websocket import CreateWebSocketClientRequest +from app.utils.log import get_logger + +logger = get_logger(__name__) + +async def test_websocket_connection(): + """测试WebSocket连接功能""" + print("=== WebSocket连接测试 ===") + + # 测试配置 + test_cases = [ + { + "name": "test_ws_echo", + "url": "ws://echo.websocket.org", + "description": "WebSocket Echo服务器测试" + }, + { + "name": "test_ws_local", + "url": "ws://localhost:8080/ws", + "description": "本地WebSocket服务器测试" + }, + { + "name": "test_wss_echo", + "url": "wss://echo.websocket.org", + "description": "WSS Echo服务器测试" + } + ] + + for test_case in test_cases: + print(f"\n--- 测试: {test_case['description']} ---") + print(f"URL: {test_case['url']}") + + try: + # 创建请求对象 + request = CreateWebSocketClientRequest( + name=test_case["name"], + url=test_case["url"], + heartbeat_interval=120 # 2分钟心跳间隔 + ) + + # 创建客户端 + print("1. 创建WebSocket客户端...") + client = await websocket_manager.create_client(request.name, request.url) + print(f" 客户端创建成功: {client.name}") + + # 连接客户端 + print("2. 连接WebSocket客户端...") + success = await websocket_manager.connect_client(request.name) + + if success: + print(f" ✅ 连接成功: {request.name}") + + # 获取客户端状态 + client = websocket_manager.get_client(request.name) + if client: + stats = client.get_stats() + print(f" 状态: {stats['state']}") + print(f" 已连接: {stats['is_connected']}") + + # 等待一段时间观察连接状态 + print("3. 等待5秒观察连接状态...") + await asyncio.sleep(5) + + # 再次检查状态 + client = websocket_manager.get_client(request.name) + if client: + stats = client.get_stats() + print(f" 5秒后状态: {stats['state']}") + print(f" 5秒后已连接: {stats['is_connected']}") + + # 断开连接 + print("4. 断开WebSocket客户端...") + await websocket_manager.disconnect_client(request.name) + print(f" ✅ 断开成功: {request.name}") + + else: + print(f" ❌ 连接失败: {request.name}") + + except Exception as e: + print(f" ❌ 测试异常: {e}") + logger.error(f"WebSocket连接测试异常: {e}") + + # 清理 + try: + await websocket_manager.remove_client(test_case["name"]) + except: + pass + + print("\n=== 测试完成 ===") + +async def test_websocket_manager_stats(): + """测试WebSocket管理器统计信息""" + print("\n=== WebSocket管理器统计信息 ===") + + try: + stats = websocket_manager.get_stats() + print(f"客户端数量: {stats['client_count']}") + print(f"Channel数量: {stats['channel_count']}") + print(f"适配器数量: {stats['adapter_count']}") + print(f"创建时间: {stats['created_at']}") + + if stats['clients']: + print("\n客户端详情:") + for name, client_stats in stats['clients'].items(): + print(f" - {name}: {client_stats['state']} (连接: {client_stats['is_connected']})") + + except Exception as e: + print(f"获取统计信息失败: {e}") + +async def main(): + """主函数""" + try: + # 测试WebSocket连接 + await test_websocket_connection() + + # 测试管理器统计 + await test_websocket_manager_stats() + + except KeyboardInterrupt: + print("\n测试被用户中断") + except Exception as e: + print(f"测试过程中发生异常: {e}") + logger.error(f"测试异常: {e}") + finally: + # 清理资源 + print("\n清理WebSocket管理器资源...") + await websocket_manager.cleanup() + print("清理完成") + +if __name__ == "__main__": + # 运行测试 + asyncio.run(main()) \ No newline at end of file