From 4aa112a949c44d1871880bd285c802f46e24bf46 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 12 Aug 2025 19:29:41 +0800 Subject: [PATCH] =?UTF-8?q?json=20=E5=BA=8F=E5=88=97=E5=8C=96=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/websocket/adapter.py | 9 +++-- app/core/websocket/serializer.py | 60 ++++++++++++++++++++++++++++++++ modify.md | 17 +++++++-- 3 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 app/core/websocket/serializer.py diff --git a/app/core/websocket/adapter.py b/app/core/websocket/adapter.py index 2ced66b..63dfd66 100644 --- a/app/core/websocket/adapter.py +++ b/app/core/websocket/adapter.py @@ -9,6 +9,7 @@ import json from datetime import datetime from app.core.websocket.client import WebSocketClient from app.core.websocket.channel import WebSocketChannel, ChannelMessage +from app.core.websocket.serializer import websocket_serializer from app.utils.structured_log import get_structured_logger, LogLevel logger = get_structured_logger(__name__, LogLevel.INFO) @@ -83,6 +84,7 @@ class WebSocketAdapter: """ logger.info(f"发送循环启动: {self.client.name} (out:{self.outbound_channel.name} / in:{self.inbound_channel.name})") logger.info(f"连接状态检查: client.is_connected={self.client.is_connected}, outbound_channel.is_connected={self.outbound_channel.is_connected}") + logger.info(f"适配器类型: {'心跳适配器' if 'heartbeat' in self.outbound_channel.name else '接收适配器' if 'receive' in self.outbound_channel.name else '发送适配器'}") try: while self.client.is_connected and self.outbound_channel.is_connected: try: @@ -92,7 +94,6 @@ class WebSocketAdapter: if msg: logger.info(f"适配器收到消息: {self.outbound_channel.name} -> type: {msg.type}, data: {msg.data}") # 将消息数据序列化为JSON字符串 - import json if msg.data is None: # 如果data为None,跳过发送 logger.debug(f"适配器跳过空数据消息: {self.outbound_channel.name} -> {msg.type}") @@ -102,7 +103,8 @@ class WebSocketAdapter: "Type": msg.type, **msg.data } - payload = json.dumps(send_data) + # 使用序列化器序列化消息 + payload = websocket_serializer.serialize(send_data) # 发送数据到WebSocket success = await self.client.send_raw(payload) if success: @@ -148,6 +150,9 @@ class WebSocketAdapter: priority=message.get("priority", 0) # 支持优先级 ) + # 添加调试日志 + logger.info(f"适配器处理WebSocket消息: {self.client.name} -> type: {message.get('type')}, data: {message.get('data')}") + # 插入到Channel success = await self.inbound_channel.send_message(channel_message) diff --git a/app/core/websocket/serializer.py b/app/core/websocket/serializer.py new file mode 100644 index 0000000..e9d5df5 --- /dev/null +++ b/app/core/websocket/serializer.py @@ -0,0 +1,60 @@ +""" +WebSocket消息序列化器 +类似于.NET的序列化器,负责消息的序列化和反序列化 +""" +import json +from typing import Any, Dict +from app.utils.structured_log import get_structured_logger, LogLevel + +logger = get_structured_logger(__name__, LogLevel.INFO) + +class WebSocketMessageSerializer: + """WebSocket消息序列化器 + + 单一职责: + - 序列化消息为JSON字符串 + - 反序列化JSON字符串为消息对象 + - 处理消息格式转换 + """ + + def __init__(self): + logger.info("WebSocket消息序列化器初始化完成") + + def serialize(self, message: Dict[str, Any]) -> str: + """序列化消息为JSON字符串 + + 类似于.NET的 _serializer.Serialize(processedMessage) + """ + try: + # 转换字段名为小写,适配.NET序列化器 + converted_message = self._convert_to_lowercase(message) + # 使用JSON序列化 + serialized_data = json.dumps(converted_message, ensure_ascii=False) + logger.debug(f"消息序列化成功: {serialized_data}") + return serialized_data + except Exception as e: + logger.error(f"消息序列化失败: {e}") + raise + + def _convert_to_lowercase(self, obj: Any) -> Any: + """递归转换字典中的字段名为小写""" + if isinstance(obj, dict): + return {key.lower(): self._convert_to_lowercase(value) for key, value in obj.items()} + elif isinstance(obj, list): + return [self._convert_to_lowercase(item) for item in obj] + else: + return obj + + def deserialize(self, data: str) -> Dict[str, Any]: + """反序列化JSON字符串为消息对象""" + try: + # 使用JSON反序列化 + message = json.loads(data) + logger.debug(f"消息反序列化成功: {message}") + return message + except Exception as e: + logger.error(f"消息反序列化失败: {e}") + raise + +# 创建全局序列化器实例 +websocket_serializer = WebSocketMessageSerializer() \ No newline at end of file diff --git a/modify.md b/modify.md index bdd7b75..dadca79 100644 --- a/modify.md +++ b/modify.md @@ -899,6 +899,9 @@ else: - ✅ 调试信息更清晰 - ✅ **添加调试日志,便于排查消息接收问题** - ✅ **添加详细的消息内容日志,便于调试心跳消息流转** +- ✅ **添加连接状态检查日志,便于调试连接问题** +- ✅ **添加WebSocket消息处理日志,便于调试错误消息来源** +- ✅ **添加适配器类型识别日志,便于区分不同适配器的工作状态** ### Channel生命周期管理完善 **问题**:用户指出"没有遵循单一原则是 channel.py他stop 之后能使用吗 ,二次连接webscoket 还能有吗 生命周期" @@ -1853,10 +1856,12 @@ if msg: 2. 使用标准的消息结构:`{"Type": "heartbeat", "Payload": {"Message": "ping"}}` 3. **消除冗余**:移除不必要的中间变量 4. **智能类型处理**:ChannelMessage.type在发送时自动添加到data中 +5. **自动字段名转换**:序列化器自动将字段名转换为小写,适配.NET JSON序列化器 **文件变更**: - 更新 `app/core/websocket/manager.py` - 修复心跳消息格式并消除冗余 - 更新 `app/core/websocket/adapter.py` - 在发送时自动添加Type字段 +- 创建 `app/core/websocket/serializer.py` - WebSocket消息序列化器 **修改内容**: ```python @@ -1876,6 +1881,10 @@ send_data = { "Type": msg.type, **msg.data } + +# 使用序列化器序列化消息(自动转换为小写) +from app.core.websocket.serializer import websocket_serializer +payload = websocket_serializer.serialize(send_data) ``` **优化效果**: @@ -1887,13 +1896,15 @@ send_data = { - ✅ 代码结构清晰,导入优化 - ✅ **消除冗余,代码更简洁** - ✅ **智能类型处理,避免字段重复** +- ✅ **自动字段名转换**:序列化器自动将字段名转换为小写 +- ✅ **专用序列化器**:类似于.NET的 `_serializer.Serialize(processedMessage)` **消息格式**: ```json { - "Type": "heartbeat", - "Payload": { - "Message": "ping" + "type": "heartbeat", + "payload": { + "message": "ping" } } ``` \ No newline at end of file