From 29dc939cf61498a857e4e276765168a25e3182ed Mon Sep 17 00:00:00 2001
From: BBC-Esq
Date: Sat, 5 Apr 2025 11:35:40 -0400
Subject: [PATCH] Update directory_reader.py

---
 .../object_readers/directory_reader.py        | 135 +++++++++++++-----
 1 file changed, 98 insertions(+), 37 deletions(-)

diff --git a/apis/python/src/tiledb/vector_search/object_readers/directory_reader.py b/apis/python/src/tiledb/vector_search/object_readers/directory_reader.py
index 243530fdd..096188a9f 100644
--- a/apis/python/src/tiledb/vector_search/object_readers/directory_reader.py
+++ b/apis/python/src/tiledb/vector_search/object_readers/directory_reader.py
@@ -10,11 +10,8 @@
 )
 
 import numpy as np
-
 import tiledb
-
-# from tiledb.vector_search.object_readers import ObjectPartition, ObjectReader
-
+import re
 
 # Util functions for file matching and recursive listing
 def match_uri(
@@ -39,7 +36,6 @@ def match_uri(
             return True
     return True
 
-
 def find_uris_vfs(
     search_uri: str,
     include: str = "*",
@@ -103,7 +99,6 @@ def find(
     else:
         return results
 
-
 def find_uris_tiledb_group(
     search_uri: str,
     include: str = "*",
@@ -172,7 +167,6 @@ def find(
     else:
         return results
 
-
 def find_uris_aws(
     search_uri: str,
     include: str = "*",
@@ -187,8 +181,7 @@ def find_uris_aws(
         import boto3
     except ImportError:
         raise ImportError(
-            "Could not import boto3 python package. "
-            "Please install it with `pip install boto3`."
+            "Could not import boto3 python package. Please install it with `pip install boto3`."
         )
 
     search_uri = search_uri.rstrip("/") + "/"
@@ -241,8 +234,7 @@ def find_uris_aws(
                 return results
     return results
 
-
-# class DirectoryPartition(ObjectPartition):
+# DirectoryPartition class
 class DirectoryPartition:
     def __init__(
         self,
@@ -264,8 +256,7 @@ def init_kwargs(self) -> Dict:
     def id(self) -> int:
         return self.partition_id
 
-
-# class DirectoryReader(ObjectReader):
+# DirectoryReader class
 class DirectoryReader:
     def __init__(
         self,
@@ -275,14 +266,12 @@ def __init__(
         suffixes: Optional[Sequence[str]] = None,
        max_files: Optional[int] = None,
     ):
-        """Initialize with a path to directory and how to glob over it.
-
+        """
+        Initialize with a path to directory and how to glob over it.
         Args:
             search_uri: Path of directory to load files from.
-            include: File pattern to iclude relative to `search_uri`. By default set
-                to include all files.
-            exclude: File patterns to exclude relative to `search_uri`. By default set
-                to ignore all hidden files.
+            include: File pattern to include relative to `search_uri`. By default set to include all files.
+            exclude: File patterns to exclude relative to `search_uri`. By default set to ignore all hidden files.
             suffixes: Provide to keep only files with these suffixes
                 Useful when wanting to keep files with different suffixes
                 Suffixes must include the dot, e.g. ".txt"
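The body of `match_uri` is elided from this patch, so the include/exclude/suffix contract documented in the docstring above can only be illustrated, not reproduced. A minimal sketch, assuming glob-style patterns via Python's `fnmatch`; the default `exclude` shown here is illustrative, not necessarily the library's actual default:

```python
from fnmatch import fnmatch
from typing import Optional, Sequence

def matches(rel_path: str, include: str = "*",
            exclude: Sequence[str] = ("[.]*", "*/[.]*"),  # assumed hidden-file defaults
            suffixes: Optional[Sequence[str]] = None) -> bool:
    # Exclusions win first: by default, hidden files anywhere in the tree.
    if any(fnmatch(rel_path, pat) for pat in exclude):
        return False
    # Then the include glob must match the path relative to search_uri.
    if not fnmatch(rel_path, include):
        return False
    # Optional suffix filter; suffixes must include the dot, e.g. ".txt".
    if suffixes is not None and not any(rel_path.endswith(s) for s in suffixes):
        return False
    return True

assert matches("docs/report.pdf", include="*.pdf", suffixes=[".pdf"])
assert not matches(".hidden/file.txt")
```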
".txt" @@ -348,31 +337,101 @@ def get_partitions( partition_id += 1 return partitions +# Custom PyMuPDFParser to add unique page markers to PDF content +from langchain_community.docstore.document import Document +from langchain_community.document_loaders.parsers.pdf import PyMuPDFParser +import pymupdf + +class CustomPyMuPDFParser(PyMuPDFParser): + def _lazy_parse(self, blob, text_kwargs: Optional[Dict] = None) -> Iterator[Document]: + # Acquire the parent lock for thread safety + with PyMuPDFParser._lock: + with blob.as_bytes_io() as file_path: + # Open the document using pymupdf + doc = pymupdf.open(stream=file_path, filetype="pdf") if blob.data else pymupdf.open(file_path) + full_content = [] + for page in doc: + page_content = self._get_page_content(doc, page, text_kwargs) + # Only add page marker and content if there's actual content + if page_content.strip(): + full_content.append(f"[[page{page.number + 1}]]{page_content}") + yield Document( + page_content="".join(full_content), + metadata=self._extract_metadata(doc, blob) + ) + +# Custom PyMuPDFLoader using the custom parser +from langchain_community.document_loaders import PyMuPDFLoader +class CustomPyMuPDFLoader(PyMuPDFLoader): + def __init__(self, file_path, **kwargs): + super().__init__(file_path, **kwargs) + self.parser = CustomPyMuPDFParser( + text_kwargs=kwargs.get('text_kwargs'), + extract_images=kwargs.get('extract_images', False) + ) + +# Custom splitting function to assign page metadata from custom page markers +def add_pymupdf_page_metadata(doc: Document, chunk_size: int = 1200, chunk_overlap: int = 600) -> List[Document]: + """ + Chunks the body of text returned by the custom PyMuPDFParser. + Uses custom page markers (e.g. [[page1]]) to: + 1. Remove the markers from the text. + 2. Split the clean text into chunks. + 3. Determine the originating page for each chunk based on marker positions. + Returns a list of Documents with updated metadata. 
+ """ + def split_text(text: str, chunk_size: int, chunk_overlap: int) -> List[Tuple[str, int]]: + page_markers = [(m.start(), int(m.group(1))) for m in re.finditer(r'\[\[page(\d+)\]\]', text)] + clean_text = re.sub(r'\[\[page\d+\]\]', '', text) + chunks = [] + start = 0 + while start < len(clean_text): + end = start + chunk_size + if end > len(clean_text): + end = len(clean_text) + chunk = clean_text[start:end].strip() + page_num = None + for marker_pos, page in reversed(page_markers): + if marker_pos <= start: + page_num = page + break + if chunk and page_num is not None: + chunks.append((chunk, page_num)) + start += chunk_size - chunk_overlap + return chunks + + chunks = split_text(doc.page_content, chunk_size, chunk_overlap) + new_docs = [] + for chunk, page_num in chunks: + new_metadata = doc.metadata.copy() + new_metadata['page'] = page_num + new_doc = Document( + page_content=chunk, + metadata=new_metadata + ) + new_docs.append(new_doc) + return new_docs +# TileDBLoader class using the custom parser for PDFs class TileDBLoader: def __init__( self, uri: str, ) -> None: - from langchain_community.document_loaders.parsers.generic import ( - MimeTypeBasedParser, - ) + from langchain_community.document_loaders.parsers.generic import MimeTypeBasedParser from langchain_community.document_loaders.parsers.html import BS4HTMLParser from langchain_community.document_loaders.parsers.msword import MsWordParser - from langchain_community.document_loaders.parsers.pdf import PyMuPDFParser from langchain_community.document_loaders.parsers.txt import TextParser self.uri = uri self.parser = MimeTypeBasedParser( handlers={ - "application/pdf": PyMuPDFParser(), + "application/pdf": CustomPyMuPDFParser(), "text/plain": TextParser(), "text/markdown": TextParser(), "text/html": BS4HTMLParser(), "application/msword": MsWordParser(), - "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ( - MsWordParser() - ), + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": MsWordParser(), }, fallback_parser=None, ) @@ -386,7 +445,6 @@ def lazy_load( ) -> Iterator: """A lazy loader for Documents.""" import mimetypes - from langchain_community.document_loaders.blob_loaders import Blob vfs = tiledb.VFS() @@ -415,7 +473,7 @@ def load_and_split(self, text_splitter) -> List: """Load all documents and split them into sentences.""" return text_splitter.split_documents(self.load()) - +# DirectoryTextReader class class DirectoryTextReader(DirectoryReader): MAX_OBJECTS_PER_FILE = 10000 @@ -477,7 +535,16 @@ def read_objects( for uri in partition.paths: try: loader = TileDBLoader(uri=uri) - documents = loader.load_and_split(text_splitter=text_splitter) + # For PDFs, use the custom page-marker based splitting + if uri.lower().endswith(".pdf"): + document = next(loader.lazy_load()) + documents = add_pymupdf_page_metadata( + document, + chunk_size=self.text_splitter_kwargs["chunk_size"], + chunk_overlap=self.text_splitter_kwargs["chunk_overlap"], + ) + else: + documents = loader.load_and_split(text_splitter=text_splitter) text_chunk_id = 0 for d in documents: file_paths[write_id] = str(uri) @@ -503,7 +570,7 @@ def read_objects_by_external_ids(self, ids: List[int]) -> OrderedDict: # TODO implement return None - +# DirectoryImageReader class class DirectoryImageReader(DirectoryReader): def __init__( self, @@ -546,9 +613,7 @@ def read_objects( self, partition: DirectoryPartition ) -> Tuple[OrderedDict, OrderedDict]: import mimetypes - from PIL import Image - import tiledb max_size = 
@@ -569,16 +634,13 @@ def read_objects(
             fp = vfs.open(path)
             if path.endswith(".pdf") or mime_type == "application/pdf":
                 from io import BytesIO
-
                 import fitz
-
                 doc = fitz.open(stream=fp.read())
                 p = 0
                 for page in doc.pages():
                     zoom = 1
                     mat = fitz.Matrix(zoom, zoom)
                     pix = page.get_pixmap(matrix=mat)
-
                     image = np.array(
                         Image.open(BytesIO(pix.tobytes(output="png", jpg_quality=95)))
                     )[:, :, :3]
@@ -614,7 +676,6 @@ def read_objects(
 
     def read_objects_by_external_ids(self, ids: List[int]) -> OrderedDict:
         from PIL import Image
-
         import tiledb
 
         if self.paths is None:
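For reference, the rasterization step in the image-reader hunk above can be reproduced standalone. A sketch assuming PyMuPDF (`fitz`) and Pillow are installed, with `example.pdf` as a hypothetical input file:

```python
from io import BytesIO

import fitz  # PyMuPDF
import numpy as np
from PIL import Image

doc = fitz.open("example.pdf")  # hypothetical input
pix = doc[0].get_pixmap(matrix=fitz.Matrix(1, 1))  # zoom factor 1, as in the patch
image = np.array(Image.open(BytesIO(pix.tobytes(output="png"))))[:, :, :3]  # keep RGB
print(image.shape)  # (height, width, 3)
```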