You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

538 lines
23 KiB

"""
WebSocket管理器模块
轻量级协调器,负责协调各个专门的管理器
遵循单一职责原则
"""
import asyncio
from typing import Dict, List, Optional, Any
from datetime import datetime
from app.core.websocket.client import WebSocketClient
from app.core.websocket.channel import WebSocketChannel, ChannelMessage
from app.core.websocket.client_manager import WebSocketClientManager
from app.core.websocket.channel_manager import WebSocketChannelManager
from app.core.websocket.send_controller import WebSocketSendController
from app.core.websocket.heartbeat_manager import WebSocketHeartbeatManager
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.INFO)
class WebSocketManager:
"""WebSocket管理器 - 轻量级协调器
单一职责:只负责协调各个专门的管理器
- 协调客户端管理器
- 协调Channel管理器
- 协调发送控制器
- 协调心跳管理器
- 提供统一的API接口
架构设计:
- 一个客户端只需要3个Channel:心跳、发送、接收
- 一个客户端只需要1个发送控制器:统一管理所有数据发送
- 一个客户端只需要1个心跳管理器:管理心跳任务
"""
def __init__(self):
# 专门的管理器
self._client_manager = WebSocketClientManager()
self._channel_manager = WebSocketChannelManager()
self._heartbeat_manager = WebSocketHeartbeatManager()
# 发送控制器
self._send_controllers: Dict[str, WebSocketSendController] = {}
# 接收消息处理器任务
self._receive_tasks: Dict[str, asyncio.Task] = {}
self._created_at = datetime.now()
logger.info("WebSocket管理器初始化完成")
async def create_client(self, name: str, url: str, heartbeat_interval: int = 120) -> WebSocketClient:
"""创建WebSocket客户端并自动创建3个Channel"""
try:
# 检查是否已存在客户端
existing_client = self._client_manager.get_client(name)
if existing_client:
logger.info(f"WebSocket客户端 {name} 已存在,检查状态")
# 如果客户端已连接,先断开
if existing_client.is_connected:
logger.info(f"WebSocket客户端 {name} 已连接,先断开")
await self._client_manager.disconnect_client(name)
# 更新心跳间隔配置
existing_client.heartbeat_interval = heartbeat_interval
client = existing_client
else:
# 1. 创建客户端
client = await self._client_manager.create_client(name, url, heartbeat_interval)
# 2. 创建Channel(如果已存在会返回已存在的Channel)
await self._channel_manager.create_client_channels(name)
# 3. 注册消息处理器
await self._register_client_handlers(name)
logger.info(f"WebSocket管理器创建客户端成功: {name} -> {url}")
return client
except Exception as e:
logger.error(f"WebSocket管理器创建客户端失败: {name} - {e}")
raise
async def _register_client_handlers(self, client_name: str):
"""为客户端注册消息处理器"""
try:
client = self._client_manager.get_client(client_name)
if not client:
logger.error(f"WebSocket客户端 {client_name} 不存在")
return
# 注册接收消息处理器,处理从WebSocket接收到的数据
receive_channel = self._channel_manager.get_channel(f"{client_name}_receive")
if receive_channel:
# 创建异步消息处理器
async def message_handler(msg):
await self._handle_received_message(client_name, msg)
client.register_message_handler("*", message_handler)
logger.info(f"WebSocket管理器注册接收消息处理器: {client_name}")
except Exception as e:
logger.error(f"WebSocket管理器注册消息处理器失败: {client_name} - {e}")
raise
async def _handle_received_message(self, client_name: str, message: Dict[str, Any]):
"""处理从WebSocket接收到的消息,插入到接收Channel"""
try:
receive_channel = self._channel_manager.get_channel(f"{client_name}_receive")
if not receive_channel:
logger.warning(f"接收Channel不存在: {client_name}_receive")
return
# 创建Channel消息
channel_message = ChannelMessage(
type=message.get("type", "data"),
data=message.get("data"), # 直接透传
priority=message.get("priority", 0) # 支持优先级
)
# 添加调试日志
logger.info(f"管理器处理WebSocket消息: {client_name} -> type: {message.get('type')}, data: {message.get('data')}")
# 发送到接收Channel
success = await receive_channel.send_message(channel_message)
if success:
logger.debug(f"WebSocket管理器消息转发成功: {client_name} -> receive_channel")
else:
logger.warning(f"WebSocket管理器消息转发失败: {client_name} -> receive_channel")
except Exception as e:
logger.error(f"WebSocket管理器处理接收消息异常: {client_name} - {e}")
async def connect_client(self, name: str) -> bool:
"""连接指定客户端"""
try:
# 1. 连接客户端
success = await self._client_manager.connect_client(name)
if not success:
logger.error(f"WebSocket客户端连接失败: {name}")
return False
# 2. 启动Channel
await self._channel_manager.start_client_channels(name)
# 3. 等待Channel完全启动(确保Channel状态为connected)
await self._wait_for_channels_ready(name)
# 4. 创建并启动发送控制器
await self._create_and_start_send_controller(name)
# 5. 启动心跳任务
client = self._client_manager.get_client(name)
if client and hasattr(client, 'heartbeat_interval'):
# 检查是否已存在心跳任务
await self._heartbeat_manager.stop_heartbeat_task(name) # 先停止已存在的任务
await self._heartbeat_manager.start_heartbeat_task(name, client, client.heartbeat_interval)
# 6. 启动接收消息处理器
await self._create_and_start_receive_processor(name)
logger.info(f"WebSocket管理器客户端连接成功: {name}")
return True
except Exception as e:
logger.error(f"WebSocket管理器连接客户端失败: {name} - {e}")
return False
async def _wait_for_channels_ready(self, client_name: str, timeout: float = 5.0):
"""等待客户端的所有Channel完全启动"""
import asyncio
start_time = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start_time < timeout:
# 检查所有Channel是否都已连接
client_channels = self._channel_manager.get_client_channels(client_name)
all_connected = True
for channel_name, channel in client_channels.items():
if not channel.is_connected:
all_connected = False
logger.debug(f"等待Channel连接: {channel_name}")
break
if all_connected:
logger.info(f"所有Channel已准备就绪: {client_name}")
return True
# 等待100ms后再次检查
await asyncio.sleep(0.1)
logger.warning(f"等待Channel就绪超时: {client_name} (超时时间: {timeout}秒)")
return False
async def _create_and_start_send_controller(self, client_name: str):
"""创建并启动发送控制器"""
try:
client = self._client_manager.get_client(client_name)
if not client:
logger.error(f"WebSocket客户端 {client_name} 不存在")
return
# 检查是否已存在发送控制器
if client_name in self._send_controllers:
existing_controller = self._send_controllers[client_name]
logger.info(f"WebSocket发送控制器 {client_name} 已存在,先停止")
await existing_controller.stop()
del self._send_controllers[client_name]
# 获取Channel
heartbeat_channel = self._channel_manager.get_channel(f"{client_name}_heartbeat")
send_channel = self._channel_manager.get_channel(f"{client_name}_send")
if not heartbeat_channel or not send_channel:
logger.error(f"Channel不存在,无法创建发送控制器: {client_name}")
return
# 创建发送控制器
send_controller = WebSocketSendController(
client=client,
heartbeat_channel=heartbeat_channel,
send_channel=send_channel
)
# 启动发送控制器
success = await send_controller.start()
if success:
self._send_controllers[client_name] = send_controller
logger.info(f"WebSocket管理器启动发送控制器成功: {client_name}")
else:
logger.error(f"WebSocket管理器启动发送控制器失败: {client_name}")
except Exception as e:
logger.error(f"WebSocket管理器创建发送控制器失败: {client_name} - {e}")
async def _create_and_start_receive_processor(self, client_name: str):
"""创建并启动接收消息处理器"""
try:
# 检查是否已存在接收处理器
if client_name in self._receive_tasks:
existing_task = self._receive_tasks[client_name]
logger.info(f"WebSocket接收消息处理器 {client_name} 已存在,先停止")
if not existing_task.done():
existing_task.cancel()
try:
await existing_task
except asyncio.CancelledError:
pass
del self._receive_tasks[client_name]
# 获取接收Channel
receive_channel = self._channel_manager.get_channel(f"{client_name}_receive")
if not receive_channel:
logger.error(f"接收Channel不存在: {client_name}_receive")
return
# 创建接收消息处理任务
receive_task = asyncio.create_task(self._receive_message_loop(client_name, receive_channel))
self._receive_tasks[client_name] = receive_task
logger.info(f"WebSocket管理器启动接收消息处理器成功: {client_name}")
except Exception as e:
logger.error(f"WebSocket管理器创建接收消息处理器失败: {client_name} - {e}")
async def _receive_message_loop(self, client_name: str, receive_channel):
"""接收消息处理循环
单一职责:只负责从接收Channel读取消息并进行业务处理
"""
logger.info(f"WebSocket管理器开始接收消息处理循环: {client_name}")
try:
while receive_channel.is_connected:
try:
# 从接收Channel获取消息
message = await receive_channel.receive_message()
if message:
# 处理接收到的消息
await self._process_received_message(client_name, message)
else:
# 没有消息时短暂等待
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info(f"WebSocket管理器接收消息处理循环被取消: {client_name}")
break
except Exception as e:
logger.error(f"WebSocket管理器接收消息处理循环异常: {client_name} - {e}")
await asyncio.sleep(1) # 异常时等待1秒再重试
logger.info(f"WebSocket管理器接收消息处理循环结束: {client_name}")
except Exception as e:
logger.error(f"WebSocket管理器接收消息处理循环严重异常: {client_name} - {e}")
async def _process_received_message(self, client_name: str, message: ChannelMessage):
"""处理接收到的消息
单一职责:只负责业务逻辑处理,不处理WebSocket通信
"""
try:
logger.info(f"WebSocket管理器处理接收消息: {client_name} -> type: {message.type}")
# 根据消息类型进行不同的业务处理
if message.type == "heartbeat":
await self._handle_heartbeat_response(client_name, message)
elif message.type == "data":
await self._handle_data_message(client_name, message)
elif message.type == "command":
await self._handle_command_message(client_name, message)
else:
logger.info(f"WebSocket管理器收到未知类型消息: {client_name} -> {message.type}")
except Exception as e:
logger.error(f"WebSocket管理器处理接收消息异常: {client_name} -> {message.type} - {e}")
async def _handle_heartbeat_response(self, client_name: str, message: ChannelMessage):
"""处理心跳响应消息"""
try:
logger.debug(f"WebSocket管理器处理心跳响应: {client_name}")
# 这里可以添加心跳响应的业务逻辑
# 例如:更新客户端状态、记录心跳时间等
except Exception as e:
logger.error(f"WebSocket管理器处理心跳响应异常: {client_name} - {e}")
async def _handle_data_message(self, client_name: str, message: ChannelMessage):
"""处理数据消息"""
try:
logger.debug(f"WebSocket管理器处理数据消息: {client_name} -> {message.data}")
# 这里可以添加数据消息的业务逻辑
# 例如:数据转发、数据存储、数据分析等
except Exception as e:
logger.error(f"WebSocket管理器处理数据消息异常: {client_name} - {e}")
async def _handle_command_message(self, client_name: str, message: ChannelMessage):
"""处理命令消息"""
try:
logger.debug(f"WebSocket管理器处理命令消息: {client_name} -> {message.data}")
# 这里可以添加命令消息的业务逻辑
# 例如:执行命令、状态更新、配置修改等
except Exception as e:
logger.error(f"WebSocket管理器处理命令消息异常: {client_name} - {e}")
async def disconnect_client(self, name: str) -> bool:
"""断开指定客户端"""
try:
# 1. 停止心跳任务
await self._heartbeat_manager.stop_heartbeat_task(name)
# 2. 停止发送控制器
await self._stop_send_controller(name)
# 3. 停止接收消息处理器
await self._stop_receive_processor(name)
# 4. 等待发送控制器完全停止(确保不再访问Channel)
await asyncio.sleep(0.1)
# 5. 停止Channel
await self._channel_manager.stop_client_channels(name)
# 6. 断开客户端
success = await self._client_manager.disconnect_client(name)
logger.info(f"WebSocket管理器客户端断开成功: {name}")
return success
except Exception as e:
logger.error(f"WebSocket管理器断开客户端失败: {name} - {e}")
return False
async def _stop_send_controller(self, client_name: str):
"""停止发送控制器"""
try:
if client_name in self._send_controllers:
send_controller = self._send_controllers[client_name]
await send_controller.stop()
del self._send_controllers[client_name]
logger.info(f"WebSocket管理器停止发送控制器: {client_name}")
except Exception as e:
logger.error(f"WebSocket管理器停止发送控制器失败: {client_name} - {e}")
async def _stop_receive_processor(self, client_name: str):
"""停止接收消息处理器"""
try:
if client_name in self._receive_tasks:
receive_task = self._receive_tasks[client_name]
if not receive_task.done():
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass
del self._receive_tasks[client_name]
logger.info(f"WebSocket管理器停止接收消息处理器: {client_name}")
except Exception as e:
logger.error(f"WebSocket管理器停止接收消息处理器失败: {client_name} - {e}")
async def remove_client(self, name: str) -> bool:
"""移除指定客户端"""
try:
# 1. 停止心跳任务
await self._heartbeat_manager.stop_heartbeat_task(name)
# 2. 停止发送控制器
await self._stop_send_controller(name)
# 3. 停止接收消息处理器
await self._stop_receive_processor(name)
# 4. 移除Channel
await self._channel_manager.remove_client_channels(name)
# 5. 移除客户端
success = await self._client_manager.remove_client(name)
logger.info(f"WebSocket管理器客户端移除成功: {name}")
return success
except Exception as e:
logger.error(f"WebSocket管理器移除客户端失败: {name} - {e}")
return False
async def send_message(self, client_name: str, message_type: str, data: Any, priority: int = 0) -> bool:
"""发送消息到指定客户端"""
try:
# 根据消息类型选择Channel
if message_type == "heartbeat":
channel_name = f"{client_name}_heartbeat"
else:
channel_name = f"{client_name}_send"
# 发送到Channel
success = await self._channel_manager.send_message_to_channel(channel_name, message_type, data, priority)
return success
except Exception as e:
logger.error(f"WebSocket管理器发送消息异常: {client_name} -> {message_type} - {e}")
return False
async def send_heartbeat(self, client_name: str) -> bool:
"""发送心跳消息"""
try:
# 获取客户端
client = self._client_manager.get_client(client_name)
if not client:
logger.error(f"WebSocket客户端 {client_name} 不存在")
return False
# 直接调用心跳发送方法
success = await self._heartbeat_manager._send_heartbeat_message(client_name, client)
return success
except Exception as e:
logger.error(f"WebSocket管理器发送心跳失败: {client_name} - {e}")
return False
def get_client(self, name: str) -> Optional[WebSocketClient]:
"""获取指定客户端"""
return self._client_manager.get_client(name)
def get_channel(self, name: str) -> Optional[WebSocketChannel]:
"""获取指定Channel"""
return self._channel_manager.get_channel(name)
def get_client_channels(self, client_name: str) -> Dict[str, WebSocketChannel]:
"""获取指定客户端的所有Channel"""
return self._channel_manager.get_client_channels(client_name)
def get_all_clients(self) -> Dict[str, WebSocketClient]:
"""获取所有客户端"""
return self._client_manager.get_all_clients()
def get_all_channels(self) -> Dict[str, WebSocketChannel]:
"""获取所有Channel"""
return self._channel_manager.get_all_channels()
async def cleanup(self):
"""清理所有资源"""
try:
# 清理心跳管理器
await self._heartbeat_manager.cleanup()
# 清理发送控制器
for client_name in list(self._send_controllers.keys()):
await self._stop_send_controller(client_name)
# 清理接收消息处理器
for client_name in list(self._receive_tasks.keys()):
await self._stop_receive_processor(client_name)
# 清理Channel管理器
await self._channel_manager.cleanup()
# 清理客户端管理器
await self._client_manager.cleanup()
logger.info("WebSocket管理器清理完成")
except Exception as e:
logger.error(f"WebSocket管理器清理失败: {e}")
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
stats = {
"created_at": self._created_at.isoformat(),
"client_stats": self._client_manager.get_stats(),
"channel_stats": self._channel_manager.get_stats(),
"heartbeat_stats": self._heartbeat_manager.get_stats(),
"send_controller_count": len(self._send_controllers),
"receive_processor_count": len(self._receive_tasks),
"send_controllers": {},
"receive_processors": {}
}
for client_name, controller in self._send_controllers.items():
stats["send_controllers"][client_name] = controller.get_stats()
for client_name, task in self._receive_tasks.items():
stats["receive_processors"][client_name] = {
"is_running": not task.done(),
"is_cancelled": task.cancelled(),
"exception": str(task.exception()) if task.exception() else None
}
return stats
# 全局WebSocket管理器实例
websocket_manager = WebSocketManager()