You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

199 lines
9.7 KiB

"""
WebSocket适配器模块
负责连接WebSocket客户端和Channel,实现数据双向流动
"""
import asyncio
from typing import Dict, Optional, Any
import json
from datetime import datetime
from app.core.websocket.client import WebSocketClient
from app.core.websocket.channel import WebSocketChannel, ChannelMessage
from app.utils.structured_log import get_structured_logger, LogLevel
from datetime import timedelta
logger = get_structured_logger(__name__, LogLevel.INFO)
class WebSocketAdapter:
"""WebSocket适配器 - 连接WebSocket客户端和Channel
支持出入通道分离:outbound用于发送,inbound用于接收
"""
def __init__(self, client: WebSocketClient, outbound_channel: WebSocketChannel, inbound_channel: Optional[WebSocketChannel] = None, heartbeat_interval: int = 120):
self.client = client
self.outbound_channel = outbound_channel
self.inbound_channel = inbound_channel or outbound_channel
self.heartbeat_interval = heartbeat_interval
self._send_task: Optional[asyncio.Task] = None
self._heartbeat_task: Optional[asyncio.Task] = None
self._created_at = datetime.now()
logger.info(f"创建WebSocket适配器: {client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name}) 心跳间隔:{heartbeat_interval}")
async def start(self):
"""启动适配器,开始数据双向流动"""
try:
# 注册WebSocket客户端的消息处理器,处理从WebSocket接收到的数据
self.client.register_message_handler("*", self._handle_websocket_message)
# 启动发送任务:从Channel读取数据发送到WebSocket
self._send_task = asyncio.create_task(self._send_loop())
# 启动心跳任务:以优先级消息写入优先级Channel,由发送循环优先发送
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
logger.info(
f"WebSocket适配器启动成功: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})"
)
except Exception as e:
logger.error(f"WebSocket适配器启动失败: {e}")
raise
async def stop(self):
"""停止适配器"""
try:
# 取消发送任务
if self._send_task:
self._send_task.cancel()
if self._heartbeat_task:
self._heartbeat_task.cancel()
# 取消注册消息处理器
self.client.unregister_message_handler("*")
logger.info(f"WebSocket适配器已停止: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
except Exception as e:
logger.error(f"WebSocket适配器停止时出错: {e}")
async def _send_loop(self):
"""发送循环:优先处理优先级Channel,其次处理普通出站Channel"""
logger.info(f"发送循环启动: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
try:
# 优先级Channel命名约定:f"{outbound}_priority";不存在则仅用普通通道
priority_channel_name = f"{self.outbound_channel.name}_priority"
priority_channel = None
try:
from app.core.websocket.manager import websocket_manager
priority_channel = websocket_manager.get_channel(priority_channel_name)
except Exception:
priority_channel = None
while self.client.is_connected and self.outbound_channel.is_connected:
processed_any = False
# 先尝试处理优先级消息(非阻塞,尽可能多取)
if priority_channel and priority_channel.is_connected:
while True:
pmsg = priority_channel.try_receive_message()
if not pmsg:
break
processed_any = True
# 由Channel侧保证消息已组装为最终载荷;此处只透传
assembled = pmsg.data
success = await self.client._send_raw(assembled)
if success:
logger.debug(f"适配器发送优先级消息成功: {self.outbound_channel.name} -> {pmsg.type}")
else:
logger.warning(f"适配器发送优先级消息失败: {self.outbound_channel.name} -> {pmsg.type}")
# 再处理普通消息(阻塞等待一小段时间)
try:
msg = await self.outbound_channel.receive_message(timeout=0.5)
except asyncio.TimeoutError:
msg = None
if msg:
processed_any = True
assembled = msg.data
success = await self.client._send_raw(assembled)
if success:
logger.debug(f"适配器发送普通消息成功: {self.outbound_channel.name} -> {msg.type}")
else:
logger.warning(f"适配器发送普通消息失败: {self.outbound_channel.name} -> {msg.type}")
if not processed_any:
await asyncio.sleep(0.05)
except asyncio.CancelledError:
logger.info(
f"适配器发送任务已取消: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})"
)
except Exception as e:
logger.error(f"适配器发送任务异常: {e}")
async def _handle_websocket_message(self, message: Dict[str, Any]):
"""处理从WebSocket接收到的消息,插入到入站Channel"""
try:
# 若消息携带channel且与入站通道不一致,则忽略,避免多适配器重复写入
msg_channel = message.get("channel")
if msg_channel and msg_channel != self.inbound_channel.name:
return
# 创建Channel消息
# 由上游(服务端或对端)提供的message,此处不做组装,仅转为 ChannelMessage
channel_message = ChannelMessage(
type=message.get("type", ""),
data=message.get("data") # 直接透传
)
# 插入到Channel
success = await self.inbound_channel.send_message(channel_message)
if success:
logger.debug(f"适配器接收消息成功: {message.get('type')} -> {self.inbound_channel.name}")
else:
logger.warning(f"适配器接收消息失败: {message.get('type')} -> {self.inbound_channel.name}")
except Exception as e:
logger.error(f"适配器处理WebSocket消息异常: {e}")
def get_stats(self) -> Dict[str, Any]:
"""获取适配器统计信息"""
return {
"client_name": self.client.name,
"outbound_channel_name": self.outbound_channel.name,
"inbound_channel_name": self.inbound_channel.name,
"client_connected": self.client.is_connected,
"outbound_connected": self.outbound_channel.is_connected,
"inbound_connected": self.inbound_channel.is_connected,
"send_task_running": self._send_task and not self._send_task.done(),
"heartbeat_task_running": self._heartbeat_task and not self._heartbeat_task.done(),
"created_at": self._created_at,
"client_stats": self.client.get_stats(),
"outbound_stats": self.outbound_channel.get_stats(),
"inbound_stats": self.inbound_channel.get_stats()
}
async def _heartbeat_loop(self):
"""心跳循环:以优先级消息写入,并由发送循环优先处理"""
logger.info(f"心跳循环启动: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name}) 间隔:{self.heartbeat_interval}")
try:
priority_channel_name = f"{self.outbound_channel.name}_priority"
from app.core.websocket.manager import websocket_manager
# 确保优先级通道存在并连接
pch = websocket_manager.get_channel(priority_channel_name)
if not pch:
logger.info(f"创建优先级通道: {priority_channel_name}")
pch = await websocket_manager.create_channel(priority_channel_name, max_size=100)
await pch.connect()
logger.info(f"优先级通道连接成功: {priority_channel_name}")
while self.client.is_connected:
try:
# 心跳仅发送 Payload,服务端只认 payload:{"Message":"ping"}
payload = json.dumps({"Message": "ping"})
msg = ChannelMessage(type="heartbeat", data=payload)
await pch.send_message(msg)
logger.debug(f"心跳消息已发送: {self.client.name} -> {priority_channel_name}")
await asyncio.sleep(self.heartbeat_interval)
except asyncio.CancelledError:
logger.info(f"心跳任务被取消: {self.client.name}")
break
except Exception as e:
logger.error(f"心跳写入优先级Channel失败: {e}")
await asyncio.sleep(5)
except asyncio.CancelledError:
logger.info(f"心跳任务已取消: {self.client.name}")
except Exception as e:
logger.error(f"心跳任务异常: {self.client.name} - {e}")