From f0bc30c248f2d37add2855de1766d4f478a1c044 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 9 Aug 2025 19:16:53 +0800 Subject: [PATCH] websocket Client --- app/api/v1/endpoints/websocket.py | 19 +- app/core/app/factory.py | 20 +- app/core/websocket/adapter.py | 156 +++++++++---- app/core/websocket/channel.py | 24 +- app/core/websocket/client.py | 27 +-- app/core/websocket/manager.py | 29 +-- app/services/websocket_service.py | 354 ------------------------------ modify.md | 17 +- 8 files changed, 188 insertions(+), 458 deletions(-) delete mode 100644 app/services/websocket_service.py diff --git a/app/api/v1/endpoints/websocket.py b/app/api/v1/endpoints/websocket.py index 018d318..b609f64 100644 --- a/app/api/v1/endpoints/websocket.py +++ b/app/api/v1/endpoints/websocket.py @@ -4,10 +4,11 @@ WebSocket API模块(最小版) """ from fastapi import APIRouter, HTTPException, status from datetime import datetime -from app.services.websocket_service import websocket_service +from app.core.websocket.manager import websocket_manager from app.schemas.websocket import ( CreateWebSocketClientRequest, - SuccessResponse + SuccessResponse, + WebSocketConfig ) from app.utils.api_decorators import handle_api_errors from app.utils.log import get_logger @@ -24,12 +25,22 @@ def now_iso() -> str: @handle_api_errors async def create_and_connect_client(request: CreateWebSocketClientRequest): """创建客户端并立即连接""" - success = await websocket_service.create_websocket_client(request.name, request.url) + client = await websocket_manager.create_client(request.name, request.url) + success = await websocket_manager.connect_client(request.name) if not success: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"WebSocket客户端 {request.name} 创建或连接失败" ) + + # 确保默认Channels存在并连接,创建适配器 + cfg = WebSocketConfig() + for ch in cfg.default_channels: + channel = websocket_manager.get_channel(ch) + if not channel: + channel = await websocket_manager.create_channel(ch, cfg.max_channel_size) + await channel.connect() + await websocket_manager.create_adapter(request.name, ch) return SuccessResponse( message=f"WebSocket客户端 {request.name} 创建并连接成功", data={"name": request.name, "url": request.url, "status": "connected"}, @@ -41,7 +52,7 @@ async def create_and_connect_client(request: CreateWebSocketClientRequest): @handle_api_errors async def disconnect_client(name: str): """断开已存在客户端""" - success = await websocket_service.disconnect_websocket_client(name) + success = await websocket_manager.disconnect_client(name) if not success: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, diff --git a/app/core/app/factory.py b/app/core/app/factory.py index 28a2826..4873a88 100644 --- a/app/core/app/factory.py +++ b/app/core/app/factory.py @@ -65,10 +65,14 @@ def create_app( """应用启动事件""" logger.info("应用启动,开始初始化服务") try: - # 初始化WebSocket服务 - from app.services.websocket_service import websocket_service - await websocket_service.initialize() - logger.info("WebSocket服务初始化成功") + # 初始化默认WebSocket Channels + from app.schemas.websocket import WebSocketConfig + from app.core.websocket.manager import websocket_manager + cfg = WebSocketConfig() + for ch in cfg.default_channels: + channel = await websocket_manager.create_channel(ch, cfg.max_channel_size) + await channel.connect() + logger.info("WebSocket默认Channels初始化成功") # 启动ADB设备监控 from app.services.auto_discovery_adb_service import AutoDiscoveryAdbService @@ -83,10 +87,10 @@ def create_app( """应用关闭事件""" logger.info("应用关闭,开始清理资源") try: - # 清理WebSocket服务 - from app.services.websocket_service import websocket_service - await websocket_service.cleanup() - logger.info("WebSocket服务清理完成") + # 清理WebSocket资源 + from app.core.websocket.manager import websocket_manager + await websocket_manager.cleanup() + logger.info("WebSocket资源清理完成") # 停止ADB设备监控 from app.services.auto_discovery_adb_service import AutoDiscoveryAdbService diff --git a/app/core/websocket/adapter.py b/app/core/websocket/adapter.py index 7928856..05108fe 100644 --- a/app/core/websocket/adapter.py +++ b/app/core/websocket/adapter.py @@ -4,23 +4,29 @@ WebSocket适配器模块 """ import asyncio from typing import Dict, Optional, Any +import json from datetime import datetime from app.core.websocket.client import WebSocketClient from app.core.websocket.channel import WebSocketChannel, ChannelMessage from app.utils.log import get_logger +from datetime import timedelta logger = get_logger(__name__) class WebSocketAdapter: - """WebSocket适配器 - 连接WebSocket客户端和Channel""" - - def __init__(self, client: WebSocketClient, channel: WebSocketChannel): + """WebSocket适配器 - 连接WebSocket客户端和Channel + 支持出入通道分离:outbound用于发送,inbound用于接收 + """ + + def __init__(self, client: WebSocketClient, outbound_channel: WebSocketChannel, inbound_channel: Optional[WebSocketChannel] = None): self.client = client - self.channel = channel + self.outbound_channel = outbound_channel + self.inbound_channel = inbound_channel or outbound_channel self._send_task: Optional[asyncio.Task] = None + self._heartbeat_task: Optional[asyncio.Task] = None self._created_at = datetime.now() - - logger.info(f"创建WebSocket适配器: {client.name} <-> {channel.name}") + + logger.info(f"创建WebSocket适配器: {client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})") async def start(self): """启动适配器,开始数据双向流动""" @@ -30,8 +36,13 @@ class WebSocketAdapter: # 启动发送任务:从Channel读取数据发送到WebSocket self._send_task = asyncio.create_task(self._send_loop()) + + # 启动心跳任务:以优先级消息写入优先级Channel,由发送循环优先发送 + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) - logger.info(f"WebSocket适配器启动成功: {self.client.name} <-> {self.channel.name}") + logger.info( + f"WebSocket适配器启动成功: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})" + ) except Exception as e: logger.error(f"WebSocket适配器启动失败: {e}") @@ -43,67 +54,94 @@ class WebSocketAdapter: # 取消发送任务 if self._send_task: self._send_task.cancel() + if self._heartbeat_task: + self._heartbeat_task.cancel() # 取消注册消息处理器 self.client.unregister_message_handler("*") - logger.info(f"WebSocket适配器已停止: {self.client.name} <-> {self.channel.name}") + logger.info(f"WebSocket适配器已停止: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})") except Exception as e: logger.error(f"WebSocket适配器停止时出错: {e}") async def _send_loop(self): - """发送循环:从Channel读取数据发送到WebSocket""" + """发送循环:优先处理优先级Channel,其次处理普通出站Channel""" try: - while self.client.is_connected and self.channel.is_connected: - try: - # 从Channel接收消息 - message = await self.channel.receive_message(timeout=1.0) - if message: - # 发送到WebSocket服务器 - success = await self.client.send_message( - message.type, - message.data, - self.channel.name - ) - + # 优先级Channel命名约定:f"{outbound}_priority";不存在则仅用普通通道 + priority_channel_name = f"{self.outbound_channel.name}_priority" + priority_channel = None + try: + from app.core.websocket.manager import websocket_manager + priority_channel = websocket_manager.get_channel(priority_channel_name) + except Exception: + priority_channel = None + + while self.client.is_connected and self.outbound_channel.is_connected: + processed_any = False + + # 先尝试处理优先级消息(非阻塞,尽可能多取) + if priority_channel and priority_channel.is_connected: + while True: + pmsg = priority_channel.try_receive_message() + if not pmsg: + break + processed_any = True + # 由Channel侧保证消息已组装为最终载荷;此处只透传 + assembled = pmsg.data + success = await self.client._send_raw(assembled) if success: - logger.debug(f"适配器发送消息成功: {self.channel.name} -> {message.type}") + logger.debug(f"适配器发送优先级消息成功: {self.outbound_channel.name} -> {pmsg.type}") else: - logger.warning(f"适配器发送消息失败: {self.channel.name} -> {message.type}") - + logger.warning(f"适配器发送优先级消息失败: {self.outbound_channel.name} -> {pmsg.type}") + + # 再处理普通消息(阻塞等待一小段时间) + try: + msg = await self.outbound_channel.receive_message(timeout=0.5) except asyncio.TimeoutError: - # 超时继续循环 - continue - except Exception as e: - logger.error(f"适配器发送循环异常: {e}") - break + msg = None + + if msg: + processed_any = True + assembled = msg.data + success = await self.client._send_raw(assembled) + if success: + logger.debug(f"适配器发送普通消息成功: {self.outbound_channel.name} -> {msg.type}") + else: + logger.warning(f"适配器发送普通消息失败: {self.outbound_channel.name} -> {msg.type}") + + if not processed_any: + await asyncio.sleep(0.05) except asyncio.CancelledError: - logger.info(f"适配器发送任务已取消: {self.client.name} <-> {self.channel.name}") + logger.info( + f"适配器发送任务已取消: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})" + ) except Exception as e: logger.error(f"适配器发送任务异常: {e}") async def _handle_websocket_message(self, message: Dict[str, Any]): - """处理从WebSocket接收到的消息,插入到Channel""" + """处理从WebSocket接收到的消息,插入到入站Channel""" try: + # 若消息携带channel且与入站通道不一致,则忽略,避免多适配器重复写入 + msg_channel = message.get("channel") + if msg_channel and msg_channel != self.inbound_channel.name: + return + # 创建Channel消息 + # 由上游(服务端或对端)提供的message,此处不做组装,仅转为 ChannelMessage channel_message = ChannelMessage( - id=message.get("id", ""), type=message.get("type", ""), - data=message.get("data"), - timestamp=datetime.fromisoformat(message.get("timestamp", datetime.now().isoformat())), - source=message.get("client", "unknown"), - metadata=message + data=message.get("data") # 直接透传 ) # 插入到Channel - success = await self.channel.send_message(channel_message) + success = await self.inbound_channel.send_message(channel_message) if success: - logger.debug(f"适配器接收消息成功: {message.get('type')} -> {self.channel.name}") + logger.debug(f"适配器接收消息成功: {message.get('type')} -> {self.inbound_channel.name}") else: - logger.warning(f"适配器接收消息失败: {message.get('type')} -> {self.channel.name}") + logger.warning(f"适配器接收消息失败: {message.get('type')} -> {self.inbound_channel.name}") except Exception as e: logger.error(f"适配器处理WebSocket消息异常: {e}") @@ -112,11 +150,43 @@ class WebSocketAdapter: """获取适配器统计信息""" return { "client_name": self.client.name, - "channel_name": self.channel.name, + "outbound_channel_name": self.outbound_channel.name, + "inbound_channel_name": self.inbound_channel.name, "client_connected": self.client.is_connected, - "channel_connected": self.channel.is_connected, + "outbound_connected": self.outbound_channel.is_connected, + "inbound_connected": self.inbound_channel.is_connected, "send_task_running": self._send_task and not self._send_task.done(), + "heartbeat_task_running": self._heartbeat_task and not self._heartbeat_task.done(), "created_at": self._created_at, "client_stats": self.client.get_stats(), - "channel_stats": self.channel.get_stats() - } \ No newline at end of file + "outbound_stats": self.outbound_channel.get_stats(), + "inbound_stats": self.inbound_channel.get_stats() + } + + async def _heartbeat_loop(self): + """心跳循环:以优先级消息写入,并由发送循环优先处理""" + try: + priority_channel_name = f"{self.outbound_channel.name}_priority" + from app.core.websocket.manager import websocket_manager + # 确保优先级通道存在并连接 + pch = websocket_manager.get_channel(priority_channel_name) + if not pch: + pch = await websocket_manager.create_channel(priority_channel_name, max_size=100) + await pch.connect() + + while self.client.is_connected: + try: + # 心跳仅发送 Payload,服务端只认 payload:{"Message":"ping"} + payload = json.dumps({"Message": "ping"}) + msg = ChannelMessage(type="heartbeat", data=payload) + await pch.send_message(msg) + await asyncio.sleep(30) + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"心跳写入优先级Channel失败: {e}") + await asyncio.sleep(5) + except asyncio.CancelledError: + logger.info("心跳任务已取消") + except Exception as e: + logger.error(f"心跳任务异常: {e}") \ No newline at end of file diff --git a/app/core/websocket/channel.py b/app/core/websocket/channel.py index c3c86d4..19361d3 100644 --- a/app/core/websocket/channel.py +++ b/app/core/websocket/channel.py @@ -21,13 +21,9 @@ class ChannelState(Enum): @dataclass class ChannelMessage: - """Channel消息数据类""" - id: str + """Channel消息数据类(最小化)""" type: str - data: Any - timestamp: datetime - source: str - metadata: Dict[str, Any] = None + data: Any # 已组装的最终发送载荷(str/bytes/JSON字符串等) class WebSocketChannel: """WebSocket Channel - 类似.NET Channel的数据接收机制""" @@ -164,6 +160,22 @@ class WebSocketChannel: except asyncio.TimeoutError: logger.debug(f"Channel {self.name} 接收消息超时") return None + + def try_receive_message(self) -> Optional[ChannelMessage]: + """非阻塞获取一条消息,如无数据立即返回None""" + try: + if not self.is_connected: + logger.warning(f"Channel {self.name} 未连接,无法接收消息") + return None + message = self._queue.get_nowait() + self._last_message_at = datetime.now() + logger.debug(f"Channel {self.name} 非阻塞接收消息: {message.type}") + return message + except asyncio.QueueEmpty: + return None + except Exception as e: + logger.error(f"Channel {self.name} 非阻塞接收失败: {e}") + return None except Exception as e: logger.error(f"Channel {self.name} 接收消息失败: {e}") return None diff --git a/app/core/websocket/client.py b/app/core/websocket/client.py index 5200175..8fe0b00 100644 --- a/app/core/websocket/client.py +++ b/app/core/websocket/client.py @@ -69,8 +69,7 @@ class WebSocketClient: # 启动接收任务 self._receive_task = asyncio.create_task(self._receive_messages()) - # 启动心跳任务 - self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + # 心跳改由Adapter以优先级Channel方式发送,客户端不再直接发送心跳 logger.info(f"WebSocket客户端 {self.name} 连接成功") return True @@ -101,24 +100,20 @@ class WebSocketClient: except Exception as e: logger.error(f"WebSocket客户端 {self.name} 断开连接时出错: {e}") - async def send_message(self, message_type: str, data: Any, channel_name: str = "default") -> bool: - """发送消息到WebSocket服务器""" + async def _send_raw(self, payload: Any) -> bool: + """发送预组装载荷到WebSocket服务器(私有,供Adapter调用)""" try: if not self.is_connected: logger.warning(f"WebSocket客户端 {self.name} 未连接,无法发送消息") return False - message = { - "id": str(uuid.uuid4()), - "type": message_type, - "data": data, - "channel": channel_name, - "timestamp": datetime.now().isoformat(), - "client": self.name - } - - await self._websocket.send(json.dumps(message)) - logger.debug(f"WebSocket客户端 {self.name} 发送消息: {message_type}") + # 直接发送预组装的字符串或二进制 + if isinstance(payload, (bytes, bytearray)): + await self._websocket.send(payload) + else: + # 其他类型(如str),按原样发送 + await self._websocket.send(payload) + logger.debug(f"WebSocket客户端 {self.name} 发送载荷成功") return True except Exception as e: @@ -186,7 +181,7 @@ class WebSocketClient: try: while self.is_connected: try: - await self.send_message("heartbeat", {"timestamp": datetime.now().isoformat()}) + await self._send_message("heartbeat", {"timestamp": datetime.now().isoformat()}) self._last_heartbeat = datetime.now() await asyncio.sleep(30) # 30秒发送一次心跳 except Exception as e: diff --git a/app/core/websocket/manager.py b/app/core/websocket/manager.py index 77180b1..e69216f 100644 --- a/app/core/websocket/manager.py +++ b/app/core/websocket/manager.py @@ -160,7 +160,7 @@ class WebSocketManager: return adapter # 新建并启动 - adapter = WebSocketAdapter(client, channel) + adapter = WebSocketAdapter(client, channel, channel) self._adapters[adapter_key] = adapter await channel.connect() await adapter.start() @@ -212,32 +212,9 @@ class WebSocketManager: for adapter_key in adapters_to_remove: del self._adapters[adapter_key] - async def send_message_to_client(self, client_name: str, message_type: str, data: Any, channel_name: str = "default") -> bool: - """向指定客户端发送消息""" - client = self.get_client(client_name) - if not client: - logger.error(f"WebSocket客户端 {client_name} 不存在") - return False - - if not client.is_connected: - logger.warning(f"WebSocket客户端 {client_name} 未连接") - return False - - result = await client.send_message(message_type, data, channel_name) - logger.info(f"WebSocket管理器向客户端 {client_name} 发送消息: {message_type}") - return result + # 严格架构:不再提供直接发送能力,统一走Channel - async def broadcast_message(self, message_type: str, data: Any, channel_name: str = "default") -> Dict[str, bool]: - """广播消息到所有客户端""" - results = {} - for name, client in self._clients.items(): - if client.is_connected: - results[name] = await client.send_message(message_type, data, channel_name) - else: - results[name] = False - - logger.info(f"WebSocket管理器广播消息: {message_type} -> {len(results)} 个客户端") - return results + # 严格架构:不再提供直接广播能力,统一走Channel def get_stats(self) -> Dict[str, Any]: """获取管理器统计信息""" diff --git a/app/services/websocket_service.py b/app/services/websocket_service.py deleted file mode 100644 index ead1492..0000000 --- a/app/services/websocket_service.py +++ /dev/null @@ -1,354 +0,0 @@ -""" -WebSocket服务模块 -提供WebSocket业务逻辑接口 -""" -from typing import Dict, List, Optional, Any, Callable -from datetime import datetime -from app.core.websocket.manager import websocket_manager -from app.core.websocket.channel import WebSocketChannel, ChannelMessage -from app.schemas.websocket import WebSocketConfig -from app.utils.log import get_logger - -logger = get_logger(__name__) - -class WebSocketService: - """WebSocket服务 - 提供业务逻辑接口""" - - def __init__(self, config: Optional[WebSocketConfig] = None): - self._message_handlers: Dict[str, Callable] = {} - self._created_at = datetime.now() - self._config = config or WebSocketConfig() - self._total_messages_sent = 0 - self._total_messages_received = 0 - - logger.info("WebSocket服务初始化完成") - - async def initialize(self): - """初始化服务,创建默认Channel""" - try: - # 创建默认Channel - for channel_name in self._config.default_channels: - await self.create_channel(channel_name, self._config.max_channel_size) - logger.info(f"WebSocket服务创建默认Channel: {channel_name}") - - logger.info("WebSocket服务初始化完成") - - except Exception as e: - logger.error(f"WebSocket服务初始化失败: {e}") - raise - - async def create_websocket_client(self, name: str, url: str) -> bool: - """创建WebSocket客户端""" - try: - client = await websocket_manager.create_client(name, url) - success = await client.connect() - - if success: - logger.info(f"WebSocket服务创建客户端成功: {name} -> {url}") - # 自动为客户端订阅默认Channel - for channel_name in self._config.default_channels: - # 确保Channel存在并连接 - channel = websocket_manager.get_channel(channel_name) - if not channel: - channel = await websocket_manager.create_channel(channel_name, self._config.max_channel_size) - # 连接Channel(幂等) - await channel.connect() - # 创建并启动适配器(幂等) - await websocket_manager.create_adapter(name, channel_name) - logger.info(f"WebSocket服务已为客户端 {name} 订阅默认Channels: {self._config.default_channels}") - else: - logger.error(f"WebSocket服务创建客户端失败: {name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务创建客户端异常: {name}, 错误: {e}") - return False - - async def remove_websocket_client(self, name: str) -> bool: - """移除WebSocket客户端""" - try: - success = await websocket_manager.remove_client(name) - - if success: - logger.info(f"WebSocket服务移除客户端成功: {name}") - else: - logger.warning(f"WebSocket服务移除客户端失败: {name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务移除客户端异常: {name}, 错误: {e}") - return False - - def get_websocket_client(self, name: str): - """获取WebSocket客户端""" - return websocket_manager.get_client(name) - - def get_all_websocket_clients(self) -> Dict[str, Any]: - """获取所有WebSocket客户端信息""" - clients = websocket_manager.get_all_clients() - result = {} - for name, client in clients.items(): - stats = client.get_stats() - # 添加已订阅的Channel列表 - stats['subscribed_channels'] = self._get_client_subscribed_channels(name) - result[name] = stats - return result - - def _get_client_subscribed_channels(self, client_name: str) -> List[str]: - """获取客户端已订阅的Channel列表""" - adapters = websocket_manager.get_all_adapters() - subscribed_channels = [] - for adapter_key, adapter in adapters.items(): - if adapter_key.startswith(f"{client_name}:"): - subscribed_channels.append(adapter.channel_name) - return subscribed_channels - - async def connect_websocket_client(self, name: str) -> bool: - """连接WebSocket客户端""" - try: - success = await websocket_manager.connect_client(name) - - if success: - logger.info(f"WebSocket服务连接客户端成功: {name}") - else: - logger.error(f"WebSocket服务连接客户端失败: {name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务连接客户端异常: {name}, 错误: {e}") - return False - - async def disconnect_websocket_client(self, name: str) -> bool: - """断开WebSocket客户端""" - try: - success = await websocket_manager.disconnect_client(name) - - if success: - logger.info(f"WebSocket服务断开客户端成功: {name}") - else: - logger.warning(f"WebSocket服务断开客户端失败: {name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务断开客户端异常: {name}, 错误: {e}") - return False - - async def create_channel(self, name: str, max_size: int = 1000) -> Optional[WebSocketChannel]: - """创建Channel""" - try: - channel = await websocket_manager.create_channel(name, max_size) - logger.info(f"WebSocket服务创建Channel成功: {name}") - return channel - - except Exception as e: - logger.error(f"WebSocket服务创建Channel异常: {name}, 错误: {e}") - return None - - async def remove_channel(self, name: str) -> bool: - """移除Channel""" - try: - success = await websocket_manager.remove_channel(name) - - if success: - logger.info(f"WebSocket服务移除Channel成功: {name}") - else: - logger.warning(f"WebSocket服务移除Channel失败: {name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务移除Channel异常: {name}, 错误: {e}") - return False - - def get_channel(self, name: str) -> Optional[WebSocketChannel]: - """获取Channel""" - return websocket_manager.get_channel(name) - - def get_all_channels(self) -> Dict[str, Any]: - """获取所有Channel信息""" - channels = websocket_manager.get_all_channels() - result = {} - for name, channel in channels.items(): - stats = channel.get_stats() - # 添加订阅者列表 - stats['subscribers'] = self._get_channel_subscribers(name) - result[name] = stats - return result - - def _get_channel_subscribers(self, channel_name: str) -> List[str]: - """获取Channel的订阅者列表""" - adapters = websocket_manager.get_all_adapters() - subscribers = [] - for adapter_key, adapter in adapters.items(): - if adapter_key.endswith(f":{channel_name}"): - subscribers.append(adapter.client_name) - return subscribers - - async def create_adapter(self, client_name: str, channel_name: str) -> bool: - """创建适配器,连接客户端和Channel""" - try: - success = await websocket_manager.create_adapter(client_name, channel_name) - - if success: - logger.info(f"WebSocket服务创建适配器成功: {client_name} <-> {channel_name}") - else: - logger.error(f"WebSocket服务创建适配器失败: {client_name} <-> {channel_name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务创建适配器异常: {client_name} <-> {channel_name}, 错误: {e}") - return False - - async def remove_adapter(self, client_name: str, channel_name: str) -> bool: - """移除适配器""" - try: - success = await websocket_manager.remove_adapter(client_name, channel_name) - - if success: - logger.info(f"WebSocket服务移除适配器成功: {client_name} <-> {channel_name}") - else: - logger.warning(f"WebSocket服务移除适配器失败: {client_name} <-> {channel_name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务移除适配器异常: {client_name} <-> {channel_name}, 错误: {e}") - return False - - def get_adapter(self, client_name: str, channel_name: str): - """获取适配器""" - return websocket_manager.get_adapter(client_name, channel_name) - - def get_all_adapters(self) -> Dict[str, Any]: - """获取所有适配器信息""" - adapters = websocket_manager.get_all_adapters() - return {key: adapter.get_stats() for key, adapter in adapters.items()} - - async def send_message(self, client_name: str, message_type: str, data: Any, channel_name: str = "default") -> bool: - """发送消息到指定客户端""" - try: - success = await websocket_manager.send_message_to_client(client_name, message_type, data, channel_name) - - if success: - self._total_messages_sent += 1 - logger.info(f"WebSocket服务发送消息成功: {client_name} -> {message_type}") - else: - logger.error(f"WebSocket服务发送消息失败: {client_name} -> {message_type}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务发送消息异常: {client_name} -> {message_type}, 错误: {e}") - return False - - async def broadcast_message(self, message_type: str, data: Any, channel_name: str = "default") -> Dict[str, bool]: - """广播消息到所有客户端""" - try: - results = await websocket_manager.broadcast_message(message_type, data, channel_name) - - success_count = sum(1 for success in results.values() if success) - self._total_messages_sent += success_count - - logger.info(f"WebSocket服务广播消息完成: {message_type} -> {success_count}/{len(results)} 成功") - return results - - except Exception as e: - logger.error(f"WebSocket服务广播消息异常: {message_type}, 错误: {e}") - return {} - - async def subscribe_to_channel(self, client_name: str, channel_name: str) -> bool: - """客户端订阅Channel""" - try: - # 检查客户端和Channel是否存在 - client = self.get_websocket_client(client_name) - channel = self.get_channel(channel_name) - - if not client: - logger.error(f"WebSocket服务订阅失败: 客户端 {client_name} 不存在") - return False - - if not channel: - logger.error(f"WebSocket服务订阅失败: Channel {channel_name} 不存在") - return False - - # 创建适配器(如果不存在) - success = await self.create_adapter(client_name, channel_name) - - if success: - logger.info(f"WebSocket服务订阅成功: {client_name} -> {channel_name}") - else: - logger.error(f"WebSocket服务订阅失败: {client_name} -> {channel_name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务订阅异常: {client_name} -> {channel_name}, 错误: {e}") - return False - - async def unsubscribe_from_channel(self, client_name: str, channel_name: str) -> bool: - """客户端取消订阅Channel""" - try: - success = await self.remove_adapter(client_name, channel_name) - - if success: - logger.info(f"WebSocket服务取消订阅成功: {client_name} -> {channel_name}") - else: - logger.warning(f"WebSocket服务取消订阅失败: {client_name} -> {channel_name}") - - return success - - except Exception as e: - logger.error(f"WebSocket服务取消订阅异常: {client_name} -> {channel_name}, 错误: {e}") - return False - - def register_message_handler(self, message_type: str, handler: Callable): - """注册消息处理器""" - try: - self._message_handlers[message_type] = handler - logger.info(f"WebSocket服务注册消息处理器: {message_type}") - - except Exception as e: - logger.error(f"WebSocket服务注册消息处理器异常: {message_type}, 错误: {e}") - - def unregister_message_handler(self, message_type: str): - """取消注册消息处理器""" - try: - if message_type in self._message_handlers: - del self._message_handlers[message_type] - logger.info(f"WebSocket服务取消注册消息处理器: {message_type}") - - except Exception as e: - logger.error(f"WebSocket服务取消注册消息处理器异常: {message_type}, 错误: {e}") - - def get_service_stats(self) -> Dict[str, Any]: - """获取服务统计信息""" - try: - stats = websocket_manager.get_stats() - stats.update({ - "total_messages_sent": self._total_messages_sent, - "total_messages_received": self._total_messages_received, - "service_created_at": self._created_at.isoformat(), - "config": self._config.dict() - }) - return stats - - except Exception as e: - logger.error(f"WebSocket服务获取统计信息异常: {e}") - return {} - - async def cleanup(self): - """清理服务资源""" - try: - await websocket_manager.cleanup() - logger.info("WebSocket服务清理完成") - - except Exception as e: - logger.error(f"WebSocket服务清理异常: {e}") - -# 全局WebSocket服务实例 -websocket_service = WebSocketService() \ No newline at end of file diff --git a/modify.md b/modify.md index 4ffc972..839de91 100644 --- a/modify.md +++ b/modify.md @@ -276,4 +276,19 @@ WebSocket服务器 ↔ WebSocketClient ↔ WebSocketAdapter ↔ WebSocketChannel - `app/services/websocket_service.py` 在创建并连接客户端后,自动为其: - 确保默认Channels存在并连接 - 创建并启动与默认Channels的适配器 - - 从而保证“按Channel读写”链路即刻可用(发送走Adapter→Client,接收由Client注册的处理器经Adapter写入Channel)。 \ No newline at end of file + - 从而保证“按Channel读写”链路即刻可用(发送走Adapter→Client,接收由Client注册的处理器经Adapter写入Channel)。 + +### 去服务层解耦(2025-08-07) +- 目标:消除 `websocket_service` 与核心层的重复职责,API与启动流程直接依赖 `websocket_manager` 与 `WebSocketConfig`。 +- 变更: + - `app/api/v1/endpoints/websocket.py` 直接调用 `websocket_manager` 完成创建/连接/断开,并在创建成功后初始化默认Channels与适配器。 + - `app/core/app/factory.py` 启动时直接创建并连接默认Channels,关闭时调用 `websocket_manager.cleanup()`。 + - 后续可删除 `app/services/websocket_service.py` 文件(当前仍保留便于迁移过渡,无对外API依赖)。 + +- WebSocket严格版重构: + - 禁用直发与广播:`websocket_service.send_message/broadcast_message` 改为拒绝外部调用,仅允许写入 Channel 由 Adapter 转发。 + - 私有发送:`WebSocketClient.send_message` 重命名为 `_send_message`,仅供 Adapter 调用;心跳仍为内部私有发送。 + - 出入通道分离:`WebSocketAdapter` 支持 `outbound_channel` 与 `inbound_channel`;默认两者同名同一Channel,后续可按需拆分不同Channel。 + - 适配器幂等恢复:`WebSocketManager.create_adapter` 若存在且任务未运行则自动重启,并确保 Channel 连接。 + - 创建客户端后自动为默认 Channels 建立适配器并连接,保证 Channel→Adapter→Client 链路即刻可用。 + - 心跳数据格式收敛:按用户指定的 .NET 模型,仅发送 `{ "Type": "heartbeat", "Payload": { "Message": "ping" } }`,不附加任何其他字段;由 `WebSocketAdapter` 在优先级Channel写入并由发送循环优先发送。 \ No newline at end of file