"""
Structured logging system providing Serilog-like functionality.
"""

import json
import logging
import os
import sys
import traceback
from contextvars import ContextVar
from dataclasses import asdict, dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional, Union

# Request-scoped context variables. Each one defaults to None until a value
# is set for the current context.
request_id_var: ContextVar[Optional[str]] = ContextVar(
    'request_id',
    default=None,
)
user_id_var: ContextVar[Optional[str]] = ContextVar(
    'user_id',
    default=None,
)
session_id_var: ContextVar[Optional[str]] = ContextVar(
    'session_id',
    default=None,
)


class LogLevel(Enum):
    """Enumeration of the supported log severities.

    Every member's value is the upper-case name of the level. Note that the
    standard ``logging`` module defines no ``TRACE`` constant, so that member
    has no built-in numeric counterpart.
    """

    TRACE = "TRACE"
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


@dataclass
class LogContext:
    """Contextual metadata attached to a single log entry.

    All fields are optional and default to ``None``.
    """

    # Correlation identifiers for the request being handled.
    request_id: Optional[str] = None
    user_id: Optional[str] = None
    session_id: Optional[str] = None

    # Source location of the logging call.
    module: Optional[str] = None
    function: Optional[str] = None
    line_number: Optional[int] = None

    # Time of the event, as a string.
    timestamp: Optional[str] = None

    # Elapsed time in milliseconds (nothing visible in this module sets it).
    duration_ms: Optional[float] = None

    # Arbitrary additional key/value data.
    extra: Optional[Dict[str, Any]] = None


@dataclass
class LogEntry:
    """One fully assembled log event, ready to be serialized."""

    # Required fields describing the event itself.
    timestamp: str
    level: str
    message: str
    logger_name: str

    # Metadata about where and for whom the event was emitted.
    context: LogContext

    # Exception details; both stay None when no exception is attached.
    exception: Optional[str] = None
    stack_trace: Optional[str] = None


class StructuredFormatter(logging.Formatter):
    """Formatter that renders each log record as a single JSON document.

    Args:
        include_context: When False, the ``context`` object (request ids,
            source location, extra fields) is omitted from the output.
        include_stack_trace: When False, the ``stack_trace`` field is always
            ``null`` even if the record carries exception information.
    """

    def __init__(self, include_context: bool = True, include_stack_trace: bool = True):
        super().__init__()
        self.include_context = include_context
        self.include_stack_trace = include_stack_trace

    def format(self, record: logging.LogRecord) -> str:
        """Serialize ``record`` to a JSON string.

        Args:
            record: The log record to format.

        Returns:
            A JSON string with the keys ``timestamp``, ``level``, ``message``,
            ``logger_name``, ``exception``, ``stack_trace`` and, unless
            ``include_context`` is False, ``context``. Values that are not
            JSON-serializable are converted with ``str``.
        """
        # Gather request-scoped identifiers and the record's source location.
        context = LogContext(
            request_id=request_id_var.get(),
            user_id=user_id_var.get(),
            session_id=session_id_var.get(),
            module=record.module,
            function=record.funcName,
            line_number=record.lineno,
            # Naive local time derived from the record's creation timestamp.
            timestamp=datetime.fromtimestamp(record.created).isoformat(),
            extra=getattr(record, 'extra', {}),
        )

        # Logging with exc_info=True outside an ``except`` block stores the
        # truthy tuple (None, None, None); treat that as "no exception"
        # instead of emitting "None" / "NoneType: None".
        exc_info = record.exc_info
        has_exception = isinstance(exc_info, tuple) and exc_info[0] is not None

        exception_text = str(exc_info[1]) if has_exception else None
        stack_trace = None
        if has_exception and self.include_stack_trace:
            # format_exception returns a list of text fragments; join them so
            # the field is a single string, as LogEntry declares.
            stack_trace = ''.join(traceback.format_exception(*exc_info))

        log_entry = LogEntry(
            timestamp=context.timestamp,
            level=record.levelname,
            message=record.getMessage(),
            logger_name=record.name,
            context=context,
            exception=exception_text,
            stack_trace=stack_trace,
        )

        payload = asdict(log_entry)
        if not self.include_context:
            # Honour the constructor flag, which was previously ignored.
            payload.pop('context', None)

        return json.dumps(payload, ensure_ascii=False, default=str)


class StructuredLogger:
    """Thin wrapper around ``logging.Logger`` that emits structured records.

    Keyword arguments passed to the level methods are attached to the record
    as its ``extra`` attribute. On first construction for a given logger name,
    a stdout handler and a ``logs/<name>.log`` file handler are installed,
    both using ``StructuredFormatter``.

    Args:
        name: Name of the underlying ``logging`` logger; also used for the
            log file name.
        level: Minimum severity the logger will emit.
    """

    # The standard logging module has no TRACE level. Use a numeric level
    # below DEBUG (10) so TRACE can be filtered like any other level.
    _TRACE_LEVEL_NO = 5

    def __init__(self, name: str, level: LogLevel = LogLevel.INFO):
        # Register the level name so records carry levelname "TRACE".
        logging.addLevelName(self._TRACE_LEVEL_NO, LogLevel.TRACE.value)

        self.logger = logging.getLogger(name)
        self.logger.setLevel(self._level_number(level))
        self.name = name

        # Avoid attaching duplicate handlers to an already configured logger.
        if not self.logger.handlers:
            self._setup_handlers()

    @classmethod
    def _level_number(cls, level: LogLevel) -> int:
        """Translate a ``LogLevel`` member into a numeric logging level."""
        if level is LogLevel.TRACE:
            return cls._TRACE_LEVEL_NO
        return getattr(logging, level.value)

    def _setup_handlers(self):
        """Attach the console and file handlers to the underlying logger."""
        # Console handler.
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(StructuredFormatter())
        self.logger.addHandler(console_handler)

        # File handler. FileHandler does not create missing directories, so
        # make sure the target directory exists before opening the file.
        log_path = f"logs/{self.name}.log"
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        file_handler = logging.FileHandler(log_path, encoding='utf-8')
        file_handler.setFormatter(StructuredFormatter(include_stack_trace=True))
        self.logger.addHandler(file_handler)

    def _log(self, level: LogLevel, message: str, **kwargs):
        """Emit ``message`` at ``level`` with ``kwargs`` as structured fields.

        Args:
            level: Severity of the event.
            message: Log message text.
            **kwargs: Structured fields stored on the record's ``extra``
                attribute. The key ``exc_info`` is not stored; it is
                forwarded to ``Logger.log`` so exception details are
                attached to the record.

        NOTE(review): the record's module/function/line still refer to this
        method because ``Logger.log`` is called from here; the ``stacklevel``
        argument (Python 3.8+) would fix that if the minimum version allows.
        """
        exc_info = kwargs.pop('exc_info', None)
        extra = {
            'extra': kwargs,
            'level': level.value,
        }
        self.logger.log(self._level_number(level), message, exc_info=exc_info, extra=extra)

    def trace(self, message: str, **kwargs):
        """Log ``message`` at TRACE level."""
        self._log(LogLevel.TRACE, message, **kwargs)

    def debug(self, message: str, **kwargs):
        """Log ``message`` at DEBUG level."""
        self._log(LogLevel.DEBUG, message, **kwargs)

    def info(self, message: str, **kwargs):
        """Log ``message`` at INFO level."""
        self._log(LogLevel.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs):
        """Log ``message`` at WARNING level."""
        self._log(LogLevel.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs):
        """Log ``message`` at ERROR level."""
        self._log(LogLevel.ERROR, message, **kwargs)

    def critical(self, message: str, **kwargs):
        """Log ``message`` at CRITICAL level."""
        self._log(LogLevel.CRITICAL, message, **kwargs)

    def exception(self, message: str, **kwargs):
        """Log ``message`` at ERROR level with the active exception attached.

        Intended to be called from an ``except`` block. The ``exception``
        flag is kept in the structured fields, and ``exc_info`` is enabled so
        the formatter can populate the exception and stack-trace fields.
        """
        kwargs['exception'] = True
        kwargs.setdefault('exc_info', True)
        self._log(LogLevel.ERROR, message, **kwargs)


class LogManager:
    """Registry of ``StructuredLogger`` instances plus request-context helpers."""

    # Cache of loggers keyed by name, shared across the whole class.
    _loggers: Dict[str, StructuredLogger] = {}

    @classmethod
    def get_logger(cls, name: str, level: LogLevel = LogLevel.INFO) -> StructuredLogger:
        """Return the cached logger for ``name``, creating it on first use.

        ``level`` only takes effect when the logger is created; a cached
        logger is returned unchanged.
        """
        cached = cls._loggers.get(name)
        if cached is None:
            cached = StructuredLogger(name, level)
            cls._loggers[name] = cached
        return cached

    @classmethod
    def set_request_context(cls, request_id: str, user_id: Optional[str] = None, session_id: Optional[str] = None):
        """Store request identifiers in the context variables.

        ``request_id`` is always written. ``user_id`` and ``session_id`` are
        written only when truthy; otherwise their current values are kept.
        """
        request_id_var.set(request_id)
        optional_values = (
            (user_id_var, user_id),
            (session_id_var, session_id),
        )
        for variable, value in optional_values:
            if value:
                variable.set(value)

    @classmethod
    def clear_context(cls):
        """Reset all three request context variables to ``None``."""
        for variable in (request_id_var, user_id_var, session_id_var):
            variable.set(None)


# Convenience functions
def get_structured_logger(name: str, level: LogLevel = LogLevel.INFO) -> StructuredLogger:
    """Return the structured logger registered under ``name``.

    Delegates to ``LogManager.get_logger``.
    """
    return LogManager.get_logger(name=name, level=level)


def set_request_context(request_id: str, user_id: Optional[str] = None, session_id: Optional[str] = None):
    """Set the request context used for subsequent log entries.

    Delegates to ``LogManager.set_request_context``.
    """
    LogManager.set_request_context(
        request_id=request_id,
        user_id=user_id,
        session_id=session_id,
    )


def clear_log_context():
    """Clear the logging context by delegating to ``LogManager.clear_context``."""
    LogManager.clear_context()


# Create the module's default logger.
default_logger = get_structured_logger(name="TermControlAgent")