You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
8.0 KiB

"""
WebSocket适配器模块
负责连接WebSocket客户端和Channel,实现数据双向流动
遵循单一职责原则
"""
import asyncio
from typing import Dict, Optional, Any
import json
from datetime import datetime
from app.core.websocket.client import WebSocketClient
from app.core.websocket.channel import WebSocketChannel, ChannelMessage
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.INFO)
class WebSocketAdapter:
"""WebSocket适配器 - 数据转发
单一职责:
- 连接WebSocket客户端和Channel
- 实现数据双向流动
- 处理数据格式转换
- 管理发送和接收任务
不负责:
- WebSocket连接管理
- 心跳管理
- Channel管理
- 业务逻辑处理
"""
def __init__(self, client: WebSocketClient, outbound_channel: WebSocketChannel, inbound_channel: Optional[WebSocketChannel] = None):
self.client = client
self.outbound_channel = outbound_channel
self.inbound_channel = inbound_channel or outbound_channel
self._send_task: Optional[asyncio.Task] = None
self._created_at = datetime.now()
logger.info(f"创建WebSocket适配器: {client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
async def start(self):
"""启动适配器,开始数据双向流动
单一职责:只负责启动数据转发任务,不处理业务逻辑
"""
try:
# 注册WebSocket客户端的消息处理器,处理从WebSocket接收到的数据
self.client.register_message_handler("*", self._handle_websocket_message)
# 启动发送任务:从Channel读取数据发送到WebSocket
self._send_task = asyncio.create_task(self._send_loop())
logger.info(
f"WebSocket适配器启动成功: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})"
)
except Exception as e:
logger.error(f"WebSocket适配器启动失败: {e}")
raise
async def stop(self):
"""停止适配器
单一职责:只负责停止数据转发任务,不处理业务逻辑
"""
try:
# 取消发送任务
if self._send_task:
self._send_task.cancel()
# 注意:不取消注册消息处理器,因为可能有多个适配器使用同一个客户端
# 消息处理器会在客户端断开时统一清理
logger.info(f"WebSocket适配器已停止: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
except Exception as e:
logger.error(f"WebSocket适配器停止时出错: {e}")
async def _send_loop(self):
"""发送循环:从Channel读取数据发送到WebSocket
单一职责:只负责数据转发,不处理业务逻辑
"""
logger.info(f"发送循环启动: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
logger.info(f"连接状态检查: client.is_connected={self.client.is_connected}, outbound_channel.is_connected={self.outbound_channel.is_connected}")
try:
while self.client.is_connected and self.outbound_channel.is_connected:
try:
# 从Channel接收消息
msg = await self.outbound_channel.receive_message(timeout=0.5)
logger.debug(f"适配器接收消息: {self.outbound_channel.name} -> msg: {msg}")
if msg:
logger.info(f"适配器收到消息: {self.outbound_channel.name} -> type: {msg.type}, data: {msg.data}")
# 将消息数据序列化为JSON字符串
import json
if msg.data is None:
# 如果data为None,跳过发送
logger.debug(f"适配器跳过空数据消息: {self.outbound_channel.name} -> {msg.type}")
elif isinstance(msg.data, dict):
# 在发送时添加type字段
send_data = {
"Type": msg.type,
**msg.data
}
payload = json.dumps(send_data)
# 发送数据到WebSocket
success = await self.client.send_raw(payload)
if success:
logger.debug(f"适配器发送消息成功: {self.outbound_channel.name} -> {msg.type}")
else:
logger.warning(f"适配器发送消息失败: {self.outbound_channel.name} -> {msg.type}")
else:
# 如果不是字典格式,记录日志但不发送
logger.warning(f"适配器跳过非字典格式消息: {self.outbound_channel.name} -> {msg.type}, 数据类型: {type(msg.data)}")
else:
# 没有消息时短暂等待
await asyncio.sleep(0.05)
except asyncio.TimeoutError:
# 超时是正常的,继续循环
continue
except Exception as e:
logger.error(f"适配器发送循环异常: {e}")
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info(
f"适配器发送任务已取消: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})"
)
except Exception as e:
logger.error(f"适配器发送任务异常: {e}")
async def _handle_websocket_message(self, message: Dict[str, Any]):
"""处理从WebSocket接收到的消息,插入到入站Channel
单一职责:只负责数据转发,不处理业务逻辑
"""
try:
# 若消息携带channel且与入站通道不一致,则忽略,避免多适配器重复写入
msg_channel = message.get("channel")
if msg_channel and msg_channel != self.inbound_channel.name:
return
# 创建Channel消息
channel_message = ChannelMessage(
type=message.get("type", "data"),
data=message.get("data"), # 直接透传
priority=message.get("priority", 0) # 支持优先级
)
# 插入到Channel
success = await self.inbound_channel.send_message(channel_message)
if success:
logger.debug(f"适配器接收消息成功: {message.get('type')} -> {self.inbound_channel.name}")
else:
logger.warning(f"适配器接收消息失败: {message.get('type')} -> {self.inbound_channel.name}")
except Exception as e:
logger.error(f"适配器处理WebSocket消息异常: {e}")
def get_stats(self) -> Dict[str, Any]:
"""获取适配器统计信息"""
return {
"client_name": self.client.name,
"outbound_channel_name": self.outbound_channel.name,
"inbound_channel_name": self.inbound_channel.name,
"client_connected": self.client.is_connected,
"outbound_connected": self.outbound_channel.is_connected,
"inbound_connected": self.inbound_channel.is_connected,
"send_task_running": self._send_task and not self._send_task.done(),
"created_at": self._created_at,
"client_stats": self.client.get_stats(),
"outbound_stats": self.outbound_channel.get_stats(),
"inbound_stats": self.inbound_channel.get_stats()
}