You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

708 lines
31 KiB

"""
WebSocket管理器
"""
import asyncio
import json
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.core.websocket.client import WebSocketClient
from app.core.websocket.channel import WebSocketChannel, ChannelMessage
from app.core.websocket.send_controller import WebSocketSendController
from app.core.websocket.client_manager import WebSocketClientManager
from app.core.websocket.channel_manager import WebSocketChannelManager
from app.utils.structured_log import get_structured_logger
# Module-level structured logger shared by everything defined in this file.
logger = get_structured_logger(__name__)
class WebSocketManager:
    """WebSocket manager - a lightweight coordinator.

    Single responsibility: it only coordinates the specialised managers.

    - coordinates the client manager
    - coordinates the channel manager
    - coordinates the send controllers
    - coordinates heartbeat handling (done inside the send controller,
      see ``connect_client``)
    - exposes one unified API

    Architecture:

    - each client needs exactly 3 channels: heartbeat, send, receive
    - each client needs exactly 1 send controller that owns all outgoing data
    - each client has one receive-loop task that drains its receive channel
    """

    def __init__(self):
        """Create the sub-managers, the per-client registries and cache the machine code."""
        # Specialised managers
        self._client_manager = WebSocketClientManager()
        self._channel_manager = WebSocketChannelManager()
        # Send controllers, keyed by client name
        self._send_controllers: Dict[str, WebSocketSendController] = {}
        # Receive-loop tasks, keyed by client name
        self._receive_tasks: Dict[str, asyncio.Task] = {}
        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set = set()
        self._created_at = datetime.now()
        # Device-event push support; resolved lazily by _get_device_manager()
        self._device_manager = None
        # Cached machine code so it is not looked up for every message
        self._service_serial = None
        self._init_service_serial()
        logger.info("WebSocket管理器初始化完成")

    def _init_service_serial(self):
        """Cache the machine code as the service serial.

        Falls back to the literal ``"unknown"`` when the lookup returns a
        falsy value or raises.
        """
        try:
            # Imported locally, as in the original code.
            from app.utils.system_utils import SystemUtils
            self._service_serial = SystemUtils.get_machine_code()
            if self._service_serial:
                logger.info(f"WebSocket管理器机器码初始化成功: {self._service_serial}")
            else:
                logger.warning("WebSocket管理器机器码获取失败,将使用默认值")
                self._service_serial = "unknown"
        except Exception as e:
            logger.error(f"WebSocket管理器机器码初始化异常: {e}")
            self._service_serial = "unknown"

    async def _get_device_manager(self):
        """Return the device manager, wiring it up on first use.

        On the first call the module-level ``device_manager`` is imported and
        given a push adapter bound to this manager.
        """
        if self._device_manager is None:
            logger.debug("延迟初始化设备管理器实例")
            from app.core.device.manager import device_manager
            self._device_manager = device_manager
            # Hand over a callback object instead of letting the device manager
            # import this module (avoids a circular dependency).
            self._device_manager.set_websocket_push_callback(self._websocket_push_adapter())
            logger.debug("设备管理器实例初始化完成")
        return self._device_manager

    def _websocket_push_adapter(self):
        """Build the callback object handed to the device manager.

        Returns:
            An object exposing ``check_client_status`` and ``send_message``
            coroutines that delegate to this manager.
        """
        class WebSocketPushAdapter:
            def __init__(self, manager):
                self.manager = manager

            async def check_client_status(self, client_name: str) -> bool:
                """Return True only when the client exists and is connected."""
                try:
                    client = self.manager._client_manager.get_client(client_name)
                    # Coerce to a real bool: a missing client used to leak
                    # ``None`` through the ``-> bool`` contract.
                    return bool(client and client.is_connected)
                except Exception as e:
                    logger.error(f"检查客户端状态失败: {client_name} - {e}")
                    return False

            async def send_message(self, client_name: str, message_type: str, data: Any, priority: int = 0) -> bool:
                """Forward a message through the manager; False on any error."""
                try:
                    return await self.manager.send_message(client_name, message_type, data, priority)
                except Exception as e:
                    logger.error(f"发送消息到客户端失败: {client_name} - {e}")
                    return False

        return WebSocketPushAdapter(self)

    async def create_client(self, name: str, url: str, heartbeat_interval: int = 120) -> WebSocketClient:
        """Create (or reuse) a client and make sure its channels and handler exist.

        Args:
            name: Client name; also the prefix of its channel names.
            url: WebSocket URL passed to the client manager for a new client.
            heartbeat_interval: Heartbeat interval stored on the client.

        Returns:
            The new or already existing client.

        Raises:
            Exception: Any error from the sub-managers is logged and re-raised.
        """
        try:
            existing_client = self._client_manager.get_client(name)
            if existing_client:
                logger.info(f"WebSocket客户端 {name} 已存在,检查状态")
                # An already connected client is disconnected first.
                if existing_client.is_connected:
                    logger.info(f"WebSocket客户端 {name} 已连接,先断开")
                    await self._client_manager.disconnect_client(name)
                # Refresh the heartbeat interval.
                # NOTE(review): ``url`` is ignored for an existing client -
                # confirm that a changed URL should not be applied here.
                existing_client.heartbeat_interval = heartbeat_interval
                client = existing_client
            else:
                # 1. Create the client
                client = await self._client_manager.create_client(name, url, heartbeat_interval)
            # 2. Create the channels (per the original comment, existing ones are returned)
            await self._channel_manager.create_client_channels(name)
            # 3. Register the message handler
            await self._register_client_handlers(name)
            logger.info(f"WebSocket管理器创建客户端成功: {name} -> {url}")
            return client
        except Exception as e:
            logger.error(f"WebSocket管理器创建客户端失败: {name} - {e}")
            raise

    async def _register_client_handlers(self, client_name: str):
        """Register a catch-all ("*") handler that forwards incoming messages.

        Does nothing (beyond logging) when the client is missing, and registers
        nothing when the ``<client>_receive`` channel is missing.

        Raises:
            Exception: Errors are logged and re-raised.
        """
        try:
            client = self._client_manager.get_client(client_name)
            if not client:
                logger.error(f"WebSocket客户端 {client_name} 不存在")
                return
            receive_channel = self._channel_manager.get_channel(f"{client_name}_receive")
            if receive_channel:
                # Async handler bound to this client name
                async def message_handler(msg):
                    await self._handle_received_message(client_name, msg)

                client.register_message_handler("*", message_handler)
                logger.info(f"WebSocket管理器注册接收消息处理器: {client_name}")
        except Exception as e:
            logger.error(f"WebSocket管理器注册消息处理器失败: {client_name} - {e}")
            raise

    async def _handle_received_message(self, client_name: str, message: Dict[str, Any]):
        """Wrap a raw incoming message and put it on the client's receive channel.

        Args:
            client_name: Name of the client the message arrived on.
            message: Raw message; ``type``, ``data`` and ``priority`` are read
                with defaults ``"data"``, ``None`` and ``0``.
        """
        try:
            receive_channel = self._channel_manager.get_channel(f"{client_name}_receive")
            if not receive_channel:
                logger.warning(f"接收Channel不存在: {client_name}_receive")
                return
            channel_message = ChannelMessage(
                type=message.get("type", "data"),
                data=message.get("data"),  # passed through unchanged
                priority=message.get("priority", 0),  # optional priority
            )
            # NOTE(review): the log prints the "payload" key while the forwarded
            # body comes from the "data" key - confirm which key incoming
            # messages actually carry.
            logger.info(f"管理器处理WebSocket消息: {client_name} -> type: {message.get('type')}, data: {message.get('payload')}")
            success = await receive_channel.send_message(channel_message)
            if success:
                logger.debug(f"WebSocket管理器消息转发成功: {client_name} -> receive_channel")
            else:
                logger.warning(f"WebSocket管理器消息转发失败: {client_name} -> receive_channel")
        except Exception as e:
            logger.error(f"WebSocket管理器处理接收消息异常: {client_name} - {e}")

    def _spawn_background_task(self, coro, task_name: str) -> asyncio.Task:
        """Start ``coro`` as a tracked fire-and-forget task.

        The task is kept in ``self._background_tasks`` until it finishes, and
        its outcome is logged through ``_handle_async_task_result``.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        task.add_done_callback(lambda t: self._handle_async_task_result(t, task_name))
        return task

    async def connect_client(self, name: str) -> bool:
        """Connect a client and start everything that belongs to it.

        Returns:
            True when the connection succeeded and the start-up sequence ran
            without raising; False when connecting failed or a step raised.
        """
        try:
            # 1. Connect the client
            success = await self._client_manager.connect_client(name)
            if not success:
                logger.error(f"WebSocket客户端连接失败: {name}")
                return False
            # 2. Start the channels
            await self._channel_manager.start_client_channels(name)
            # 3. Wait until the channels report connected.
            # NOTE(review): a timeout here, and a failed send-controller or
            # receive-processor start below, are only logged - the method
            # still returns True. Confirm this is intended.
            await self._wait_for_channels_ready(name)
            # 4. Create and start the send controller
            #    (heartbeat is integrated into it; nothing separate to start)
            await self._create_and_start_send_controller(name)
            # 5. Start the receive processor
            await self._create_and_start_receive_processor(name)
            # 6. Register device-event push in the background so the caller is not blocked
            logger.debug(f"为客户端 {name} 异步注册设备事件推送")
            device_manager = await self._get_device_manager()
            self._spawn_background_task(
                self._async_register_device_events(name, device_manager),
                f"注册设备事件推送-{name}",
            )
            # 7. Replay buffered events in the background as well
            logger.debug(f"为客户端 {name} 异步推送缓冲事件")
            self._spawn_background_task(
                self._async_push_buffered_events(name, device_manager),
                f"推送缓冲事件-{name}",
            )
            logger.info(f"WebSocket管理器客户端连接成功: {name}")
            return True
        except Exception as e:
            logger.error(f"WebSocket管理器连接客户端失败: {name} - {e}")
            return False

    async def _wait_for_channels_ready(self, client_name: str, timeout: float = 5.0) -> bool:
        """Poll every 100 ms until all of the client's channels are connected.

        Args:
            client_name: Client whose channels are checked.
            timeout: Maximum time to wait, in seconds.

        Returns:
            True once every channel reports ``is_connected`` (also when the
            client has no channels at all), False on timeout.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        while loop.time() - start_time < timeout:
            client_channels = self._channel_manager.get_client_channels(client_name)
            # First channel that is not connected yet, if any
            waiting_for = next(
                (channel_name for channel_name, channel in client_channels.items()
                 if not channel.is_connected),
                None,
            )
            if waiting_for is None:
                logger.info(f"所有Channel已准备就绪: {client_name}")
                return True
            logger.debug(f"等待Channel连接: {waiting_for}")
            await asyncio.sleep(0.1)
        logger.warning(f"等待Channel就绪超时: {client_name} (超时时间: {timeout}秒)")
        return False

    async def _create_and_start_send_controller(self, client_name: str):
        """Create and start the client's send controller, replacing any existing one.

        Failures are logged, not raised. The controller is only registered when
        its ``start()`` reports success.
        """
        try:
            client = self._client_manager.get_client(client_name)
            if not client:
                logger.error(f"WebSocket客户端 {client_name} 不存在")
                return
            # Stop and drop a controller left over from a previous connection
            if client_name in self._send_controllers:
                existing_controller = self._send_controllers[client_name]
                logger.info(f"WebSocket发送控制器 {client_name} 已存在,先停止")
                await existing_controller.stop()
                del self._send_controllers[client_name]
            heartbeat_channel = self._channel_manager.get_channel(f"{client_name}_heartbeat")
            send_channel = self._channel_manager.get_channel(f"{client_name}_send")
            if not heartbeat_channel or not send_channel:
                logger.error(f"Channel不存在,无法创建发送控制器: {client_name}")
                return
            send_controller = WebSocketSendController(
                client=client,
                heartbeat_channel=heartbeat_channel,
                send_channel=send_channel,
            )
            success = await send_controller.start()
            if success:
                self._send_controllers[client_name] = send_controller
                logger.info(f"WebSocket管理器启动发送控制器成功: {client_name}")
            else:
                logger.error(f"WebSocket管理器启动发送控制器失败: {client_name}")
        except Exception as e:
            logger.error(f"WebSocket管理器创建发送控制器失败: {client_name} - {e}")

    async def _async_register_device_events(self, client_name: str, device_manager):
        """Background job: register the client with the device manager for event push."""
        try:
            logger.debug(f"开始异步注册设备事件推送: {client_name}")
            await device_manager.register_websocket_client(client_name)
            logger.info(f"客户端 {client_name} 设备事件推送注册完成")
        except Exception as e:
            logger.error(f"异步注册设备事件推送到客户端 {client_name} 异常: {e}")

    def _handle_async_task_result(self, task, task_name: str):
        """Done-callback that logs how a background task ended."""
        try:
            if task.cancelled():
                logger.warning(f"异步任务被取消: {task_name}")
                return
            # Safe here: done-callbacks only run once the task has finished.
            error = task.exception()
            if error:
                logger.error(f"异步任务异常: {task_name} - {error}")
            else:
                logger.debug(f"异步任务完成: {task_name}")
        except Exception as e:
            logger.error(f"处理异步任务结果异常: {task_name} - {e}")

    async def _async_push_buffered_events(self, client_name: str, device_manager):
        """Background job: replay buffered device events to a newly connected client.

        Stops early when 30 seconds have elapsed, the client is gone or
        disconnected, or a send raises. Every event is sent with message type
        ``"terminal"`` and priority 1.
        """
        try:
            logger.debug(f"开始异步推送缓冲事件到客户端: {client_name}")
            buffered_events = await device_manager.get_buffered_events(client_name)
            if not buffered_events:
                logger.debug(f"客户端 {client_name} 没有缓冲事件需要推送")
                return
            total = len(buffered_events)
            logger.info(f"向客户端 {client_name} 异步推送 {total} 个缓冲事件")
            timeout = 30  # overall time budget in seconds
            loop = asyncio.get_running_loop()
            start_time = loop.time()
            success_count = 0
            failed_count = 0
            for event_index, event in enumerate(buffered_events, 1):
                try:
                    if loop.time() - start_time > timeout:
                        logger.warning(f"客户端 {client_name} 缓冲事件推送超时,已推送 {event_index-1}/{total} 个事件")
                        break
                    logger.debug(f"推送缓冲事件 {event_index}/{total}: {event.get('type', 'unknown')} - {event.get('device_id', 'unknown')}")
                    # Re-check the connection before every send
                    client = self._client_manager.get_client(client_name)
                    if not client or not client.is_connected:
                        logger.warning(f"客户端 {client_name} 已断开,停止推送缓冲事件")
                        break
                    success = await self.send_message(client_name, "terminal", event, priority=1)
                    if success:
                        success_count += 1
                        logger.debug(f"缓冲事件推送成功: {client_name} -> {event.get('device_id', 'unknown')} ({event.get('type', 'unknown')})")
                    else:
                        failed_count += 1
                        logger.warning(f"缓冲事件推送失败: {client_name} -> {event.get('device_id', 'unknown')} ({event.get('type', 'unknown')})")
                    # Short pause so the events are not pushed too fast
                    await asyncio.sleep(0.01)
                except Exception as e:
                    failed_count += 1
                    logger.error(f"推送缓冲事件到客户端 {client_name} 失败: {e}")
                    # Stop at the first error
                    break
            success_rate = success_count / total * 100
            duration = loop.time() - start_time
            logger.info(f"客户端 {client_name} 缓冲事件异步推送完成: {success_count}/{total} 个事件成功 (成功率: {success_rate:.1f}%, 耗时: {duration:.2f}秒)")
            if failed_count > 0:
                logger.warning(f"客户端 {client_name} 缓冲事件推送存在失败: {failed_count} 个事件推送失败")
        except Exception as e:
            logger.error(f"异步推送缓冲事件到客户端 {client_name} 异常: {e}")

    async def _create_and_start_receive_processor(self, client_name: str):
        """Start the receive-loop task for a client, cancelling any previous one.

        Failures are logged, not raised.
        """
        try:
            # Cancel and drop a loop left over from a previous connection
            if client_name in self._receive_tasks:
                existing_task = self._receive_tasks[client_name]
                logger.info(f"WebSocket接收消息处理器 {client_name} 已存在,先停止")
                if not existing_task.done():
                    existing_task.cancel()
                    try:
                        await existing_task
                    except asyncio.CancelledError:
                        pass
                del self._receive_tasks[client_name]
            receive_channel = self._channel_manager.get_channel(f"{client_name}_receive")
            if not receive_channel:
                logger.error(f"接收Channel不存在: {client_name}_receive")
                return
            receive_task = asyncio.create_task(self._receive_message_loop(client_name, receive_channel))
            self._receive_tasks[client_name] = receive_task
            logger.info(f"WebSocket管理器启动接收消息处理器成功: {client_name}")
        except Exception as e:
            logger.error(f"WebSocket管理器创建接收消息处理器失败: {client_name} - {e}")

    async def _receive_message_loop(self, client_name: str, receive_channel):
        """Drain the receive channel while it is connected.

        Single responsibility: read messages from the receive channel and hand
        them to ``_process_received_message``. Sleeps 0.1 s when no message is
        returned and 1 s after an error; ends when cancelled.
        """
        logger.info(f"WebSocket管理器开始接收消息处理循环: {client_name}")
        try:
            while receive_channel.is_connected:
                try:
                    message = await receive_channel.receive_message()
                    if message:
                        await self._process_received_message(client_name, message)
                    else:
                        # Nothing to do; wait briefly before checking again
                        await asyncio.sleep(0.1)
                except asyncio.CancelledError:
                    logger.info(f"WebSocket管理器接收消息处理循环被取消: {client_name}")
                    break
                except Exception as e:
                    logger.error(f"WebSocket管理器接收消息处理循环异常: {client_name} - {e}")
                    await asyncio.sleep(1)  # back off for a second before retrying
            logger.info(f"WebSocket管理器接收消息处理循环结束: {client_name}")
        except Exception as e:
            logger.error(f"WebSocket管理器接收消息处理循环严重异常: {client_name} - {e}")

    async def _process_received_message(self, client_name: str, message: ChannelMessage):
        """Dispatch a received message by its ``type``.

        Single responsibility: business-level dispatch only, no WebSocket I/O.
        Handles ``heartbeat``, ``data`` and ``command``; other types are logged.
        """
        try:
            logger.info(f"WebSocket管理器处理接收消息: {client_name} -> type: {message.type}")
            if message.type == "heartbeat":
                await self._handle_heartbeat_response(client_name, message)
            elif message.type == "data":
                await self._handle_data_message(client_name, message)
            elif message.type == "command":
                await self._handle_command_message(client_name, message)
            else:
                logger.info(f"WebSocket管理器收到未知类型消息: {client_name} -> {message.type}")
        except Exception as e:
            logger.error(f"WebSocket管理器处理接收消息异常: {client_name} -> {message.type} - {e}")

    async def _handle_heartbeat_response(self, client_name: str, message: ChannelMessage):
        """Handle a heartbeat response (currently only logs)."""
        try:
            logger.debug(f"WebSocket管理器处理心跳响应: {client_name}")
            # Extension point: heartbeat business logic, e.g. updating the
            # client state or recording the heartbeat time.
        except Exception as e:
            logger.error(f"WebSocket管理器处理心跳响应异常: {client_name} - {e}")

    async def _handle_data_message(self, client_name: str, message: ChannelMessage):
        """Handle a data message (currently only logs)."""
        try:
            logger.debug(f"WebSocket管理器处理数据消息: {client_name} -> {message.data}")
            # Extension point: data business logic, e.g. forwarding, storing
            # or analysing the data.
        except Exception as e:
            logger.error(f"WebSocket管理器处理数据消息异常: {client_name} - {e}")

    async def _handle_command_message(self, client_name: str, message: ChannelMessage):
        """Handle a command message (currently only logs)."""
        try:
            logger.debug(f"WebSocket管理器处理命令消息: {client_name} -> {message.data}")
            # Extension point: command business logic, e.g. executing the
            # command, updating state or changing configuration.
        except Exception as e:
            logger.error(f"WebSocket管理器处理命令消息异常: {client_name} - {e}")

    async def disconnect_client(self, name: str) -> bool:
        """Stop everything belonging to a client and disconnect it.

        Returns:
            The client manager's disconnect result, or False when a step raised.
        """
        try:
            # 1. Stop the send controller (includes the heartbeat task)
            await self._stop_send_controller(name)
            # 2. Stop the receive processor
            await self._stop_receive_processor(name)
            # 3. Give the send controller a moment to finish so it no longer
            #    touches the channels
            await asyncio.sleep(0.1)
            # 4. Stop the channels
            await self._channel_manager.stop_client_channels(name)
            # 5. Disconnect the client
            success = await self._client_manager.disconnect_client(name)
            # 6. Unregister device-event push
            logger.debug(f"为客户端 {name} 注销设备事件推送")
            device_manager = await self._get_device_manager()
            await device_manager.unregister_websocket_client(name)
            logger.info(f"WebSocket管理器客户端断开成功: {name}")
            return success
        except Exception as e:
            logger.error(f"WebSocket管理器断开客户端失败: {name} - {e}")
            return False

    async def _stop_send_controller(self, client_name: str):
        """Stop and forget the client's send controller, if any. Errors are logged."""
        try:
            if client_name in self._send_controllers:
                send_controller = self._send_controllers[client_name]
                await send_controller.stop()
                del self._send_controllers[client_name]
                logger.info(f"WebSocket管理器停止发送控制器: {client_name}")
        except Exception as e:
            logger.error(f"WebSocket管理器停止发送控制器失败: {client_name} - {e}")

    async def _stop_receive_processor(self, client_name: str):
        """Cancel and forget the client's receive-loop task, if any. Errors are logged."""
        try:
            if client_name in self._receive_tasks:
                receive_task = self._receive_tasks[client_name]
                if not receive_task.done():
                    receive_task.cancel()
                    try:
                        await receive_task
                    except asyncio.CancelledError:
                        pass
                del self._receive_tasks[client_name]
                logger.info(f"WebSocket管理器停止接收消息处理器: {client_name}")
        except Exception as e:
            logger.error(f"WebSocket管理器停止接收消息处理器失败: {client_name} - {e}")

    async def remove_client(self, name: str) -> bool:
        """Stop a client's workers and remove its channels and the client itself.

        Returns:
            The client manager's removal result, or False when a step raised.
        """
        try:
            # 1. Stop the send controller (includes the heartbeat task)
            await self._stop_send_controller(name)
            # 2. Stop the receive processor
            await self._stop_receive_processor(name)
            # 3. Remove the channels
            await self._channel_manager.remove_client_channels(name)
            # 4. Remove the client
            # NOTE(review): unlike disconnect_client(), the client is not
            # unregistered from the device manager here - confirm callers
            # always disconnect first.
            success = await self._client_manager.remove_client(name)
            logger.info(f"WebSocket管理器客户端移除成功: {name}")
            return success
        except Exception as e:
            logger.error(f"WebSocket管理器移除客户端失败: {name} - {e}")
            return False

    async def _add_service_serial_to_message(self, message_content: Any) -> Dict[str, Any]:
        """Return the message content as a dict carrying a ``ServiceSerial`` field.

        Args:
            message_content: Original content. A dict is shallow-copied and its
                ``type`` key dropped (the type travels separately); anything
                else is wrapped as ``{"data": message_content}``.

        Returns:
            A new dict with ``ServiceSerial`` set to the cached machine code.
        """
        if isinstance(message_content, dict):
            # Copy so the caller's dict is never modified
            result = message_content.copy()
            result.pop("type", None)
        else:
            result = {"data": message_content}
        result["ServiceSerial"] = self._service_serial
        return result

    async def send_message(self, client_name: str, message_type: str, data: Any, priority: int = 0) -> bool:
        """Queue a message on the client's send channel.

        Args:
            client_name: Target client; the channel used is ``<client_name>_send``.
            message_type: Message type. ``"heartbeat"`` is rejected because
                heartbeats are generated internally by the send controller.
            data: Message content; wrapped as ``{"payload": {"message": ...}}``
                after ``ServiceSerial`` has been added.
            priority: Priority passed through to the channel manager.

        Returns:
            The channel manager's result; False for heartbeats or on any error.
        """
        try:
            if message_type == "heartbeat":
                logger.warning(f"WebSocket管理器拒绝外部心跳消息: {client_name} - 心跳由内部自动生成")
                return False
            # All external messages go to the send channel
            channel_name = f"{client_name}_send"
            message_content = await self._add_service_serial_to_message(data)
            message_data = {
                "payload": {
                    "message": message_content
                }
            }
            return await self._channel_manager.send_message_to_channel(
                channel_name, message_type, message_data, priority
            )
        except Exception as e:
            logger.error(f"WebSocket管理器发送消息异常: {client_name} -> {message_type} - {e}")
            return False

    def get_client(self, name: str) -> Optional[WebSocketClient]:
        """Return the client with this name (delegates to the client manager)."""
        return self._client_manager.get_client(name)

    def get_channel(self, name: str) -> Optional[WebSocketChannel]:
        """Return the channel with this name (delegates to the channel manager)."""
        return self._channel_manager.get_channel(name)

    def get_client_channels(self, client_name: str) -> Dict[str, WebSocketChannel]:
        """Return all channels of one client (delegates to the channel manager)."""
        return self._channel_manager.get_client_channels(client_name)

    def get_all_clients(self) -> Dict[str, WebSocketClient]:
        """Return all clients (delegates to the client manager)."""
        return self._client_manager.get_all_clients()

    def get_all_channels(self) -> Dict[str, WebSocketChannel]:
        """Return all channels (delegates to the channel manager)."""
        return self._channel_manager.get_all_channels()

    async def cleanup(self):
        """Stop all workers and clean up the sub-managers. Errors are logged."""
        try:
            # Iterate over snapshots: the stop helpers delete from the dicts
            for client_name in list(self._send_controllers.keys()):
                await self._stop_send_controller(client_name)
            for client_name in list(self._receive_tasks.keys()):
                await self._stop_receive_processor(client_name)
            await self._channel_manager.cleanup()
            await self._client_manager.cleanup()
            # The device manager is only cleaned up if it was ever resolved
            if self._device_manager:
                await self._device_manager.cleanup()
            logger.info("WebSocket管理器清理完成")
        except Exception as e:
            logger.error(f"WebSocket管理器清理失败: {e}")

    @staticmethod
    def _task_exception_text(task) -> Optional[str]:
        """Return ``str(exception)`` of a finished task, else None.

        ``Task.exception()`` raises ``InvalidStateError`` while the task is
        still running and ``CancelledError`` when it was cancelled, so it is
        only consulted for tasks that finished on their own.
        """
        if not task.done() or task.cancelled():
            return None
        error = task.exception()
        return str(error) if error else None

    def get_stats(self) -> Dict[str, Any]:
        """Collect statistics from the sub-managers, controllers and receive tasks.

        Returns:
            A dict with the creation time (ISO format), the client and channel
            manager stats, the controller/processor counts, the per-client
            controller stats and the per-client receive-task state.
        """
        stats = {
            "created_at": self._created_at.isoformat(),
            "client_stats": self._client_manager.get_stats(),
            "channel_stats": self._channel_manager.get_stats(),
            "send_controller_count": len(self._send_controllers),
            "receive_processor_count": len(self._receive_tasks),
            "send_controllers": {},
            "receive_processors": {}
        }
        for client_name, controller in self._send_controllers.items():
            stats["send_controllers"][client_name] = controller.get_stats()
        for client_name, task in self._receive_tasks.items():
            stats["receive_processors"][client_name] = {
                "is_running": not task.done(),
                "is_cancelled": task.cancelled(),
                "exception": self._task_exception_text(task)
            }
        return stats
# Global WebSocket manager instance. It is constructed when this module is
# imported, so WebSocketManager.__init__ (including the machine-code lookup)
# runs at import time.
websocket_manager = WebSocketManager()