You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

201 lines
6.2 KiB

"""
结构化日志系统 - 类似Serilog的功能
"""
import json
import logging
import sys
import traceback
from datetime import datetime
from typing import Any, Dict, Optional, Union
from contextvars import ContextVar
from dataclasses import dataclass, asdict
from enum import Enum
# 请求上下文变量
request_id_var: ContextVar[Optional[str]] = ContextVar('request_id', default=None)
user_id_var: ContextVar[Optional[str]] = ContextVar('user_id', default=None)
session_id_var: ContextVar[Optional[str]] = ContextVar('session_id', default=None)
class LogLevel(Enum):
"""日志级别枚举"""
TRACE = "TRACE"
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
@dataclass
class LogContext:
"""日志上下文"""
request_id: Optional[str] = None
user_id: Optional[str] = None
session_id: Optional[str] = None
module: Optional[str] = None
function: Optional[str] = None
line_number: Optional[int] = None
timestamp: Optional[str] = None
duration_ms: Optional[float] = None
extra: Optional[Dict[str, Any]] = None
@dataclass
class LogEntry:
"""日志条目"""
timestamp: str
level: str
message: str
logger_name: str
context: LogContext
exception: Optional[str] = None
stack_trace: Optional[str] = None
class StructuredFormatter(logging.Formatter):
"""结构化日志格式化器"""
def __init__(self, include_context: bool = True, include_stack_trace: bool = True):
super().__init__()
self.include_context = include_context
self.include_stack_trace = include_stack_trace
def format(self, record: logging.LogRecord) -> str:
"""格式化日志记录"""
# 获取上下文信息
context = LogContext(
request_id=request_id_var.get(),
user_id=user_id_var.get(),
session_id=session_id_var.get(),
module=record.module,
function=record.funcName,
line_number=record.lineno,
timestamp=datetime.fromtimestamp(record.created).isoformat(),
extra=getattr(record, 'extra', {})
)
# 创建日志条目
log_entry = LogEntry(
timestamp=context.timestamp,
level=record.levelname,
message=record.getMessage(),
logger_name=record.name,
context=context,
exception=str(record.exc_info[1]) if record.exc_info else None,
stack_trace=traceback.format_exception(*record.exc_info) if record.exc_info and self.include_stack_trace else None
)
# 转换为JSON格式
return json.dumps(asdict(log_entry), ensure_ascii=False, default=str)
class StructuredLogger:
"""结构化日志记录器"""
def __init__(self, name: str, level: LogLevel = LogLevel.INFO):
self.logger = logging.getLogger(name)
self.logger.setLevel(getattr(logging, level.value))
self.name = name
# 避免重复添加处理器
if not self.logger.handlers:
self._setup_handlers()
def _setup_handlers(self):
"""设置日志处理器"""
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(StructuredFormatter())
self.logger.addHandler(console_handler)
# 文件处理器
file_handler = logging.FileHandler(f"logs/{self.name}.log", encoding='utf-8')
file_handler.setFormatter(StructuredFormatter(include_stack_trace=True))
self.logger.addHandler(file_handler)
def _log(self, level: LogLevel, message: str, **kwargs):
"""通用日志方法"""
extra = {
'extra': kwargs,
'level': level.value
}
self.logger.log(getattr(logging, level.value), message, extra=extra)
def trace(self, message: str, **kwargs):
"""TRACE级别日志"""
self._log(LogLevel.TRACE, message, **kwargs)
def debug(self, message: str, **kwargs):
"""DEBUG级别日志"""
self._log(LogLevel.DEBUG, message, **kwargs)
def info(self, message: str, **kwargs):
"""INFO级别日志"""
self._log(LogLevel.INFO, message, **kwargs)
def warning(self, message: str, **kwargs):
"""WARNING级别日志"""
self._log(LogLevel.WARNING, message, **kwargs)
def error(self, message: str, **kwargs):
"""ERROR级别日志"""
self._log(LogLevel.ERROR, message, **kwargs)
def critical(self, message: str, **kwargs):
"""CRITICAL级别日志"""
self._log(LogLevel.CRITICAL, message, **kwargs)
def exception(self, message: str, **kwargs):
"""异常日志"""
kwargs['exception'] = True
self._log(LogLevel.ERROR, message, **kwargs)
class LogManager:
"""日志管理器"""
_loggers: Dict[str, StructuredLogger] = {}
@classmethod
def get_logger(cls, name: str, level: LogLevel = LogLevel.INFO) -> StructuredLogger:
"""获取日志记录器"""
if name not in cls._loggers:
cls._loggers[name] = StructuredLogger(name, level)
return cls._loggers[name]
@classmethod
def set_request_context(cls, request_id: str, user_id: Optional[str] = None, session_id: Optional[str] = None):
"""设置请求上下文"""
request_id_var.set(request_id)
if user_id:
user_id_var.set(user_id)
if session_id:
session_id_var.set(session_id)
@classmethod
def clear_context(cls):
"""清除上下文"""
request_id_var.set(None)
user_id_var.set(None)
session_id_var.set(None)
# 便捷函数
def get_structured_logger(name: str, level: LogLevel = LogLevel.INFO) -> StructuredLogger:
"""获取结构化日志记录器"""
return LogManager.get_logger(name, level)
def set_request_context(request_id: str, user_id: Optional[str] = None, session_id: Optional[str] = None):
"""设置请求上下文"""
LogManager.set_request_context(request_id, user_id, session_id)
def clear_log_context():
"""清除日志上下文"""
LogManager.clear_context()
# 创建默认日志记录器
default_logger = get_structured_logger("TermControlAgent")