Browse Source

websocket Client

origin/hotfix/hlk-flight
root 4 months ago
parent
commit
f0bc30c248
  1. 19
      app/api/v1/endpoints/websocket.py
  2. 20
      app/core/app/factory.py
  3. 156
      app/core/websocket/adapter.py
  4. 24
      app/core/websocket/channel.py
  5. 27
      app/core/websocket/client.py
  6. 29
      app/core/websocket/manager.py
  7. 354
      app/services/websocket_service.py
  8. 17
      modify.md

19
app/api/v1/endpoints/websocket.py

@ -4,10 +4,11 @@ WebSocket API模块(最小版)
"""
from fastapi import APIRouter, HTTPException, status
from datetime import datetime
from app.services.websocket_service import websocket_service
from app.core.websocket.manager import websocket_manager
from app.schemas.websocket import (
CreateWebSocketClientRequest,
SuccessResponse
SuccessResponse,
WebSocketConfig
)
from app.utils.api_decorators import handle_api_errors
from app.utils.log import get_logger
@ -24,12 +25,22 @@ def now_iso() -> str:
@handle_api_errors
async def create_and_connect_client(request: CreateWebSocketClientRequest):
"""创建客户端并立即连接"""
success = await websocket_service.create_websocket_client(request.name, request.url)
client = await websocket_manager.create_client(request.name, request.url)
success = await websocket_manager.connect_client(request.name)
if not success:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"WebSocket客户端 {request.name} 创建或连接失败"
)
# 确保默认Channels存在并连接,创建适配器
cfg = WebSocketConfig()
for ch in cfg.default_channels:
channel = websocket_manager.get_channel(ch)
if not channel:
channel = await websocket_manager.create_channel(ch, cfg.max_channel_size)
await channel.connect()
await websocket_manager.create_adapter(request.name, ch)
return SuccessResponse(
message=f"WebSocket客户端 {request.name} 创建并连接成功",
data={"name": request.name, "url": request.url, "status": "connected"},
@ -41,7 +52,7 @@ async def create_and_connect_client(request: CreateWebSocketClientRequest):
@handle_api_errors
async def disconnect_client(name: str):
"""断开已存在客户端"""
success = await websocket_service.disconnect_websocket_client(name)
success = await websocket_manager.disconnect_client(name)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,

20
app/core/app/factory.py

@ -65,10 +65,14 @@ def create_app(
"""应用启动事件"""
logger.info("应用启动,开始初始化服务")
try:
# 初始化WebSocket服务
from app.services.websocket_service import websocket_service
await websocket_service.initialize()
logger.info("WebSocket服务初始化成功")
# 初始化默认WebSocket Channels
from app.schemas.websocket import WebSocketConfig
from app.core.websocket.manager import websocket_manager
cfg = WebSocketConfig()
for ch in cfg.default_channels:
channel = await websocket_manager.create_channel(ch, cfg.max_channel_size)
await channel.connect()
logger.info("WebSocket默认Channels初始化成功")
# 启动ADB设备监控
from app.services.auto_discovery_adb_service import AutoDiscoveryAdbService
@ -83,10 +87,10 @@ def create_app(
"""应用关闭事件"""
logger.info("应用关闭,开始清理资源")
try:
# 清理WebSocket服务
from app.services.websocket_service import websocket_service
await websocket_service.cleanup()
logger.info("WebSocket服务清理完成")
# 清理WebSocket资源
from app.core.websocket.manager import websocket_manager
await websocket_manager.cleanup()
logger.info("WebSocket资源清理完成")
# 停止ADB设备监控
from app.services.auto_discovery_adb_service import AutoDiscoveryAdbService

156
app/core/websocket/adapter.py

@ -4,23 +4,29 @@ WebSocket适配器模块
"""
import asyncio
from typing import Dict, Optional, Any
import json
from datetime import datetime
from app.core.websocket.client import WebSocketClient
from app.core.websocket.channel import WebSocketChannel, ChannelMessage
from app.utils.log import get_logger
from datetime import timedelta
logger = get_logger(__name__)
class WebSocketAdapter:
"""WebSocket适配器 - 连接WebSocket客户端和Channel"""
def __init__(self, client: WebSocketClient, channel: WebSocketChannel):
"""WebSocket适配器 - 连接WebSocket客户端和Channel
支持出入通道分离outbound用于发送inbound用于接收
"""
def __init__(self, client: WebSocketClient, outbound_channel: WebSocketChannel, inbound_channel: Optional[WebSocketChannel] = None):
self.client = client
self.channel = channel
self.outbound_channel = outbound_channel
self.inbound_channel = inbound_channel or outbound_channel
self._send_task: Optional[asyncio.Task] = None
self._heartbeat_task: Optional[asyncio.Task] = None
self._created_at = datetime.now()
logger.info(f"创建WebSocket适配器: {client.name} <-> {channel.name}")
logger.info(f"创建WebSocket适配器: {client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
async def start(self):
"""启动适配器,开始数据双向流动"""
@ -30,8 +36,13 @@ class WebSocketAdapter:
# 启动发送任务:从Channel读取数据发送到WebSocket
self._send_task = asyncio.create_task(self._send_loop())
# 启动心跳任务:以优先级消息写入优先级Channel,由发送循环优先发送
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
logger.info(f"WebSocket适配器启动成功: {self.client.name} <-> {self.channel.name}")
logger.info(
f"WebSocket适配器启动成功: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})"
)
except Exception as e:
logger.error(f"WebSocket适配器启动失败: {e}")
@ -43,67 +54,94 @@ class WebSocketAdapter:
# 取消发送任务
if self._send_task:
self._send_task.cancel()
if self._heartbeat_task:
self._heartbeat_task.cancel()
# 取消注册消息处理器
self.client.unregister_message_handler("*")
logger.info(f"WebSocket适配器已停止: {self.client.name} <-> {self.channel.name}")
logger.info(f"WebSocket适配器已停止: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
except Exception as e:
logger.error(f"WebSocket适配器停止时出错: {e}")
async def _send_loop(self):
"""发送循环:从Channel读取数据发送到WebSocket"""
"""发送循环:优先处理优先级Channel,其次处理普通出站Channel"""
try:
while self.client.is_connected and self.channel.is_connected:
try:
# 从Channel接收消息
message = await self.channel.receive_message(timeout=1.0)
if message:
# 发送到WebSocket服务器
success = await self.client.send_message(
message.type,
message.data,
self.channel.name
)
# 优先级Channel命名约定:f"{outbound}_priority";不存在则仅用普通通道
priority_channel_name = f"{self.outbound_channel.name}_priority"
priority_channel = None
try:
from app.core.websocket.manager import websocket_manager
priority_channel = websocket_manager.get_channel(priority_channel_name)
except Exception:
priority_channel = None
while self.client.is_connected and self.outbound_channel.is_connected:
processed_any = False
# 先尝试处理优先级消息(非阻塞,尽可能多取)
if priority_channel and priority_channel.is_connected:
while True:
pmsg = priority_channel.try_receive_message()
if not pmsg:
break
processed_any = True
# 由Channel侧保证消息已组装为最终载荷;此处只透传
assembled = pmsg.data
success = await self.client._send_raw(assembled)
if success:
logger.debug(f"适配器发送消息成功: {self.channel.name} -> {message.type}")
logger.debug(f"适配器发送优先级消息成功: {self.outbound_channel.name} -> {pmsg.type}")
else:
logger.warning(f"适配器发送消息失败: {self.channel.name} -> {message.type}")
logger.warning(f"适配器发送优先级消息失败: {self.outbound_channel.name} -> {pmsg.type}")
# 再处理普通消息(阻塞等待一小段时间)
try:
msg = await self.outbound_channel.receive_message(timeout=0.5)
except asyncio.TimeoutError:
# 超时继续循环
continue
except Exception as e:
logger.error(f"适配器发送循环异常: {e}")
break
msg = None
if msg:
processed_any = True
assembled = msg.data
success = await self.client._send_raw(assembled)
if success:
logger.debug(f"适配器发送普通消息成功: {self.outbound_channel.name} -> {msg.type}")
else:
logger.warning(f"适配器发送普通消息失败: {self.outbound_channel.name} -> {msg.type}")
if not processed_any:
await asyncio.sleep(0.05)
except asyncio.CancelledError:
logger.info(f"适配器发送任务已取消: {self.client.name} <-> {self.channel.name}")
logger.info(
f"适配器发送任务已取消: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})"
)
except Exception as e:
logger.error(f"适配器发送任务异常: {e}")
async def _handle_websocket_message(self, message: Dict[str, Any]):
"""处理从WebSocket接收到的消息,插入到Channel"""
"""处理从WebSocket接收到的消息,插入到入站Channel"""
try:
# 若消息携带channel且与入站通道不一致,则忽略,避免多适配器重复写入
msg_channel = message.get("channel")
if msg_channel and msg_channel != self.inbound_channel.name:
return
# 创建Channel消息
# 由上游(服务端或对端)提供的message,此处不做组装,仅转为 ChannelMessage
channel_message = ChannelMessage(
id=message.get("id", ""),
type=message.get("type", ""),
data=message.get("data"),
timestamp=datetime.fromisoformat(message.get("timestamp", datetime.now().isoformat())),
source=message.get("client", "unknown"),
metadata=message
data=message.get("data") # 直接透传
)
# 插入到Channel
success = await self.channel.send_message(channel_message)
success = await self.inbound_channel.send_message(channel_message)
if success:
logger.debug(f"适配器接收消息成功: {message.get('type')} -> {self.channel.name}")
logger.debug(f"适配器接收消息成功: {message.get('type')} -> {self.inbound_channel.name}")
else:
logger.warning(f"适配器接收消息失败: {message.get('type')} -> {self.channel.name}")
logger.warning(f"适配器接收消息失败: {message.get('type')} -> {self.inbound_channel.name}")
except Exception as e:
logger.error(f"适配器处理WebSocket消息异常: {e}")
@ -112,11 +150,43 @@ class WebSocketAdapter:
"""获取适配器统计信息"""
return {
"client_name": self.client.name,
"channel_name": self.channel.name,
"outbound_channel_name": self.outbound_channel.name,
"inbound_channel_name": self.inbound_channel.name,
"client_connected": self.client.is_connected,
"channel_connected": self.channel.is_connected,
"outbound_connected": self.outbound_channel.is_connected,
"inbound_connected": self.inbound_channel.is_connected,
"send_task_running": self._send_task and not self._send_task.done(),
"heartbeat_task_running": self._heartbeat_task and not self._heartbeat_task.done(),
"created_at": self._created_at,
"client_stats": self.client.get_stats(),
"channel_stats": self.channel.get_stats()
}
"outbound_stats": self.outbound_channel.get_stats(),
"inbound_stats": self.inbound_channel.get_stats()
}
async def _heartbeat_loop(self):
"""心跳循环:以优先级消息写入,并由发送循环优先处理"""
try:
priority_channel_name = f"{self.outbound_channel.name}_priority"
from app.core.websocket.manager import websocket_manager
# 确保优先级通道存在并连接
pch = websocket_manager.get_channel(priority_channel_name)
if not pch:
pch = await websocket_manager.create_channel(priority_channel_name, max_size=100)
await pch.connect()
while self.client.is_connected:
try:
# 心跳仅发送 Payload,服务端只认 payload:{"Message":"ping"}
payload = json.dumps({"Message": "ping"})
msg = ChannelMessage(type="heartbeat", data=payload)
await pch.send_message(msg)
await asyncio.sleep(30)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"心跳写入优先级Channel失败: {e}")
await asyncio.sleep(5)
except asyncio.CancelledError:
logger.info("心跳任务已取消")
except Exception as e:
logger.error(f"心跳任务异常: {e}")

24
app/core/websocket/channel.py

@ -21,13 +21,9 @@ class ChannelState(Enum):
@dataclass
class ChannelMessage:
"""Channel消息数据类"""
id: str
"""Channel消息数据类(最小化)"""
type: str
data: Any
timestamp: datetime
source: str
metadata: Dict[str, Any] = None
data: Any # 已组装的最终发送载荷(str/bytes/JSON字符串等)
class WebSocketChannel:
"""WebSocket Channel - 类似.NET Channel的数据接收机制"""
@ -164,6 +160,22 @@ class WebSocketChannel:
except asyncio.TimeoutError:
logger.debug(f"Channel {self.name} 接收消息超时")
return None
def try_receive_message(self) -> Optional[ChannelMessage]:
"""非阻塞获取一条消息,如无数据立即返回None"""
try:
if not self.is_connected:
logger.warning(f"Channel {self.name} 未连接,无法接收消息")
return None
message = self._queue.get_nowait()
self._last_message_at = datetime.now()
logger.debug(f"Channel {self.name} 非阻塞接收消息: {message.type}")
return message
except asyncio.QueueEmpty:
return None
except Exception as e:
logger.error(f"Channel {self.name} 非阻塞接收失败: {e}")
return None
except Exception as e:
logger.error(f"Channel {self.name} 接收消息失败: {e}")
return None

27
app/core/websocket/client.py

@ -69,8 +69,7 @@ class WebSocketClient:
# 启动接收任务
self._receive_task = asyncio.create_task(self._receive_messages())
# 启动心跳任务
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
# 心跳改由Adapter以优先级Channel方式发送,客户端不再直接发送心跳
logger.info(f"WebSocket客户端 {self.name} 连接成功")
return True
@ -101,24 +100,20 @@ class WebSocketClient:
except Exception as e:
logger.error(f"WebSocket客户端 {self.name} 断开连接时出错: {e}")
async def send_message(self, message_type: str, data: Any, channel_name: str = "default") -> bool:
"""发送消息到WebSocket服务器"""
async def _send_raw(self, payload: Any) -> bool:
"""发送预组装载荷到WebSocket服务器(私有,供Adapter调用)"""
try:
if not self.is_connected:
logger.warning(f"WebSocket客户端 {self.name} 未连接,无法发送消息")
return False
message = {
"id": str(uuid.uuid4()),
"type": message_type,
"data": data,
"channel": channel_name,
"timestamp": datetime.now().isoformat(),
"client": self.name
}
await self._websocket.send(json.dumps(message))
logger.debug(f"WebSocket客户端 {self.name} 发送消息: {message_type}")
# 直接发送预组装的字符串或二进制
if isinstance(payload, (bytes, bytearray)):
await self._websocket.send(payload)
else:
# 其他类型(如str),按原样发送
await self._websocket.send(payload)
logger.debug(f"WebSocket客户端 {self.name} 发送载荷成功")
return True
except Exception as e:
@ -186,7 +181,7 @@ class WebSocketClient:
try:
while self.is_connected:
try:
await self.send_message("heartbeat", {"timestamp": datetime.now().isoformat()})
await self._send_message("heartbeat", {"timestamp": datetime.now().isoformat()})
self._last_heartbeat = datetime.now()
await asyncio.sleep(30) # 30秒发送一次心跳
except Exception as e:

29
app/core/websocket/manager.py

@ -160,7 +160,7 @@ class WebSocketManager:
return adapter
# 新建并启动
adapter = WebSocketAdapter(client, channel)
adapter = WebSocketAdapter(client, channel, channel)
self._adapters[adapter_key] = adapter
await channel.connect()
await adapter.start()
@ -212,32 +212,9 @@ class WebSocketManager:
for adapter_key in adapters_to_remove:
del self._adapters[adapter_key]
async def send_message_to_client(self, client_name: str, message_type: str, data: Any, channel_name: str = "default") -> bool:
"""向指定客户端发送消息"""
client = self.get_client(client_name)
if not client:
logger.error(f"WebSocket客户端 {client_name} 不存在")
return False
if not client.is_connected:
logger.warning(f"WebSocket客户端 {client_name} 未连接")
return False
result = await client.send_message(message_type, data, channel_name)
logger.info(f"WebSocket管理器向客户端 {client_name} 发送消息: {message_type}")
return result
# 严格架构:不再提供直接发送能力,统一走Channel
async def broadcast_message(self, message_type: str, data: Any, channel_name: str = "default") -> Dict[str, bool]:
"""广播消息到所有客户端"""
results = {}
for name, client in self._clients.items():
if client.is_connected:
results[name] = await client.send_message(message_type, data, channel_name)
else:
results[name] = False
logger.info(f"WebSocket管理器广播消息: {message_type} -> {len(results)} 个客户端")
return results
# 严格架构:不再提供直接广播能力,统一走Channel
def get_stats(self) -> Dict[str, Any]:
"""获取管理器统计信息"""

354
app/services/websocket_service.py

@ -1,354 +0,0 @@
"""
WebSocket服务模块
提供WebSocket业务逻辑接口
"""
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime
from app.core.websocket.manager import websocket_manager
from app.core.websocket.channel import WebSocketChannel, ChannelMessage
from app.schemas.websocket import WebSocketConfig
from app.utils.log import get_logger
logger = get_logger(__name__)
class WebSocketService:
"""WebSocket服务 - 提供业务逻辑接口"""
def __init__(self, config: Optional[WebSocketConfig] = None):
self._message_handlers: Dict[str, Callable] = {}
self._created_at = datetime.now()
self._config = config or WebSocketConfig()
self._total_messages_sent = 0
self._total_messages_received = 0
logger.info("WebSocket服务初始化完成")
async def initialize(self):
"""初始化服务,创建默认Channel"""
try:
# 创建默认Channel
for channel_name in self._config.default_channels:
await self.create_channel(channel_name, self._config.max_channel_size)
logger.info(f"WebSocket服务创建默认Channel: {channel_name}")
logger.info("WebSocket服务初始化完成")
except Exception as e:
logger.error(f"WebSocket服务初始化失败: {e}")
raise
async def create_websocket_client(self, name: str, url: str) -> bool:
"""创建WebSocket客户端"""
try:
client = await websocket_manager.create_client(name, url)
success = await client.connect()
if success:
logger.info(f"WebSocket服务创建客户端成功: {name} -> {url}")
# 自动为客户端订阅默认Channel
for channel_name in self._config.default_channels:
# 确保Channel存在并连接
channel = websocket_manager.get_channel(channel_name)
if not channel:
channel = await websocket_manager.create_channel(channel_name, self._config.max_channel_size)
# 连接Channel(幂等)
await channel.connect()
# 创建并启动适配器(幂等)
await websocket_manager.create_adapter(name, channel_name)
logger.info(f"WebSocket服务已为客户端 {name} 订阅默认Channels: {self._config.default_channels}")
else:
logger.error(f"WebSocket服务创建客户端失败: {name}")
return success
except Exception as e:
logger.error(f"WebSocket服务创建客户端异常: {name}, 错误: {e}")
return False
async def remove_websocket_client(self, name: str) -> bool:
"""移除WebSocket客户端"""
try:
success = await websocket_manager.remove_client(name)
if success:
logger.info(f"WebSocket服务移除客户端成功: {name}")
else:
logger.warning(f"WebSocket服务移除客户端失败: {name}")
return success
except Exception as e:
logger.error(f"WebSocket服务移除客户端异常: {name}, 错误: {e}")
return False
def get_websocket_client(self, name: str):
"""获取WebSocket客户端"""
return websocket_manager.get_client(name)
def get_all_websocket_clients(self) -> Dict[str, Any]:
"""获取所有WebSocket客户端信息"""
clients = websocket_manager.get_all_clients()
result = {}
for name, client in clients.items():
stats = client.get_stats()
# 添加已订阅的Channel列表
stats['subscribed_channels'] = self._get_client_subscribed_channels(name)
result[name] = stats
return result
def _get_client_subscribed_channels(self, client_name: str) -> List[str]:
"""获取客户端已订阅的Channel列表"""
adapters = websocket_manager.get_all_adapters()
subscribed_channels = []
for adapter_key, adapter in adapters.items():
if adapter_key.startswith(f"{client_name}:"):
subscribed_channels.append(adapter.channel_name)
return subscribed_channels
async def connect_websocket_client(self, name: str) -> bool:
"""连接WebSocket客户端"""
try:
success = await websocket_manager.connect_client(name)
if success:
logger.info(f"WebSocket服务连接客户端成功: {name}")
else:
logger.error(f"WebSocket服务连接客户端失败: {name}")
return success
except Exception as e:
logger.error(f"WebSocket服务连接客户端异常: {name}, 错误: {e}")
return False
async def disconnect_websocket_client(self, name: str) -> bool:
"""断开WebSocket客户端"""
try:
success = await websocket_manager.disconnect_client(name)
if success:
logger.info(f"WebSocket服务断开客户端成功: {name}")
else:
logger.warning(f"WebSocket服务断开客户端失败: {name}")
return success
except Exception as e:
logger.error(f"WebSocket服务断开客户端异常: {name}, 错误: {e}")
return False
async def create_channel(self, name: str, max_size: int = 1000) -> Optional[WebSocketChannel]:
"""创建Channel"""
try:
channel = await websocket_manager.create_channel(name, max_size)
logger.info(f"WebSocket服务创建Channel成功: {name}")
return channel
except Exception as e:
logger.error(f"WebSocket服务创建Channel异常: {name}, 错误: {e}")
return None
async def remove_channel(self, name: str) -> bool:
"""移除Channel"""
try:
success = await websocket_manager.remove_channel(name)
if success:
logger.info(f"WebSocket服务移除Channel成功: {name}")
else:
logger.warning(f"WebSocket服务移除Channel失败: {name}")
return success
except Exception as e:
logger.error(f"WebSocket服务移除Channel异常: {name}, 错误: {e}")
return False
def get_channel(self, name: str) -> Optional[WebSocketChannel]:
"""获取Channel"""
return websocket_manager.get_channel(name)
def get_all_channels(self) -> Dict[str, Any]:
"""获取所有Channel信息"""
channels = websocket_manager.get_all_channels()
result = {}
for name, channel in channels.items():
stats = channel.get_stats()
# 添加订阅者列表
stats['subscribers'] = self._get_channel_subscribers(name)
result[name] = stats
return result
def _get_channel_subscribers(self, channel_name: str) -> List[str]:
"""获取Channel的订阅者列表"""
adapters = websocket_manager.get_all_adapters()
subscribers = []
for adapter_key, adapter in adapters.items():
if adapter_key.endswith(f":{channel_name}"):
subscribers.append(adapter.client_name)
return subscribers
async def create_adapter(self, client_name: str, channel_name: str) -> bool:
"""创建适配器,连接客户端和Channel"""
try:
success = await websocket_manager.create_adapter(client_name, channel_name)
if success:
logger.info(f"WebSocket服务创建适配器成功: {client_name} <-> {channel_name}")
else:
logger.error(f"WebSocket服务创建适配器失败: {client_name} <-> {channel_name}")
return success
except Exception as e:
logger.error(f"WebSocket服务创建适配器异常: {client_name} <-> {channel_name}, 错误: {e}")
return False
async def remove_adapter(self, client_name: str, channel_name: str) -> bool:
"""移除适配器"""
try:
success = await websocket_manager.remove_adapter(client_name, channel_name)
if success:
logger.info(f"WebSocket服务移除适配器成功: {client_name} <-> {channel_name}")
else:
logger.warning(f"WebSocket服务移除适配器失败: {client_name} <-> {channel_name}")
return success
except Exception as e:
logger.error(f"WebSocket服务移除适配器异常: {client_name} <-> {channel_name}, 错误: {e}")
return False
def get_adapter(self, client_name: str, channel_name: str):
"""获取适配器"""
return websocket_manager.get_adapter(client_name, channel_name)
def get_all_adapters(self) -> Dict[str, Any]:
"""获取所有适配器信息"""
adapters = websocket_manager.get_all_adapters()
return {key: adapter.get_stats() for key, adapter in adapters.items()}
async def send_message(self, client_name: str, message_type: str, data: Any, channel_name: str = "default") -> bool:
"""发送消息到指定客户端"""
try:
success = await websocket_manager.send_message_to_client(client_name, message_type, data, channel_name)
if success:
self._total_messages_sent += 1
logger.info(f"WebSocket服务发送消息成功: {client_name} -> {message_type}")
else:
logger.error(f"WebSocket服务发送消息失败: {client_name} -> {message_type}")
return success
except Exception as e:
logger.error(f"WebSocket服务发送消息异常: {client_name} -> {message_type}, 错误: {e}")
return False
async def broadcast_message(self, message_type: str, data: Any, channel_name: str = "default") -> Dict[str, bool]:
"""广播消息到所有客户端"""
try:
results = await websocket_manager.broadcast_message(message_type, data, channel_name)
success_count = sum(1 for success in results.values() if success)
self._total_messages_sent += success_count
logger.info(f"WebSocket服务广播消息完成: {message_type} -> {success_count}/{len(results)} 成功")
return results
except Exception as e:
logger.error(f"WebSocket服务广播消息异常: {message_type}, 错误: {e}")
return {}
async def subscribe_to_channel(self, client_name: str, channel_name: str) -> bool:
"""客户端订阅Channel"""
try:
# 检查客户端和Channel是否存在
client = self.get_websocket_client(client_name)
channel = self.get_channel(channel_name)
if not client:
logger.error(f"WebSocket服务订阅失败: 客户端 {client_name} 不存在")
return False
if not channel:
logger.error(f"WebSocket服务订阅失败: Channel {channel_name} 不存在")
return False
# 创建适配器(如果不存在)
success = await self.create_adapter(client_name, channel_name)
if success:
logger.info(f"WebSocket服务订阅成功: {client_name} -> {channel_name}")
else:
logger.error(f"WebSocket服务订阅失败: {client_name} -> {channel_name}")
return success
except Exception as e:
logger.error(f"WebSocket服务订阅异常: {client_name} -> {channel_name}, 错误: {e}")
return False
async def unsubscribe_from_channel(self, client_name: str, channel_name: str) -> bool:
"""客户端取消订阅Channel"""
try:
success = await self.remove_adapter(client_name, channel_name)
if success:
logger.info(f"WebSocket服务取消订阅成功: {client_name} -> {channel_name}")
else:
logger.warning(f"WebSocket服务取消订阅失败: {client_name} -> {channel_name}")
return success
except Exception as e:
logger.error(f"WebSocket服务取消订阅异常: {client_name} -> {channel_name}, 错误: {e}")
return False
def register_message_handler(self, message_type: str, handler: Callable):
"""注册消息处理器"""
try:
self._message_handlers[message_type] = handler
logger.info(f"WebSocket服务注册消息处理器: {message_type}")
except Exception as e:
logger.error(f"WebSocket服务注册消息处理器异常: {message_type}, 错误: {e}")
def unregister_message_handler(self, message_type: str):
"""取消注册消息处理器"""
try:
if message_type in self._message_handlers:
del self._message_handlers[message_type]
logger.info(f"WebSocket服务取消注册消息处理器: {message_type}")
except Exception as e:
logger.error(f"WebSocket服务取消注册消息处理器异常: {message_type}, 错误: {e}")
def get_service_stats(self) -> Dict[str, Any]:
"""获取服务统计信息"""
try:
stats = websocket_manager.get_stats()
stats.update({
"total_messages_sent": self._total_messages_sent,
"total_messages_received": self._total_messages_received,
"service_created_at": self._created_at.isoformat(),
"config": self._config.dict()
})
return stats
except Exception as e:
logger.error(f"WebSocket服务获取统计信息异常: {e}")
return {}
async def cleanup(self):
"""清理服务资源"""
try:
await websocket_manager.cleanup()
logger.info("WebSocket服务清理完成")
except Exception as e:
logger.error(f"WebSocket服务清理异常: {e}")
# 全局WebSocket服务实例
websocket_service = WebSocketService()

17
modify.md

@ -276,4 +276,19 @@ WebSocket服务器 ↔ WebSocketClient ↔ WebSocketAdapter ↔ WebSocketChannel
- `app/services/websocket_service.py` 在创建并连接客户端后,自动为其:
- 确保默认Channels存在并连接
- 创建并启动与默认Channels的适配器
- 从而保证“按Channel读写”链路即刻可用(发送走Adapter→Client,接收由Client注册的处理器经Adapter写入Channel)。
- 从而保证“按Channel读写”链路即刻可用(发送走Adapter→Client,接收由Client注册的处理器经Adapter写入Channel)。
### 去服务层解耦(2025-08-07)
- 目标:消除 `websocket_service` 与核心层的重复职责,API与启动流程直接依赖 `websocket_manager``WebSocketConfig`
- 变更:
- `app/api/v1/endpoints/websocket.py` 直接调用 `websocket_manager` 完成创建/连接/断开,并在创建成功后初始化默认Channels与适配器。
- `app/core/app/factory.py` 启动时直接创建并连接默认Channels,关闭时调用 `websocket_manager.cleanup()`
- 后续可删除 `app/services/websocket_service.py` 文件(当前仍保留便于迁移过渡,无对外API依赖)。
- WebSocket严格版重构:
- 禁用直发与广播:`websocket_service.send_message/broadcast_message` 改为拒绝外部调用,仅允许写入 Channel 由 Adapter 转发。
- 私有发送:`WebSocketClient.send_message` 重命名为 `_send_message`,仅供 Adapter 调用;心跳仍为内部私有发送。
- 出入通道分离:`WebSocketAdapter` 支持 `outbound_channel``inbound_channel`;默认两者同名同一Channel,后续可按需拆分不同Channel。
- 适配器幂等恢复:`WebSocketManager.create_adapter` 若存在且任务未运行则自动重启,并确保 Channel 连接。
- 创建客户端后自动为默认 Channels 建立适配器并连接,保证 Channel→Adapter→Client 链路即刻可用。
- 心跳数据格式收敛:按用户指定的 .NET 模型,仅发送 `{ "Type": "heartbeat", "Payload": { "Message": "ping" } }`,不附加任何其他字段;由 `WebSocketAdapter` 在优先级Channel写入并由发送循环优先发送。
Loading…
Cancel
Save