Browse Source

111

origin/hotfix/hlk-flight
root 4 months ago
parent
commit
1410a11093
  1. 61
      app/core/websocket/manager.py

61
app/core/websocket/manager.py

@ -87,7 +87,6 @@ class WebSocketManager:
if channel_name not in self._channels: if channel_name not in self._channels:
channel = WebSocketChannel(channel_name, max_size) channel = WebSocketChannel(channel_name, max_size)
self._channels[channel_name] = channel self._channels[channel_name] = channel
await channel.connect()
logger.info(f"WebSocket管理器创建Channel: {channel_name}") logger.info(f"WebSocket管理器创建Channel: {channel_name}")
# 创建适配器 # 创建适配器
@ -124,13 +123,34 @@ class WebSocketManager:
if adapter_key not in self._adapters: if adapter_key not in self._adapters:
adapter = WebSocketAdapter(client, outbound_channel, inbound_channel) adapter = WebSocketAdapter(client, outbound_channel, inbound_channel)
self._adapters[adapter_key] = adapter self._adapters[adapter_key] = adapter
await adapter.start()
logger.info(f"WebSocket管理器创建适配器: {adapter_key}") logger.info(f"WebSocket管理器创建适配器: {adapter_key}")
except Exception as e: except Exception as e:
logger.error(f"WebSocket管理器创建客户端适配器失败: {client_name} - {e}") logger.error(f"WebSocket管理器创建客户端适配器失败: {client_name} - {e}")
raise raise
async def _start_client_channels(self, client_name: str):
"""启动客户端的所有Channel和适配器
单一职责只负责启动Channel和适配器不处理业务逻辑
"""
try:
# 启动客户端的所有Channel
client_channels = self.get_client_channels(client_name)
for channel_name, channel in client_channels.items():
await channel.connect()
logger.info(f"WebSocket管理器启动Channel: {channel_name}")
# 启动客户端的所有适配器
client_adapters = self.get_client_adapters(client_name)
for adapter_key, adapter in client_adapters.items():
await adapter.start()
logger.info(f"WebSocket管理器启动适配器: {adapter_key}")
except Exception as e:
logger.error(f"WebSocket管理器启动客户端Channel失败: {client_name} - {e}")
raise
async def _start_heartbeat_task(self, client_name: str, heartbeat_interval: int): async def _start_heartbeat_task(self, client_name: str, heartbeat_interval: int):
"""启动心跳任务 """启动心跳任务
@ -283,18 +303,31 @@ class WebSocketManager:
logger.error(f"WebSocket客户端 {name} 不存在") logger.error(f"WebSocket客户端 {name} 不存在")
return False return False
# 连接客户端 try:
success = await client.connect() # 先连接WebSocket客户端
success = await client.connect()
# 如果连接成功,启动心跳任务
if success and hasattr(client, 'heartbeat_interval'): if success:
try: # 连接成功后再启动客户端的所有Channel和适配器
await self._start_heartbeat_task(name, client.heartbeat_interval) await self._start_client_channels(name)
logger.info(f"WebSocket管理器连接成功后启动心跳任务: {name}")
except Exception as e: # 最后启动心跳任务
logger.error(f"心跳任务启动失败: {name} - {e}") if hasattr(client, 'heartbeat_interval'):
try:
return success await self._start_heartbeat_task(name, client.heartbeat_interval)
logger.info(f"WebSocket管理器连接成功后启动心跳任务: {name}")
except Exception as e:
logger.error(f"心跳任务启动失败: {name} - {e}")
logger.info(f"WebSocket管理器客户端连接成功: {name}")
else:
logger.error(f"WebSocket客户端连接失败: {name}")
return success
except Exception as e:
logger.error(f"WebSocket管理器连接客户端失败: {name} - {e}")
return False
async def disconnect_client(self, name: str) -> bool: async def disconnect_client(self, name: str) -> bool:
"""断开指定客户端""" """断开指定客户端"""

Loading…
Cancel
Save