#!/usr/bin/env python3
"""Simple WebSocket test server.

Companion server for WebSocket functional tests. It accepts client
connections, parses every incoming message as JSON and answers with a canned
response selected by the message's ``type`` field.
"""

import asyncio
import json
from datetime import datetime

import websockets

from app.utils.structured_log import get_structured_logger, LogLevel

logger = get_structured_logger(__name__, LogLevel.DEBUG)


class SimpleWebSocketServer:
    """Simple WebSocket test server.

    Attributes:
        host: Interface the server binds to.
        port: TCP port the server listens on.
        clients: Set of currently connected websocket objects.
        message_count: Total number of messages received from all clients.
    """

    # Maps an incoming message ``type`` to the reply ``type`` and the static
    # part of the reply's ``data`` payload (a timestamp is appended per reply).
    _RESPONSE_TEMPLATES = {
        "heartbeat": (
            "heartbeat_response",
            {"status": "pong"},
        ),
        "data": (
            "data_response",
            {"status": "received", "message": "Data message processed"},
        ),
        "command": (
            "command_response",
            {
                "status": "executed",
                "message": "Command executed successfully",
            },
        ),
    }

    # Reply used for any message type without a dedicated template.
    _DEFAULT_TEMPLATE = (
        "response",
        {"status": "ok", "message": "Message received"},
    )

    def __init__(self, host="localhost", port=8080):
        """Store the listen address and initialise the bookkeeping state.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self.clients = set()
        self.message_count = 0
        # Monotonic counter so client ids stay unique even after earlier
        # clients have disconnected (len(self.clients) would repeat ids).
        self._client_seq = 0

    @classmethod
    def _build_response(cls, message_type):
        """Return the reply dict for a parsed message of *message_type*.

        Args:
            message_type: Value of the message's ``type`` field. Anything
                that is not a known string type yields the default reply.

        Returns:
            A dict with a ``type`` key and a ``data`` dict that ends with an
            ISO-formatted ``timestamp``.
        """
        template = cls._DEFAULT_TEMPLATE
        # Guard with isinstance: a JSON "type" may be a list/dict, which is
        # unhashable and would make the table lookup raise TypeError.
        if isinstance(message_type, str):
            template = cls._RESPONSE_TEMPLATES.get(message_type, template)
        response_type, fields = template
        return {
            "type": response_type,
            "data": {**fields, "timestamp": datetime.now().isoformat()},
        }

    @staticmethod
    def _build_error_response():
        """Return the reply dict sent when a message is not valid JSON."""
        return {
            "type": "error",
            "data": {
                "message": "Invalid JSON format",
                "timestamp": datetime.now().isoformat(),
            },
        }

    async def handle_client(self, websocket, path=None):
        """Serve one client connection until it closes.

        Every received message is counted, logged, parsed as JSON and
        answered with a response chosen by its ``type`` field. Messages that
        cannot be decoded as JSON are answered with an error response.

        Args:
            websocket: The connection object handed over by ``websockets``.
            path: Request path. Unused; kept optional because recent
                ``websockets`` releases invoke the handler with the
                connection only, while older ones also pass the path.
        """
        self._client_seq += 1
        client_id = f"client_{self._client_seq}"
        self.clients.add(websocket)
        logger.info(f"客户端连接: {client_id} - {websocket.remote_address}")

        try:
            async for message in websocket:
                self.message_count += 1
                logger.info(f"收到消息 [{client_id}]: {message}")

                # Parse the message. Binary frames that are not valid UTF-8
                # raise UnicodeDecodeError rather than JSONDecodeError.
                try:
                    payload = json.loads(message)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    response = self._build_error_response()
                    await websocket.send(json.dumps(response))
                    logger.warning(f"无效JSON格式 [{client_id}]: {message}")
                    continue

                # Valid JSON need not be an object (e.g. ``[1]`` or ``42``);
                # treat those like an unknown message type instead of
                # failing on ``.get`` and dropping the client's session.
                if isinstance(payload, dict):
                    message_type = payload.get("type", "unknown")
                else:
                    message_type = "unknown"

                # Send the reply matching the message type.
                response = self._build_response(message_type)
                await websocket.send(json.dumps(response))
                logger.info(f"发送响应 [{client_id}]: {response}")

        except websockets.exceptions.ConnectionClosed:
            logger.info(f"客户端断开连接: {client_id}")
        except Exception as e:
            # Connection-level boundary: log and fall through to cleanup.
            logger.error(f"处理客户端消息异常 [{client_id}]: {e}")
        finally:
            # discard() so cleanup itself can never raise.
            self.clients.discard(websocket)
            logger.info(f"客户端清理完成: {client_id}")

    async def start_server(self):
        """Start listening and block until the server is closed.

        The server is always closed on the way out, whether the wait ends
        normally, by a stop signal, or by task cancellation.

        Raises:
            asyncio.CancelledError: Re-raised after logging when the
                surrounding task is cancelled (this is how ``asyncio.run``
                delivers Ctrl+C to the coroutine).
        """
        server = await websockets.serve(
            self.handle_client,
            self.host,
            self.port,
        )

        logger.info(f"WebSocket测试服务器启动成功: ws://{self.host}:{self.port}")
        logger.info("按 Ctrl+C 停止服务器")

        try:
            await server.wait_closed()
        except KeyboardInterrupt:
            logger.info("收到停止信号,正在关闭服务器...")
        except asyncio.CancelledError:
            logger.info("收到停止信号,正在关闭服务器...")
            # Never swallow cancellation; the caller relies on it.
            raise
        finally:
            server.close()
            await server.wait_closed()
            logger.info("WebSocket测试服务器已关闭")


async def main():
    """Print the banner and run a server with the default host and port."""
    print("WebSocket测试服务器")
    print("="*40)

    server = SimpleWebSocketServer()
    await server.start_server()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C surfaces here after start_server() has already closed the
        # server and logged the shutdown; exit without a traceback.
        pass