You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

305 lines
14 KiB

"""
WebSocket发送控制器
"""
import asyncio
import json
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.core.websocket.channel import ChannelMessage, WebSocketChannel
from app.core.websocket.client import WebSocketClient
from app.utils.structured_log import get_structured_logger
logger = get_structured_logger(__name__)
class WebSocketSendController:
"""WebSocket发送控制器
单一职责:只负责统一的消息发送逻辑和心跳管理
- 统一发送循环
- 消息发送到WebSocket
- 优先级控制
- 发送任务管理
- 心跳任务管理
- 心跳消息生成
"""
def __init__(self, client: WebSocketClient, heartbeat_channel: WebSocketChannel, send_channel: WebSocketChannel):
self.client = client
self.client_name = client.name
self.heartbeat_channel = heartbeat_channel
self.send_channel = send_channel
self._send_task: Optional[asyncio.Task] = None
self._heartbeat_task: Optional[asyncio.Task] = None
self._running = False
self._heartbeat_interval = client.heartbeat_interval if hasattr(client, 'heartbeat_interval') else 120
self._created_at = datetime.now()
logger.info(f"WebSocket发送控制器初始化: {self.client_name} (心跳间隔: {self._heartbeat_interval}s)")
async def start(self) -> bool:
"""启动发送控制器"""
if self._running:
logger.warning(f"WebSocket发送控制器 {self.client_name} 已在运行")
return True
try:
self._running = True
self._send_task = asyncio.create_task(self._unified_send_loop())
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
logger.info(f"WebSocket发送控制器启动成功: {self.client_name}")
return True
except Exception as e:
self._running = False
logger.error(f"WebSocket发送控制器启动失败: {self.client_name} - {e}")
return False
async def stop(self) -> bool:
"""停止发送控制器"""
if not self._running:
logger.warning(f"WebSocket发送控制器 {self.client_name} 未在运行")
return True
try:
self._running = False
# 停止发送任务
if self._send_task and not self._send_task.done():
self._send_task.cancel()
try:
await self._send_task
except asyncio.CancelledError:
pass
# 停止心跳任务
if self._heartbeat_task and not self._heartbeat_task.done():
self._heartbeat_task.cancel()
try:
await self._heartbeat_task
except asyncio.CancelledError:
pass
logger.info(f"WebSocket发送控制器停止成功: {self.client_name}")
return True
except Exception as e:
logger.error(f"WebSocket发送控制器停止失败: {self.client_name} - {e}")
return False
async def _heartbeat_loop(self):
"""心跳循环
单一职责:只负责定期生成心跳消息并放入Channel
"""
logger.info(f"WebSocket发送控制器开始心跳循环: {self.client_name} (间隔: {self._heartbeat_interval}s)")
try:
while self._running:
try:
# 检查是否需要生成心跳消息
should_generate_heartbeat = await self._should_generate_heartbeat()
if should_generate_heartbeat:
# 生成心跳消息并放入Channel
success = await self._generate_heartbeat_message()
if success:
logger.debug(f"WebSocket发送控制器生成心跳消息成功: {self.client_name}")
else:
logger.warning(f"WebSocket发送控制器生成心跳消息失败: {self.client_name}")
else:
logger.debug(f"WebSocket发送控制器跳过心跳生成: {self.client_name} (send_channel有数据)")
# 等待下次心跳
await asyncio.sleep(self._heartbeat_interval)
except asyncio.CancelledError:
logger.info(f"WebSocket发送控制器心跳循环被取消: {self.client_name}")
break
except Exception as e:
logger.error(f"WebSocket发送控制器心跳循环异常: {self.client_name} - {e}")
await asyncio.sleep(self._heartbeat_interval) # 异常时继续等待
logger.info(f"WebSocket发送控制器心跳循环结束: {self.client_name}")
except Exception as e:
logger.error(f"WebSocket发送控制器心跳循环严重异常: {self.client_name} - {e}")
async def _should_generate_heartbeat(self) -> bool:
"""检查是否应该生成心跳消息
单一职责:只负责判断是否应该生成心跳消息
"""
try:
# 检查send_channel是否有数据
if self.send_channel and self.send_channel.is_connected:
send_queue_size = self.send_channel.total_queue_size
if send_queue_size > 0:
logger.debug(f"跳过心跳生成: {self.client_name} send_channel 有数据 (队列大小: {send_queue_size})")
return False
return True
except Exception as e:
logger.error(f"WebSocket发送控制器检查心跳生成条件异常: {self.client_name} - {e}")
return True # 异常时默认生成心跳
async def _generate_heartbeat_message(self) -> bool:
"""生成心跳消息并放入Channel
单一职责:只负责生成心跳消息并放入Channel
"""
try:
if not self.heartbeat_channel or not self.heartbeat_channel.is_connected:
logger.warning(f"心跳Channel不存在或未连接,无法生成心跳消息: {self.client_name}")
return False
# 创建心跳消息
heartbeat_message = ChannelMessage(
type="heartbeat",
data={
"Payload": {
"Message": "ping"
}
},
priority=1 # 高优先级,确保优先处理
)
# 将心跳消息放入Channel
success = await self.heartbeat_channel.send_message(heartbeat_message)
if success:
# 添加详细的队列状态日志
heartbeat_stats = self.heartbeat_channel.get_stats()
logger.debug(f"WebSocket发送控制器心跳消息生成并放入Channel成功: {self.client_name} (队列状态: {heartbeat_stats['queue_size']}普通/{heartbeat_stats['priority_queue_size']}优先级)")
else:
logger.warning(f"WebSocket发送控制器心跳消息生成并放入Channel失败: {self.client_name}")
return success
except Exception as e:
logger.error(f"WebSocket发送控制器生成心跳消息异常: {self.client_name} - {e}")
return False
async def _unified_send_loop(self):
"""统一发送循环
单一职责:只负责从Channel获取消息并发送到WebSocket
"""
logger.info(f"WebSocket发送控制器开始统一发送循环: {self.client_name}")
try:
while self._running and self.client.is_connected:
try:
# 检查Channel是否存在且已连接
if not self.heartbeat_channel or not self.send_channel:
logger.warning(f"Channel不存在,等待重试: {self.client_name}")
await asyncio.sleep(1)
continue
if not self.heartbeat_channel.is_connected or not self.send_channel.is_connected:
logger.warning(f"Channel未连接,等待重试: {self.client_name}")
await asyncio.sleep(1)
continue
# 优先级发送:先发送业务数据,再发送心跳
message_sent = False
# 1. 优先发送业务数据
if self.send_channel.total_queue_size > 0: # 修改这里,使用total_queue_size而不是queue_size
try:
message = await self.send_channel.receive_message()
if message:
logger.debug(f"WebSocket发送控制器接收到业务消息: {self.client_name} -> {message.type} (优先级:{message.priority})")
success = await self._send_message(message)
if success:
logger.debug(f"WebSocket发送控制器发送业务消息成功: {self.client_name} -> {message.type}")
message_sent = True
else:
logger.warning(f"WebSocket发送控制器发送业务消息失败: {self.client_name} -> {message.type}")
except Exception as e:
logger.error(f"WebSocket发送控制器发送业务消息异常: {self.client_name} - {e}")
# 2. 如果没有业务数据,发送心跳
# 修复:检查总队列大小,包括优先级队列
heartbeat_queue_size = self.heartbeat_channel.total_queue_size
if not message_sent and heartbeat_queue_size > 0:
logger.debug(f"WebSocket发送控制器准备发送心跳消息: {self.client_name} (队列大小: {heartbeat_queue_size})")
try:
message = await self.heartbeat_channel.receive_message()
if message:
success = await self._send_message(message)
if success:
logger.debug(f"WebSocket发送控制器发送心跳消息成功: {self.client_name} -> {message.type}")
message_sent = True
else:
logger.warning(f"WebSocket发送控制器发送心跳消息失败: {self.client_name} -> {message.type}")
except Exception as e:
logger.error(f"WebSocket发送控制器发送心跳消息异常: {self.client_name} - {e}")
# 3. 如果没有消息需要发送,短暂等待
if not message_sent:
await asyncio.sleep(0.1) # 100ms
except asyncio.CancelledError:
logger.info(f"WebSocket发送控制器发送循环被取消: {self.client_name}")
break
except Exception as e:
logger.error(f"WebSocket发送控制器发送循环异常: {self.client_name} - {e}")
await asyncio.sleep(1) # 异常时等待1秒再重试
logger.info(f"WebSocket发送控制器统一发送循环结束: {self.client_name}")
except Exception as e:
logger.error(f"WebSocket发送控制器发送循环严重异常: {self.client_name} - {e}")
async def _send_message(self, message: ChannelMessage) -> bool:
"""发送消息到WebSocket
单一职责:只负责将消息发送到WebSocket客户端
"""
try:
if not self.client.is_connected:
logger.warning(f"WebSocket客户端未连接,无法发送消息: {self.client_name}")
return False
# 在发送时添加Type字段
send_data = {
"Type": message.type,
**message.data
}
# 使用序列化器序列化消息
from app.core.websocket.serializer import websocket_serializer
payload = websocket_serializer.serialize(send_data)
# 发送数据到WebSocket
success = await self.client.send_raw(payload)
if success:
logger.debug(f"WebSocket发送控制器消息发送成功: {self.client_name} -> {message.type}")
else:
logger.warning(f"WebSocket发送控制器消息发送失败: {self.client_name} -> {message.type}")
return success
except Exception as e:
logger.error(f"WebSocket发送控制器发送消息异常: {self.client_name} -> {message.type} - {e}")
return False
def is_running(self) -> bool:
"""检查发送控制器是否在运行"""
return self._running
def get_stats(self) -> Dict[str, Any]:
"""获取发送控制器统计信息"""
return {
"client_name": self.client_name,
"is_running": self._running,
"is_connected": self.client.is_connected,
"created_at": self._created_at.isoformat(),
"heartbeat_interval": self._heartbeat_interval,
"heartbeat_channel": self.heartbeat_channel.name,
"send_channel": self.send_channel.name,
"send_task_running": self._send_task and not self._send_task.done() if self._send_task else False,
"heartbeat_task_running": self._heartbeat_task and not self._heartbeat_task.done() if self._heartbeat_task else False
}