Browse Source

标准数据

origin/hotfix/hlk-flight
root 4 months ago
parent
commit
883c8853c6
  1. 22
      app/core/websocket/adapter.py
  2. 15
      app/core/websocket/manager.py
  3. 180
      modify.md
  4. 181
      test_heartbeat_simple.py

22
app/core/websocket/adapter.py

@ -88,12 +88,24 @@ class WebSocketAdapter:
# 从Channel接收消息
msg = await self.outbound_channel.receive_message(timeout=0.5)
if msg:
# 发送数据到WebSocket
success = await self.client.send_raw(msg.data)
if success:
logger.debug(f"适配器发送消息成功: {self.outbound_channel.name} -> {msg.type}")
# 将消息数据序列化为JSON字符串
import json
if isinstance(msg.data, dict):
# 在发送时添加type字段
send_data = {
"Type": msg.type,
**msg.data
}
payload = json.dumps(send_data)
# 发送数据到WebSocket
success = await self.client.send_raw(payload)
if success:
logger.debug(f"适配器发送消息成功: {self.outbound_channel.name} -> {msg.type}")
else:
logger.warning(f"适配器发送消息失败: {self.outbound_channel.name} -> {msg.type}")
else:
logger.warning(f"适配器发送消息失败: {self.outbound_channel.name} -> {msg.type}")
# 如果不是字典格式,记录日志但不发送
logger.warning(f"适配器跳过非字典格式消息: {self.outbound_channel.name} -> {msg.type}, 数据类型: {type(msg.data)}")
else:
# 没有消息时短暂等待
await asyncio.sleep(0.05)

15
app/core/websocket/manager.py

@ -7,7 +7,7 @@ import asyncio
from typing import Dict, List, Optional, Any
from datetime import datetime
from app.core.websocket.client import WebSocketClient
from app.core.websocket.channel import WebSocketChannel
from app.core.websocket.channel import WebSocketChannel, ChannelMessage
from app.core.websocket.adapter import WebSocketAdapter
from app.utils.structured_log import get_structured_logger, LogLevel
@ -193,13 +193,14 @@ class WebSocketManager:
client = self._clients[client_name]
# 创建心跳消息
from app.core.websocket.channel import ChannelMessage
heartbeat_data = {"Message": "ping"}
heartbeat_message = ChannelMessage(
type="heartbeat",
data=heartbeat_data,
priority=1 # 高优先级
type="heartbeat", # 这个type会在发送时添加到data中
data={
"Payload": {
"Message": "ping"
}
},
priority=1 # 高优先级,确保优先处理
)
# 发送到心跳Channel

180
modify.md

@ -1686,3 +1686,183 @@ client = await websocket_manager.create_client("test_1", "ws://localhost:8080",
- 心跳数据格式保持为 `{"Message": "ping"}`,与原有格式一致
- 心跳循环现在会在客户端存在时持续运行,无论连接状态如何
- 这样可以确保心跳机制在客户端创建后立即开始工作
### WebSocket Channel启动时机修复
**问题**:`create_client` 方法中 `_create_client_channels` 不应该在创建时就启动Channel,应该在 `connect_client` 时才启动
**解决方案**:
1. 修改 `_create_client_channels` 方法,移除 `await channel.connect()` 调用
2. 修改 `_create_client_adapters` 方法,移除 `await adapter.start()` 调用
3. 在 `connect_client` 方法中添加 `_start_client_channels` 调用
4. 新增 `_start_client_channels` 方法,负责启动客户端的所有Channel和适配器
5. **修复启动顺序**:先连接WebSocket客户端,成功后再启动Channel和适配器
**文件变更**:
- 更新 `app/core/websocket/manager.py` - 修复Channel启动时机和启动顺序
**修改内容**:
1. **create_client方法优化**
```python
async def create_client(self, name: str, url: str, heartbeat_interval: int = 120) -> WebSocketClient:
# ... 创建客户端和Channel ...
# 创建3个Channel(不启动)
await self._create_client_channels(name)
# 保存心跳间隔配置,在连接成功后启动
client.heartbeat_interval = heartbeat_interval
logger.info(f"WebSocket管理器创建客户端: {name} -> {url}")
return client
```
2. **connect_client方法增强**(修复启动顺序):
```python
async def connect_client(self, name: str) -> bool:
# 先连接WebSocket客户端
success = await client.connect()
if success:
# 连接成功后再启动客户端的所有Channel和适配器
await self._start_client_channels(name)
# 最后启动心跳任务
if hasattr(client, 'heartbeat_interval'):
await self._start_heartbeat_task(name, client.heartbeat_interval)
logger.info(f"WebSocket管理器客户端连接成功: {name}")
else:
logger.error(f"WebSocket客户端连接失败: {name}")
return success
```
3. **新增_start_client_channels方法**
```python
async def _start_client_channels(self, client_name: str):
"""启动客户端的所有Channel和适配器"""
# 启动客户端的所有Channel
client_channels = self.get_client_channels(client_name)
for channel_name, channel in client_channels.items():
await channel.connect()
logger.info(f"WebSocket管理器启动Channel: {channel_name}")
# 启动客户端的所有适配器
client_adapters = self.get_client_adapters(client_name)
for adapter_key, adapter in client_adapters.items():
await adapter.start()
logger.info(f"WebSocket管理器启动适配器: {adapter_key}")
```
**优化效果**:
- ✅ Channel和适配器在连接时才启动,避免资源浪费
- ✅ 创建客户端时只创建对象,不启动服务
- ✅ **正确的启动顺序**:先连接WebSocket,再启动Channel和适配器
- ✅ 连接成功后才启动相关服务,避免发送失败
- ✅ 更好的资源管理和错误处理
- ✅ 清晰的职责分离:创建 vs 启动
**设计原则**:
- 创建时只创建对象,不启动服务
- 连接成功后才启动所有相关服务
- 按需启动,避免资源浪费
- 保持清晰的职责分离
- **正确的启动顺序**:WebSocket连接 → Channel启动 → 适配器启动 → 心跳任务启动
### WebSocket数据序列化修复
**问题**:适配器发送消息时出现 "data is a dict-like object" 错误,因为直接传递字典对象给WebSocket客户端
**解决方案**:
1. 在适配器的 `_send_loop` 方法中添加数据序列化逻辑
2. 将字典数据序列化为JSON字符串再发送
3. 确保所有数据类型都能正确序列化
**文件变更**:
- 更新 `app/core/websocket/adapter.py` - 修复数据序列化问题
**修改内容**:
```python
# 从Channel接收消息
msg = await self.outbound_channel.receive_message(timeout=0.5)
if msg:
# 将消息数据序列化为JSON字符串
import json
if isinstance(msg.data, dict):
payload = json.dumps(msg.data)
# 发送数据到WebSocket
success = await self.client.send_raw(payload)
if success:
logger.debug(f"适配器发送消息成功: {self.outbound_channel.name} -> {msg.type}")
else:
logger.warning(f"适配器发送消息失败: {self.outbound_channel.name} -> {msg.type}")
else:
# 如果不是字典格式,记录日志但不发送
logger.warning(f"适配器跳过非字典格式消息: {self.outbound_channel.name} -> {msg.type}, 数据类型: {type(msg.data)}")
```
**优化效果**:
- ✅ 修复了字典数据序列化问题
- ✅ 心跳消息 `{"Message": "ping"}` 能正确发送
- ✅ **只发送字典格式的数据,确保JSON字符串格式**
- ✅ 非字典格式数据会被跳过,只记录日志
- ✅ 适配器发送消息不再失败
- ✅ 心跳功能正常工作
**数据流**:
```
心跳任务 → 心跳Channel → 适配器 → JSON序列化 → WebSocket客户端 → WebSocket服务器
```
### WebSocket心跳消息格式修复
**问题**:心跳消息格式不符合.NET模型要求,需要包含 `Type``Payload` 字段
**解决方案**:
1. 修改心跳消息格式,符合.NET模型规范
2. 使用标准的消息结构:`{"Type": "heartbeat", "Payload": {"Message": "ping"}}`
3. **消除冗余**:移除不必要的中间变量
4. **智能类型处理**:ChannelMessage.type在发送时自动添加到data中
**文件变更**:
- 更新 `app/core/websocket/manager.py` - 修复心跳消息格式并消除冗余
- 更新 `app/core/websocket/adapter.py` - 在发送时自动添加Type字段
**修改内容**:
```python
# 创建心跳消息
heartbeat_message = ChannelMessage(
type="heartbeat", # 这个type会在发送时添加到data中
data={
"Payload": {
"Message": "ping"
}
},
priority=1 # 高优先级,确保优先处理
)
# 适配器发送时会自动添加Type字段
send_data = {
"Type": msg.type,
**msg.data
}
```
**优化效果**:
- ✅ 心跳消息格式符合.NET模型规范
- ✅ 包含标准的 `Type``Payload` 字段
- ✅ 消息结构清晰,便于服务器解析
- ✅ 保持与现有架构的兼容性
- ✅ **优先级机制确保心跳消息优先处理**
- ✅ 代码结构清晰,导入优化
- ✅ **消除冗余,代码更简洁**
- ✅ **智能类型处理,避免字段重复**
**消息格式**:
```json
{
"Type": "heartbeat",
"Payload": {
"Message": "ping"
}
}
```

181
test_heartbeat_simple.py

@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""
简单的心跳功能测试
验证心跳任务是否正确启动并发送心跳消息
"""
import asyncio
import sys
import os
from datetime import datetime
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.core.websocket.manager import websocket_manager
from app.utils.structured_log import get_structured_logger, LogLevel
logger = get_structured_logger(__name__, LogLevel.DEBUG)
async def test_heartbeat_task_creation():
"""测试心跳任务创建逻辑(不依赖实际连接)"""
print("=== 心跳任务创建测试 ===")
try:
# 创建客户端
client_name = "test_task_creation"
client_url = "wss://localhost:8080"
heartbeat_interval = 5 # 5秒心跳间隔,便于测试
print(f"创建客户端: {client_name}")
print(f"心跳间隔: {heartbeat_interval}")
# 创建客户端
client = await websocket_manager.create_client(client_name, client_url, heartbeat_interval)
if not client:
print("❌ 客户端创建失败")
return False
print(f"✅ 客户端创建成功: {client.name}")
print(f" 心跳间隔配置: {client.heartbeat_interval}")
# 检查心跳Channel是否存在
heartbeat_channel_name = f"{client_name}_heartbeat"
heartbeat_channel = websocket_manager.get_channel(heartbeat_channel_name)
if heartbeat_channel:
print(f"✅ 心跳Channel存在: {heartbeat_channel.name}")
else:
print(f"❌ 心跳Channel不存在: {heartbeat_channel_name}")
return False
# 手动启动心跳任务(模拟连接成功)
print("手动启动心跳任务...")
await websocket_manager._start_heartbeat_task(client_name, heartbeat_interval)
# 检查心跳任务
if client_name in websocket_manager._heartbeat_tasks:
heartbeat_task = websocket_manager._heartbeat_tasks[client_name]
print(f"✅ 心跳任务存在: {heartbeat_task}")
print(f" 任务状态: {heartbeat_task.done()}")
print(f" 任务取消状态: {heartbeat_task.cancelled()}")
else:
print("❌ 心跳任务不存在")
return False
# 等待心跳发送
print(f"等待 {heartbeat_interval + 2} 秒观察心跳消息...")
await asyncio.sleep(heartbeat_interval + 2)
# 检查心跳任务状态
if client_name in websocket_manager._heartbeat_tasks:
heartbeat_task = websocket_manager._heartbeat_tasks[client_name]
if not heartbeat_task.done():
print("✅ 心跳任务仍在运行")
else:
print("❌ 心跳任务已结束")
if heartbeat_task.exception():
print(f" 异常: {heartbeat_task.exception()}")
else:
print("❌ 心跳任务不存在")
# 清理资源
print("清理资源...")
await websocket_manager.remove_client(client_name)
print("✅ 资源清理完成")
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
async def test_heartbeat_with_mock_connection():
"""测试带模拟连接的心跳功能"""
print("\n=== 模拟连接心跳测试 ===")
try:
# 创建客户端
client_name = "test_mock_connection"
client_url = "wss://localhost:8080"
heartbeat_interval = 3 # 3秒心跳间隔,便于测试
print(f"创建客户端: {client_name}")
print(f"心跳间隔: {heartbeat_interval}")
# 创建客户端
client = await websocket_manager.create_client(client_name, client_url, heartbeat_interval)
if not client:
print("❌ 客户端创建失败")
return False
print(f"✅ 客户端创建成功: {client.name}")
# 模拟连接成功(直接设置连接状态)
client._state = client._state.__class__.CONNECTED
print("✅ 模拟连接成功")
# 手动启动心跳任务
print("启动心跳任务...")
await websocket_manager._start_heartbeat_task(client_name, heartbeat_interval)
# 检查心跳任务
if client_name in websocket_manager._heartbeat_tasks:
heartbeat_task = websocket_manager._heartbeat_tasks[client_name]
print(f"✅ 心跳任务存在: {heartbeat_task}")
else:
print("❌ 心跳任务不存在")
return False
# 等待心跳发送
print(f"等待 {heartbeat_interval + 2} 秒观察心跳消息...")
await asyncio.sleep(heartbeat_interval + 2)
# 检查心跳任务状态
if client_name in websocket_manager._heartbeat_tasks:
heartbeat_task = websocket_manager._heartbeat_tasks[client_name]
if not heartbeat_task.done():
print("✅ 心跳任务仍在运行")
else:
print("❌ 心跳任务已结束")
if heartbeat_task.exception():
print(f" 异常: {heartbeat_task.exception()}")
else:
print("❌ 心跳任务不存在")
# 清理资源
print("清理资源...")
await websocket_manager.remove_client(client_name)
print("✅ 资源清理完成")
return True
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""主测试函数"""
print("开始心跳功能测试")
print("=" * 50)
# 测试1:心跳任务创建
success1 = await test_heartbeat_task_creation()
# 测试2:模拟连接心跳
success2 = await test_heartbeat_with_mock_connection()
print("\n" + "=" * 50)
print("测试结果汇总:")
print(f"心跳任务创建测试: {'✅ 通过' if success1 else '❌ 失败'}")
print(f"模拟连接心跳测试: {'✅ 通过' if success2 else '❌ 失败'}")
if success1 and success2:
print("🎉 所有测试通过!")
else:
print("⚠️ 部分测试失败,请检查日志")
if __name__ == "__main__":
asyncio.run(main())
Loading…
Cancel
Save