Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1eb5352
fix: address unresolved review comments from PyPDF File Processor PR …
RobuRishabh Mar 17, 2026
9f453c9
Merge branch 'main' into RHAIENG-1823-Address-Unresolved-Reviews
mergify[bot] Mar 17, 2026
8838dcf
Merge branch 'main' into RHAIENG-1823-Address-Unresolved-Reviews
mergify[bot] Mar 18, 2026
74a8de0
Merge remote-tracking branch 'upstream/main' into RHAIENG-1823-Addres…
RobuRishabh Mar 18, 2026
ba12fe7
fix: add file_processors to vector IO integration test --stack-config
RobuRishabh Mar 18, 2026
c087c33
Merge remote-tracking branch 'upstream/main' into RHAIENG-1823-Addres…
RobuRishabh Mar 18, 2026
d216c3b
Merge branch 'RHAIENG-1823-Address-Unresolved-Reviews' of https://git…
RobuRishabh Mar 18, 2026
dc949b3
Merge remote-tracking branch 'upstream/main' into RHAIENG-1823-Addres…
RobuRishabh Mar 19, 2026
e375cab
fix: critical improvements to file processing pipeline
RobuRishabh Mar 19, 2026
349246f
Merge branch 'main' into RHAIENG-1823-Address-Unresolved-Reviews
cdoern Mar 20, 2026
38cc121
Merge branch 'main' into RHAIENG-1823-Address-Unresolved-Reviews
mergify[bot] Mar 20, 2026
c7ad32a
fix: resolve broken merge in openai_vector_store_mixin.py
RobuRishabh Apr 1, 2026
3bfde3d
Merge branch 'RHAIENG-1823-Address-Unresolved-Reviews' of https://git…
RobuRishabh Apr 1, 2026
af7ddbc
fix: address pre-commit failures and update unit tests for upstream A…
RobuRishabh Apr 1, 2026
8e62fd1
Merge remote-tracking branch 'upstream/main' into RHAIENG-1823-Addres…
RobuRishabh Apr 2, 2026
90a8808
extracted magic number to a named constant
RobuRishabh Apr 3, 2026
1190ab5
Add responses integration test recordings for file processor changes
RobuRishabh Apr 3, 2026
a9eb280
fix(tests): skip recording/replay for local file processor in api_rec…
RobuRishabh Apr 3, 2026
9da173e
ran precommit check
RobuRishabh Apr 3, 2026
f399301
fix(file_processor): handle contextual chunking strategy in pypdf pro…
RobuRishabh Apr 3, 2026
d3f8569
regenerated broken recording for gpt setup
RobuRishabh Apr 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-vector-io-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ jobs:
INFINISPAN_PASSWORD: ${{ matrix.vector-io-provider == 'remote::infinispan' && 'password' || '' }}
run: |
uv run --no-sync \
pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers?trust_remote_code=true,vector_io=${{ matrix.vector-io-provider }}" \
pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers?trust_remote_code=true,vector_io=${{ matrix.vector-io-provider }},file_processors=inline::pypdf" \
tests/integration/vector_io

- name: Check Storage and Memory Available After Tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async def get_provider_impl(config: PyPDFFileProcessorConfig, deps: dict[Api, An

assert isinstance(config, PyPDFFileProcessorConfig), f"Unexpected config type: {type(config)}"

files_api = deps.get(Api.files)
files_api = deps[Api.files]

impl = PyPDFFileProcessorAdapter(config, files_api)
return impl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class PyPDFFileProcessorAdapter:
"""Adapter for PyPDF file processor."""

def __init__(self, config: PyPDFFileProcessorConfig, files_api=None) -> None:
def __init__(self, config: PyPDFFileProcessorConfig, files_api) -> None:
self.config = config
self.files_api = files_api
self.processor = PyPDFFileProcessor(config, files_api)
Expand Down
127 changes: 95 additions & 32 deletions src/llama_stack/providers/inline/file_processor/pypdf/pypdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
# the root directory of this source tree.

import io
import mimetypes
import time
import uuid
from typing import Any

import chardet
from fastapi import HTTPException, UploadFile
from pypdf import PdfReader

Expand All @@ -29,11 +31,13 @@
# Large enough to fit any reasonable document in one chunk
SINGLE_CHUNK_WINDOW_TOKENS = 1_000_000

UPLOAD_READ_CHUNK_BYTES = 64 * 1024


class PyPDFFileProcessor:
"""PyPDF-based file processor for PDF documents."""

def __init__(self, config: PyPDFFileProcessorConfig, files_api=None) -> None:
def __init__(self, config: PyPDFFileProcessorConfig, files_api) -> None:
self.config = config
self.files_api = files_api

Expand All @@ -44,7 +48,7 @@ async def process_file(
options: dict[str, Any] | None = None,
chunking_strategy: VectorStoreChunkingStrategy | None = None,
) -> ProcessFileResponse:
"""Process a PDF file and return chunks."""
"""Process a file and return chunks. Supports PDF and text-based files."""

# Validate input
if not file and not file_id:
Expand All @@ -54,62 +58,82 @@ async def process_file(

start_time = time.time()

# Get PDF content
# Get file content
if file:
# Read from uploaded file
content = await file.read()
if len(content) > self.config.max_file_size_bytes:
raise ValueError(
f"File size {len(content)} bytes exceeds maximum of {self.config.max_file_size_bytes} bytes"
)
# Read file in chunks to enable early termination on oversized files
# This prevents reading gigabytes before discovering the file exceeds limits
# Note: Full content is still assembled in memory for PDF/text processing
content_parts = []
bytes_read = 0
while True:
chunk = await file.read(UPLOAD_READ_CHUNK_BYTES)
if not chunk:
break
bytes_read += len(chunk)
if bytes_read > self.config.max_file_size_bytes:
raise ValueError(f"File size exceeds maximum of {self.config.max_file_size_bytes} bytes")
content_parts.append(chunk)
content = b"".join(content_parts)
filename = file.filename or f"{uuid.uuid4()}.pdf"
elif file_id:
# Get file from file storage using Files API
if not self.files_api:
raise ValueError("Files API not available - cannot process file_id")

# Get file metadata
file_info = await self.files_api.openai_retrieve_file(RetrieveFileRequest(file_id=file_id))
try:
file_info = await self.files_api.openai_retrieve_file(RetrieveFileRequest(file_id=file_id))
except Exception as e:
raise ValueError(f"File with id '{file_id}' not found") from e
filename = file_info.filename

# Get file content
content_response = await self.files_api.openai_retrieve_file_content(
RetrieveFileContentRequest(file_id=file_id)
)
content = content_response.body
if len(content) > self.config.max_file_size_bytes:
raise ValueError(
f"File size {len(content)} bytes exceeds maximum of {self.config.max_file_size_bytes} bytes"
)

mime_type, _ = mimetypes.guess_type(filename)
mime_category = mime_type.split("/")[0] if (mime_type and "/" in mime_type) else None

if mime_type == "application/pdf":
return self._process_pdf(content, filename, file_id, chunking_strategy, start_time)
elif mime_category == "text":
return self._process_text(content, filename, file_id, chunking_strategy, start_time)
else:
# Attempt text decoding as a fallback for unknown types
log.warning(f"Unknown mime type '{mime_type}' for file '{filename}', attempting text extraction")
return self._process_text(content, filename, file_id, chunking_strategy, start_time)

def _process_pdf(
self,
content: bytes,
filename: str,
file_id: str | None,
chunking_strategy: VectorStoreChunkingStrategy | None,
start_time: float,
) -> ProcessFileResponse:
"""Process a PDF file."""
pdf_bytes = io.BytesIO(content)
reader = PdfReader(pdf_bytes)

if reader.is_encrypted:
raise HTTPException(status_code=422, detail="Password-protected PDFs are not supported")

# Extract text from PDF
text_content, failed_pages = self._extract_pdf_text(reader)

# Clean text if configured
if self.config.clean_text:
text_content = self._clean_text(text_content)

# Extract metadata if configured
pdf_metadata = {}
if self.config.extract_metadata:
pdf_metadata = self._extract_pdf_metadata(reader)

document_id = str(uuid.uuid4())

# Prepare document metadata (include filename and file_id)
document_metadata = {
"filename": filename,
**pdf_metadata,
}
document_metadata: dict[str, Any] = {"filename": filename, **pdf_metadata}
if file_id:
document_metadata["file_id"] = file_id

processing_time_ms = int((time.time() - start_time) * 1000)

# Create response metadata
response_metadata = {
response_metadata: dict[str, Any] = {
"processor": "pypdf",
"processing_time_ms": processing_time_ms,
"page_count": pdf_metadata.get("page_count", 0),
Expand All @@ -120,13 +144,48 @@ async def process_file(
if failed_pages:
response_metadata["failed_pages"] = failed_pages

# Handle empty text - return empty chunks with metadata
if not text_content or not text_content.strip():
return ProcessFileResponse(chunks=[], metadata=response_metadata)

# Create chunks for non-empty text
chunks = self._create_chunks(text_content, document_id, chunking_strategy, document_metadata)
return ProcessFileResponse(chunks=chunks, metadata=response_metadata)

def _process_text(
    self,
    content: bytes,
    filename: str,
    file_id: str | None,
    chunking_strategy: VectorStoreChunkingStrategy | None,
    start_time: float,
) -> ProcessFileResponse:
    """Process a text-based file (txt, csv, md, etc.).

    The raw bytes are decoded using the encoding reported by ``chardet``
    (UTF-8 when none is reported). If that decode fails, the content is
    decoded as UTF-8 with undecodable bytes replaced.

    Args:
        content: Raw file bytes.
        filename: Name recorded in each chunk's document metadata.
        file_id: Optional file id; added to the document metadata when set.
        chunking_strategy: Strategy forwarded to ``_create_chunks``.
        start_time: ``time.time()`` value captured by the caller, used to
            compute ``processing_time_ms``.

    Returns:
        A ``ProcessFileResponse`` with the created chunks and response
        metadata. ``chunks`` is empty when the decoded text is empty or
        whitespace-only.
    """
    detected = chardet.detect(content)
    encoding = detected["encoding"] or "utf-8"
    try:
        text_content = content.decode(encoding)
    except (UnicodeDecodeError, LookupError):
        # UnicodeDecodeError: the detected encoding was a wrong guess.
        # LookupError: the detector reported an encoding name for which
        # Python has no codec; without this the request would crash instead
        # of degrading to the lossy UTF-8 fallback.
        text_content = content.decode("utf-8", errors="replace")

    if self.config.clean_text:
        text_content = self._clean_text(text_content)

    document_id = str(uuid.uuid4())
    document_metadata: dict[str, Any] = {"filename": filename}
    if file_id:
        document_metadata["file_id"] = file_id

    processing_time_ms = int((time.time() - start_time) * 1000)
    response_metadata: dict[str, Any] = {
        "processor": "text",
        "processing_time_ms": processing_time_ms,
        "extraction_method": "text",
        "file_size_bytes": len(content),
    }

    # Nothing to chunk: return the metadata with an empty chunk list.
    if not text_content or not text_content.strip():
        return ProcessFileResponse(chunks=[], metadata=response_metadata)

    chunks = self._create_chunks(text_content, document_id, chunking_strategy, document_metadata)
    return ProcessFileResponse(chunks=chunks, metadata=response_metadata)

def _extract_pdf_text(self, reader: PdfReader) -> tuple[str, list[str]]:
Expand All @@ -140,8 +199,9 @@ def _extract_pdf_text(self, reader: PdfReader) -> tuple[str, list[str]]:
except Exception as e:
failed_pages.append(f"page {page_num + 1}: {e}")
continue
if page_text and page_text.strip():
text_parts.append(page_text)
if page_text:
if not self.config.clean_text or page_text.strip():
text_parts.append(page_text)

return "\n".join(text_parts), failed_pages

Expand Down Expand Up @@ -209,6 +269,9 @@ def _create_chunks(
elif chunking_strategy.type == "static":
chunk_size = chunking_strategy.static.max_chunk_size_tokens
overlap_size = chunking_strategy.static.chunk_overlap_tokens
elif chunking_strategy.type == "contextual":
chunk_size = chunking_strategy.contextual.max_chunk_size_tokens
overlap_size = chunking_strategy.contextual.chunk_overlap_tokens
else:
chunk_size = self.config.default_chunk_size_tokens
overlap_size = self.config.default_chunk_overlap_tokens
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ async def query_hybrid(


class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
"""Vector I/O adapter using FAISS for in-memory vector similarity search."""
"""VectorIO adapter that uses FAISS for similarity search and vector storage."""

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ async def query_hybrid(


class WeaviateVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorStoresProtocolPrivate):
"""Vector I/O adapter for remote Weaviate instances."""
"""VectorIO adapter that uses Weaviate for similarity search and vector storage."""

def __init__(
self,
Expand Down
Loading