You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
181 lines
8.5 KiB
181 lines
8.5 KiB
"""
|
|
WebSocket适配器模块
|
|
负责连接WebSocket客户端和Channel,实现数据双向流动
|
|
遵循单一职责原则
|
|
"""
|
|
import asyncio
|
|
from typing import Dict, Optional, Any
|
|
import json
|
|
from datetime import datetime
|
|
from app.core.websocket.client import WebSocketClient
|
|
from app.core.websocket.channel import WebSocketChannel, ChannelMessage
|
|
from app.core.websocket.serializer import websocket_serializer
|
|
from app.utils.structured_log import get_structured_logger, LogLevel
|
|
|
|
logger = get_structured_logger(__name__, LogLevel.INFO)
|
|
|
|
class WebSocketAdapter:
|
|
"""WebSocket适配器 - 数据转发
|
|
|
|
单一职责:
|
|
- 连接WebSocket客户端和Channel
|
|
- 实现数据双向流动
|
|
- 处理数据格式转换
|
|
- 管理发送和接收任务
|
|
|
|
不负责:
|
|
- WebSocket连接管理
|
|
- 心跳管理
|
|
- Channel管理
|
|
- 业务逻辑处理
|
|
"""
|
|
|
|
def __init__(self, client: WebSocketClient, outbound_channel: WebSocketChannel, inbound_channel: Optional[WebSocketChannel] = None):
|
|
self.client = client
|
|
self.outbound_channel = outbound_channel
|
|
self.inbound_channel = inbound_channel or outbound_channel
|
|
self._send_task: Optional[asyncio.Task] = None
|
|
self._created_at = datetime.now()
|
|
|
|
logger.info(f"创建WebSocket适配器: {client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
|
|
|
|
async def start(self):
|
|
"""启动适配器,开始数据双向流动
|
|
|
|
单一职责:只负责启动数据转发任务,不处理业务逻辑
|
|
"""
|
|
try:
|
|
# 注册WebSocket客户端的消息处理器,处理从WebSocket接收到的数据
|
|
self.client.register_message_handler("*", self._handle_websocket_message)
|
|
|
|
# 启动发送任务:从Channel读取数据发送到WebSocket
|
|
self._send_task = asyncio.create_task(self._send_loop())
|
|
|
|
logger.info(
|
|
f"WebSocket适配器启动成功: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"WebSocket适配器启动失败: {e}")
|
|
raise
|
|
|
|
async def stop(self):
|
|
"""停止适配器
|
|
|
|
单一职责:只负责停止数据转发任务,不处理业务逻辑
|
|
"""
|
|
try:
|
|
# 取消发送任务
|
|
if self._send_task:
|
|
self._send_task.cancel()
|
|
|
|
# 注意:不取消注册消息处理器,因为可能有多个适配器使用同一个客户端
|
|
# 消息处理器会在客户端断开时统一清理
|
|
|
|
logger.info(f"WebSocket适配器已停止: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
|
|
|
|
except Exception as e:
|
|
logger.error(f"WebSocket适配器停止时出错: {e}")
|
|
|
|
async def _send_loop(self):
|
|
"""发送循环:从Channel读取数据发送到WebSocket
|
|
|
|
单一职责:只负责数据转发,不处理业务逻辑
|
|
"""
|
|
logger.info(f"发送循环启动: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})")
|
|
logger.info(f"连接状态检查: client.is_connected={self.client.is_connected}, outbound_channel.is_connected={self.outbound_channel.is_connected}")
|
|
logger.info(f"适配器类型: {'心跳适配器' if 'heartbeat' in self.outbound_channel.name else '接收适配器' if 'receive' in self.outbound_channel.name else '发送适配器'}")
|
|
try:
|
|
while self.client.is_connected and self.outbound_channel.is_connected:
|
|
try:
|
|
# 从Channel接收消息
|
|
msg = await self.outbound_channel.receive_message(timeout=0.5)
|
|
logger.debug(f"适配器接收消息: {self.outbound_channel.name} -> msg: {msg}")
|
|
if msg:
|
|
logger.info(f"适配器收到消息: {self.outbound_channel.name} -> type: {msg.type}, data: {msg.data}")
|
|
# 将消息数据序列化为JSON字符串
|
|
if msg.data is None:
|
|
# 如果data为None,跳过发送
|
|
logger.debug(f"适配器跳过空数据消息: {self.outbound_channel.name} -> {msg.type}")
|
|
elif isinstance(msg.data, dict):
|
|
# 在发送时添加type字段
|
|
send_data = {
|
|
"Type": msg.type,
|
|
**msg.data
|
|
}
|
|
# 使用序列化器序列化消息
|
|
payload = websocket_serializer.serialize(send_data)
|
|
# 发送数据到WebSocket
|
|
success = await self.client.send_raw(payload)
|
|
if success:
|
|
logger.debug(f"适配器发送消息成功: {self.outbound_channel.name} -> {msg.type}")
|
|
else:
|
|
logger.warning(f"适配器发送消息失败: {self.outbound_channel.name} -> {msg.type}")
|
|
else:
|
|
# 如果不是字典格式,记录日志但不发送
|
|
logger.warning(f"适配器跳过非字典格式消息: {self.outbound_channel.name} -> {msg.type}, 数据类型: {type(msg.data)}")
|
|
else:
|
|
# 没有消息时短暂等待
|
|
await asyncio.sleep(0.05)
|
|
|
|
except asyncio.TimeoutError:
|
|
# 超时是正常的,继续循环
|
|
continue
|
|
except Exception as e:
|
|
logger.error(f"适配器发送循环异常: {e}")
|
|
await asyncio.sleep(0.1)
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info(
|
|
f"适配器发送任务已取消: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"适配器发送任务异常: {e}")
|
|
|
|
async def _handle_websocket_message(self, message: Dict[str, Any]):
|
|
"""处理从WebSocket接收到的消息,插入到入站Channel
|
|
|
|
单一职责:只负责数据转发,不处理业务逻辑
|
|
"""
|
|
try:
|
|
# 若消息携带channel且与入站通道不一致,则忽略,避免多适配器重复写入
|
|
msg_channel = message.get("channel")
|
|
if msg_channel and msg_channel != self.inbound_channel.name:
|
|
return
|
|
|
|
# 创建Channel消息
|
|
channel_message = ChannelMessage(
|
|
type=message.get("type", "data"),
|
|
data=message.get("data"), # 直接透传
|
|
priority=message.get("priority", 0) # 支持优先级
|
|
)
|
|
|
|
# 添加调试日志
|
|
logger.info(f"适配器处理WebSocket消息: {self.client.name} -> type: {message.get('type')}, data: {message.get('data')}")
|
|
|
|
# 插入到Channel
|
|
success = await self.inbound_channel.send_message(channel_message)
|
|
|
|
if success:
|
|
logger.debug(f"适配器接收消息成功: {message.get('type')} -> {self.inbound_channel.name}")
|
|
else:
|
|
logger.warning(f"适配器接收消息失败: {message.get('type')} -> {self.inbound_channel.name}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"适配器处理WebSocket消息异常: {e}")
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""获取适配器统计信息"""
|
|
return {
|
|
"client_name": self.client.name,
|
|
"outbound_channel_name": self.outbound_channel.name,
|
|
"inbound_channel_name": self.inbound_channel.name,
|
|
"client_connected": self.client.is_connected,
|
|
"outbound_connected": self.outbound_channel.is_connected,
|
|
"inbound_connected": self.inbound_channel.is_connected,
|
|
"send_task_running": self._send_task and not self._send_task.done(),
|
|
"created_at": self._created_at,
|
|
"client_stats": self.client.get_stats(),
|
|
"outbound_stats": self.outbound_channel.get_stats(),
|
|
"inbound_stats": self.inbound_channel.get_stats()
|
|
}
|