import hashlib
import json
import os
from typing import List, Set, Union

from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, TextLoader, PyPDFLoader, JSONLoader

from app.core.logger import logger


class DocumentService:
    """Load PDF, text and JSON files into ``Document`` objects and split them.

    Files that go through ``_load_single_file`` are de-duplicated by content:
    the MD5 digest of every such file is remembered in ``processed_files`` and
    a file whose digest is already known is skipped. PDF and text files found
    while loading a directory are read through ``DirectoryLoader`` and do not
    pass through this duplicate check.
    """

    def __init__(self) -> None:
        """Create the text splitter and an empty processed-file registry."""
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        # Hex MD5 digests of the files that were already handed to a loader.
        self.processed_files: Set[str] = set()

    def _calculate_file_hash(self, file_path: str) -> str:
        """Return the hex MD5 digest of the file's content.

        The file is read in 4096-byte chunks so that large files are never
        held in memory as a whole. The digest is only used as a content
        fingerprint for duplicate detection.
        """
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _is_file_processed(self, file_path: str) -> bool:
        """Check whether the file was seen before and register it if not.

        This is a check-and-mark operation: when the file's digest is unknown
        it is added to ``processed_files`` before ``False`` is returned, so a
        second call for the same content returns ``True``. Callers must
        therefore invoke it at most once per load attempt.

        Returns:
            ``True`` if the content digest was already registered (a skip
            message is logged), ``False`` otherwise.
        """
        file_hash = self._calculate_file_hash(file_path)
        if file_hash in self.processed_files:
            logger.info(f"文件已处理过,跳过: {file_path}")
            return True
        self.processed_files.add(file_hash)
        return False

    def load_documents(self, path: str) -> List[Document]:
        """Load documents from a single file or from a directory.

        Args:
            path: Path of a file, or of a directory to scan. Anything that is
                not an existing regular file is treated as a directory.

        Returns:
            The loaded documents; an empty list when a single file was already
            processed, has an unsupported type, or failed to load.
        """
        if os.path.isfile(path):
            # The duplicate check lives in _load_single_file. Checking here as
            # well would register the digest first and make the second check
            # skip the file, so nothing would ever be loaded.
            return self._load_single_file(path)
        return self._load_directory(path)

    def _load_single_file(self, file_path: str) -> List[Document]:
        """Load one ``.pdf``, ``.txt`` or ``.json`` file.

        The file is registered as processed before loading starts, so a file
        with an unsupported extension or one whose loading fails stays
        registered and is skipped on later calls until
        ``clear_processed_files`` is called.

        Args:
            file_path: Path of the file to load; the type is chosen from the
                lower-cased file extension.

        Returns:
            The loaded documents, or an empty list when the file is a
            duplicate, has an unsupported extension, or raised while loading
            (the error is logged, not propagated).
        """
        if self._is_file_processed(file_path):
            return []

        file_extension = os.path.splitext(file_path)[1].lower()

        try:
            if file_extension == '.pdf':
                loader = PyPDFLoader(file_path)
            elif file_extension == '.txt':
                loader = TextLoader(file_path, encoding='utf-8')
            elif file_extension == '.json':
                # JSON is parsed here and flattened into readable text rather
                # than being passed to a langchain loader.
                with open(file_path, 'r', encoding='utf-8') as f:
                    json_content = json.load(f)
                text_content = self._json_to_text(json_content)
                return [Document(page_content=text_content, metadata={"source": file_path})]
            else:
                logger.warning(f"不支持的文件类型: {file_extension}")
                return []

            return loader.load()
        except Exception as e:
            # Best effort: one unreadable file must not abort the caller.
            logger.error(f"处理文件时出错 {file_path}: {str(e)}")
            return []

    def _json_to_text(self, json_content: Union[dict, list], indent: int = 0) -> str:
        """Render parsed JSON as indented, human-readable text.

        Dictionaries become ``key: value`` lines; a nested dict or list puts
        the key on its own line followed by the nested rendering one level
        deeper. Lists become ``Item N:`` headers (1-based), each followed by
        the rendering of the item one level deeper. Any other value is
        returned as ``str(value)`` without indentation.

        Args:
            json_content: The parsed JSON value to render.
            indent: Current nesting level; each level adds one space.

        Returns:
            The rendered text with lines joined by ``"\\n"``.
        """
        if isinstance(json_content, dict):
            text = []
            for key, value in json_content.items():
                if isinstance(value, (dict, list)):
                    text.append(f"{' ' * indent}{key}:")
                    text.append(self._json_to_text(value, indent + 1))
                else:
                    text.append(f"{' ' * indent}{key}: {value}")
            return "\n".join(text)
        elif isinstance(json_content, list):
            text = []
            for i, item in enumerate(json_content):
                text.append(f"{' ' * indent}Item {i + 1}:")
                text.append(self._json_to_text(item, indent + 1))
            return "\n".join(text)
        else:
            return str(json_content)

    def _load_directory(self, directory_path: str) -> List[Document]:
        """Load every PDF, text and JSON file below ``directory_path``.

        PDF and text files are collected recursively by ``DirectoryLoader``;
        JSON files are found with ``os.walk`` and loaded one by one through
        ``_load_single_file``.

        NOTE(review): only the JSON files take part in duplicate detection;
        PDF and text files are reloaded on every call. Confirm whether that
        asymmetry is intended.

        Returns:
            All loaded documents: PDFs first, then text files, then JSON.
        """
        documents = []

        pdf_loader = DirectoryLoader(
            directory_path,
            glob="**/*.pdf",
            loader_cls=PyPDFLoader
        )
        documents.extend(pdf_loader.load())

        txt_loader = DirectoryLoader(
            directory_path,
            glob="**/*.txt",
            loader_cls=TextLoader,
            loader_kwargs={'encoding': 'utf-8'}
        )
        documents.extend(txt_loader.load())

        for root, _, files in os.walk(directory_path):
            for file in files:
                if file.lower().endswith('.json'):
                    file_path = os.path.join(root, file)
                    # _load_single_file does the duplicate check and returns
                    # [] for known files; checking here too would mark the
                    # file first and cause it to be skipped.
                    documents.extend(self._load_single_file(file_path))

        return documents

    def clear_processed_files(self) -> None:
        """Forget all registered file digests so files can be loaded again."""
        self.processed_files.clear()
        logger.info("已清空文件处理记录")

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split the documents into chunks using the configured text splitter."""
        return self.text_splitter.split_documents(documents)

    def process_documents(self, path: str) -> List[Document]:
        """Load the documents at ``path`` (file or directory) and split them."""
        documents = self.load_documents(path)
        return self.split_documents(documents)