diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..c6cdf5380 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,9 @@ +env/ +logs/ +.env +.git +__pycache__ +*.pyc +*.pyo +cookbook/ +examples/ \ No newline at end of file diff --git a/.gitignore b/.gitignore index 23d6b5655..7a8a94ea3 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,14 @@ __pycache__ .env* .venv/ logs/ +.idea/ +.claude/ + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..2aa1d63dc --- /dev/null +++ b/Dockerfile @@ -0,0 +1,37 @@ +# syntax=docker/dockerfile:1 + +FROM python:3.11-slim as builder + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + && rm -rf /var/lib/apt/lists/* + +ENV VIRTUAL_ENV=/opt/venv +RUN python -m venv $VIRTUAL_ENV +ENV PATH="$VIRTUAL_ENV/bin:$PATH" + +COPY requirements.txt . +RUN --mount=type=cache,target=/root/.cache/pip \ + pip install --upgrade pip && \ + pip install -r requirements.txt + +FROM python:3.11-slim + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +WORKDIR /app + +COPY --from=builder /opt/venv /opt/venv +ENV PATH="/opt/venv/bin:$PATH" + +COPY . . + +EXPOSE 8080 + +CMD ["uvicorn", "api.server:app", "--host", "0.0.0.0", "--port", "8080"] \ No newline at end of file diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/api/dependencies.py b/api/dependencies.py new file mode 100644 index 000000000..0357f5f88 --- /dev/null +++ b/api/dependencies.py @@ -0,0 +1,17 @@ +import os +from functools import lru_cache + +import aioboto3 + +# TODO: replace with a dedicated async S3 client that manages its own connection pool +# and lifecycle (e.g. FastAPI lifespan event) instead of creating a new client per call. +@lru_cache(maxsize=1) +def get_s3_session() -> aioboto3.Session: + return aioboto3.Session() + + +def get_s3_bucket() -> str: + bucket = os.environ.get("PAGEINDEX_S3_BUCKET") + if not bucket: + raise RuntimeError("PAGEINDEX_S3_BUCKET environment variable is not set") + return bucket diff --git a/api/routers/__init__.py b/api/routers/__init__.py new file mode 100644 index 000000000..e366f67e9 --- /dev/null +++ b/api/routers/__init__.py @@ -0,0 +1,6 @@ +from fastapi import APIRouter + +from api.routers.pageindex import pageindex_router + +api_router = APIRouter() +api_router.include_router(pageindex_router, prefix="/pageindex", tags=["PageIndex"]) diff --git a/api/routers/pageindex.py b/api/routers/pageindex.py new file mode 100644 index 000000000..0cf0671be --- /dev/null +++ b/api/routers/pageindex.py @@ -0,0 +1,74 @@ +import logging + +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel, Field + +from api.dependencies import get_s3_bucket, get_s3_session +from api.services.pageindex_service import ( + EmptyDocumentError, + S3KeyNotFoundError, + S3ReadError, + S3WriteError, + process_markdown, +) + +logger = logging.getLogger(__name__) + +pageindex_router = APIRouter() + + +_CONFIG_YAML_NOTE = "Defaults to the value in pageindex/config.yaml when not provided." + + +class MarkdownPageIndexRequest(BaseModel): + input_s3_key: str = Field(..., min_length=1, description="S3 key of the markdown file to index.") + output_s3_key: str = Field(..., min_length=1, description="S3 key where the output tree JSON will be written.") + tokens_per_page: int = Field( + default=2000, + ge=500, + le=10000, + description="Target token budget per virtual page. Controls section granularity.", + ) + + # Pipeline options — all optional; unset fields fall back to pageindex/config.yaml defaults + model: str | None = Field(default=None, description=f"LLM model name for all pipeline stages. {_CONFIG_YAML_NOTE}") + if_add_node_id: str | None = Field(default=None, description=f'"yes" or "no". {_CONFIG_YAML_NOTE}') + if_add_node_summary: str | None = Field(default=None, description=f'"yes" or "no". {_CONFIG_YAML_NOTE}') + if_add_node_text: str | None = Field(default=None, description=f'"yes" or "no". {_CONFIG_YAML_NOTE}') + if_add_doc_description: str | None = Field(default=None, description=f'"yes" or "no". {_CONFIG_YAML_NOTE}') + + extra_config: dict | None = Field( + default=None, + description="Escape hatch for any other config.yaml key not exposed above.", + ) + # TODO: add `content: str | None` to accept raw markdown inline, skipping the S3 read + + +class MarkdownPageIndexResponse(BaseModel): + output_s3_key: str + doc_description: str + structure: list + + +@pageindex_router.post("/markdown", response_model=MarkdownPageIndexResponse) +async def index_markdown(payload: MarkdownPageIndexRequest) -> MarkdownPageIndexResponse: + """Index a markdown document stored on S3 using the full PDF pipeline (process_no_toc path). + + Reads the markdown from S3, splits it into virtual pages, runs tree generation + + verification + retry logic, then writes the resulting tree JSON back to S3. + """ + try: + result = await process_markdown(payload, get_s3_session(), get_s3_bucket()) + except S3KeyNotFoundError as e: + logger.warning(str(e)) + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) + except EmptyDocumentError as e: + logger.warning(str(e)) + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(e)) + except (S3ReadError, S3WriteError) as e: + logger.error(str(e), exc_info=True) + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e)) + except Exception as e: + logger.error(f"PageIndex pipeline failed for '{payload.input_s3_key}': {e}", exc_info=True) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) + return result diff --git a/api/server.py b/api/server.py new file mode 100644 index 000000000..3695b7f8a --- /dev/null +++ b/api/server.py @@ -0,0 +1,15 @@ +from fastapi import FastAPI + +from api.routers import api_router + +def create_app() -> FastAPI: + app = FastAPI( + title="PageIndex API", + version="1.0.0", + description="HTTP API for indexing markdown documents using the PageIndex PDF pipeline.", + ) + app.include_router(api_router, prefix="/api/v1") + return app + + +app = create_app() diff --git a/api/services/__init__.py b/api/services/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/api/services/pageindex_service.py b/api/services/pageindex_service.py new file mode 100644 index 000000000..dd2c3f876 --- /dev/null +++ b/api/services/pageindex_service.py @@ -0,0 +1,218 @@ +import asyncio +import json +import logging +import os +import re + +_PAGE_TAG_RE = re.compile(r"(.*?)", re.DOTALL) + +from pageindex.page_index import ( + add_preface_if_needed, + check_title_appearance_in_start_concurrent, + meta_processor, + process_large_node_recursively, +) +from pageindex.utils import ( + ConfigLoader, + add_node_text, + agenerate_doc_description, + count_tokens, + create_clean_structure_for_description, + format_structure, + generate_summaries_for_structure, + post_processing, + remove_structure_text, + write_node_id, +) + +logger = logging.getLogger(__name__) + + +# ── Typed exceptions ────────────────────────────────────────────────────────── + +class S3KeyNotFoundError(Exception): + """Raised when the requested S3 key does not exist.""" + +class S3ReadError(Exception): + """Raised when S3 read fails for any reason other than key-not-found.""" + +class S3WriteError(Exception): + """Raised when writing the output JSON to S3 fails.""" + +class EmptyDocumentError(Exception): + """Raised when the S3 object is empty or contains no processable content.""" + + +# ── S3 helpers ──────────────────────────────────────────────────────────────── + +async def read_markdown_from_s3(key: str, s3_session, bucket: str) -> str: + try: + async with s3_session.client("s3") as s3: + resp = await s3.get_object(Bucket=bucket, Key=key) + content = (await resp["Body"].read()).decode("utf-8") + except Exception as e: + response = getattr(e, "response", None) + error_code = ( + response.get("Error", {}).get("Code") + if isinstance(response, dict) + else getattr(e, "error_code", None) + ) + if error_code in ("NoSuchKey", "404"): + raise S3KeyNotFoundError(f"S3 key not found: s3://{bucket}/{key}") from e + raise S3ReadError(f"Failed to read s3://{bucket}/{key}: {e}") from e + + if not content.strip(): + raise EmptyDocumentError(f"S3 object is empty: s3://{bucket}/{key}") + return content + + +async def upload_tree_to_s3(key: str, data: dict, s3_session, bucket: str) -> None: + try: + async with s3_session.client("s3") as s3: + await s3.put_object( + Bucket=bucket, + Key=key, + Body=json.dumps(data, indent=2, ensure_ascii=False), + ContentType="application/json", + ) + except Exception as e: + raise S3WriteError(f"Failed to write output to s3://{bucket}/{key}: {e}") from e + + +# ── Markdown → page_list ────────────────────────────────────────────────────── + +def markdown_to_page_list(content: str, tokens_per_page: int, model: str) -> list[tuple[str, int]]: + """Split markdown text into virtual pages for the pipeline. + + When the content contains ... tags (produced + by the OCR-to-markdown converter), each tag block becomes exactly one virtual + page. This keeps physical_index values accurate to real document page numbers. + + Falls back to token-budget chunking for plain markdown without page tags. + tokens_per_page is unused in the tagged path. + """ + tag_matches = _PAGE_TAG_RE.findall(content) + if tag_matches: + return [(page, count_tokens(page, model)) for page in tag_matches if page.strip()] + + lines = content.split("\n") + pages: list[tuple[str, int]] = [] + current_lines: list[str] = [] + current_tokens = 0 + + for line in lines: + line_tokens = count_tokens(line, model) if line.strip() else 1 + if current_tokens + line_tokens > tokens_per_page and current_lines: + chunk = "\n".join(current_lines) + pages.append((chunk, count_tokens(chunk, model))) + current_lines = [] + current_tokens = 0 + current_lines.append(line) + current_tokens += line_tokens + + if current_lines: + chunk = "\n".join(current_lines) + pages.append((chunk, count_tokens(chunk, model))) + + return pages + + +async def _build_tree(page_list: list[tuple[str, int]], opt) -> tuple[list, str]: + """Run the full PDF pipeline on virtual markdown pages. + + Calls meta_processor(mode='process_no_toc') directly — skipping check_toc() + — then follows the same post-processing steps as page_index_main(): + preface, boundary detection, tree build, large-node recursion, optional + summaries/description. Wrapped in asyncio.run() so it can be called from + asyncio.to_thread() without touching the server's event loop. + """ + logger.info({"total_page_number": len(page_list)}) + logger.info({"total_token": sum(t for _, t in page_list)}) + + # Directly enter process_no_toc — no TOC scanning for markdown + flat_toc = await meta_processor( + page_list, mode="process_no_toc", start_index=1, opt=opt, logger=logger + ) + + flat_toc = add_preface_if_needed(flat_toc) + flat_toc = await check_title_appearance_in_start_concurrent( + flat_toc, page_list, model=opt.model, logger=logger + ) + valid_items = [item for item in flat_toc if item.get("physical_index") is not None] + + tree = post_processing(valid_items, len(page_list)) + await asyncio.gather( + *[process_large_node_recursively(node, page_list, opt, logger=logger) for node in tree] + ) + + if opt.if_add_node_id == "yes": + write_node_id(tree) + if opt.if_add_node_text == "yes" or opt.if_add_node_summary == "yes": + add_node_text(tree, page_list) + description = "" + if opt.if_add_node_summary == "yes": + await generate_summaries_for_structure(tree, model=opt.model) + if opt.if_add_node_text == "no": + remove_structure_text(tree) + # Matches original page_index_builder(): description is only generated + # when summaries are also enabled (nested inside the summary block). + if opt.if_add_doc_description == "yes": + clean = create_clean_structure_for_description(tree) + description = await agenerate_doc_description(clean, model=opt.model) + + tree = format_structure( + tree, + order=["title", "node_id", "start_index", "end_index", "summary", "text", "nodes"], + ) + return tree, description + + +# ── Public orchestrator ─────────────────────────────────────────────────────── + +def _build_config_overrides(payload) -> dict: + """Build config overrides with three-tier priority. + + Priority (highest → lowest): + 1. Request fields — explicit per-call values + 2. PAGEINDEX_* env vars — deploy-time defaults + 3. pageindex/config.yaml — library defaults (handled by ConfigLoader) + """ + _ENV_MAP = { + "model": "PAGEINDEX_MODEL", + "if_add_node_id": "PAGEINDEX_IF_ADD_NODE_ID", + "if_add_node_summary": "PAGEINDEX_IF_ADD_NODE_SUMMARY", + "if_add_node_text": "PAGEINDEX_IF_ADD_NODE_TEXT", + "if_add_doc_description": "PAGEINDEX_IF_ADD_DOC_DESCRIPTION", + } + # Start with env vars as base layer + overrides = {key: os.environ[env] for key, env in _ENV_MAP.items() if env in os.environ} + + # Overlay request fields (non-None values take precedence over env vars) + request_fields = { + "model": payload.model, + "if_add_node_id": payload.if_add_node_id, + "if_add_node_summary": payload.if_add_node_summary, + "if_add_node_text": payload.if_add_node_text, + "if_add_doc_description": payload.if_add_doc_description, + **(payload.extra_config or {}), + } + overrides.update({k: v for k, v in request_fields.items() if v is not None}) + + return overrides + + +async def process_markdown(payload, s3_session, bucket: str) -> dict: + content = await read_markdown_from_s3(payload.input_s3_key, s3_session, bucket) + + opt = ConfigLoader().load(_build_config_overrides(payload)) + page_list = markdown_to_page_list(content, payload.tokens_per_page, opt.model) + + tree, description = await _build_tree(page_list, opt) + + output = { + "doc_description": description, + "structure": tree, + } + await upload_tree_to_s3(payload.output_s3_key, output, s3_session, bucket) + + return {**output, "output_s3_key": payload.output_s3_key} diff --git a/main_api.py b/main_api.py new file mode 100644 index 000000000..b52405ee8 --- /dev/null +++ b/main_api.py @@ -0,0 +1,7 @@ +from dotenv import load_dotenv +import uvicorn + +load_dotenv() + +if __name__ == "__main__": + uvicorn.run("api.server:app", host="0.0.0.0", port=8080, reload=True) diff --git a/pageindex/page_index.py b/pageindex/page_index.py index 9004309fb..ba4ba9937 100644 --- a/pageindex/page_index.py +++ b/pageindex/page_index.py @@ -504,7 +504,7 @@ def remove_first_physical_index_section(text): return text ### add verify completeness -def generate_toc_continue(toc_content, part, model=None): +async def generate_toc_continue(toc_content, part, model=None): print('start generate_toc_continue') prompt = """ You are an expert in extracting hierarchical tree structure. @@ -532,14 +532,14 @@ def generate_toc_continue(toc_content, part, model=None): Directly return the additional part of the final JSON structure. Do not output anything else.""" prompt = prompt + '\nGiven text\n:' + part + '\nPrevious tree structure\n:' + json.dumps(toc_content, indent=2) - response, finish_reason = llm_completion(model=model, prompt=prompt, return_finish_reason=True) + response, finish_reason = await llm_acompletion(model=model, prompt=prompt, return_finish_reason=True) if finish_reason == 'finished': return extract_json(response) else: raise Exception(f'finish reason: {finish_reason}') ### add verify completeness -def generate_toc_init(part, model=None): +async def generate_toc_init(part, model=None): print('start generate_toc_init') prompt = """ You are an expert in extracting hierarchical tree structure, your task is to generate the tree structure of the document. @@ -566,14 +566,14 @@ def generate_toc_init(part, model=None): Directly return the final JSON structure. Do not output anything else.""" prompt = prompt + '\nGiven text\n:' + part - response, finish_reason = llm_completion(model=model, prompt=prompt, return_finish_reason=True) + response, finish_reason = await llm_acompletion(model=model, prompt=prompt, return_finish_reason=True) if finish_reason == 'finished': return extract_json(response) else: raise Exception(f'finish reason: {finish_reason}') -def process_no_toc(page_list, start_index=1, model=None, logger=None): +async def process_no_toc(page_list, start_index=1, model=None, logger=None): page_contents=[] token_lengths=[] for page_index in range(start_index, start_index+len(page_list)): @@ -583,9 +583,9 @@ def process_no_toc(page_list, start_index=1, model=None, logger=None): group_texts = page_list_to_group_text(page_contents, token_lengths) logger.info(f'len(group_texts): {len(group_texts)}') - toc_with_page_number= generate_toc_init(group_texts[0], model) + toc_with_page_number= await generate_toc_init(group_texts[0], model) for group_text in group_texts[1:]: - toc_with_page_number_additional = generate_toc_continue(toc_with_page_number, group_text, model) + toc_with_page_number_additional = await generate_toc_continue(toc_with_page_number, group_text, model) toc_with_page_number.extend(toc_with_page_number_additional) logger.info(f'generate_toc: {toc_with_page_number}') @@ -965,7 +965,7 @@ async def meta_processor(page_list, mode=None, toc_content=None, toc_page_list=N elif mode == 'process_toc_no_page_numbers': toc_with_page_number = process_toc_no_page_numbers(toc_content, toc_page_list, page_list, model=opt.model, logger=logger) else: - toc_with_page_number = process_no_toc(page_list, start_index=start_index, model=opt.model, logger=logger) + toc_with_page_number = await process_no_toc(page_list, start_index=start_index, model=opt.model, logger=logger) toc_with_page_number = [item for item in toc_with_page_number if item.get('physical_index') is not None] diff --git a/pageindex/utils.py b/pageindex/utils.py index f00ccf3a7..09f7884db 100644 --- a/pageindex/utils.py +++ b/pageindex/utils.py @@ -59,7 +59,7 @@ def llm_completion(model, prompt, chat_history=None, return_finish_reason=False) -async def llm_acompletion(model, prompt): +async def llm_acompletion(model, prompt, return_finish_reason=False): if model: model = model.removeprefix("litellm/") max_retries = 10 @@ -71,7 +71,11 @@ async def llm_acompletion(model, prompt): messages=messages, temperature=0, ) - return response.choices[0].message.content + content = response.choices[0].message.content + if return_finish_reason: + finish_reason = "max_output_reached" if response.choices[0].finish_reason == "length" else "finished" + return content, finish_reason + return content except Exception as e: print('************* Retrying *************') logging.error(f"Error: {e}") @@ -79,6 +83,8 @@ async def llm_acompletion(model, prompt): await asyncio.sleep(1) else: logging.error('Max retries reached for prompt: ' + prompt) + if return_finish_reason: + return "", "error" return "" @@ -618,8 +624,7 @@ def create_clean_structure_for_description(structure): else: return structure - -def generate_doc_description(structure, model=None): +def get_generate_doc_description_prompt(structure, model=None): prompt = f"""Your are an expert in generating descriptions for a document. You are given a structure of a document. Your task is to generate a one-sentence description for the document, which makes it easy to distinguish the document from other documents. @@ -627,9 +632,17 @@ def generate_doc_description(structure, model=None): Directly return the description, do not include any other text. """ + return prompt + +def generate_doc_description(structure, model=None): + prompt = get_generate_doc_description_prompt(structure, model) response = llm_completion(model, prompt) return response +async def agenerate_doc_description(structure, model=None): + prompt = get_generate_doc_description_prompt(structure, model) + response = await llm_acompletion(model, prompt) + return response def reorder_dict(data, key_order): if not key_order: diff --git a/requirements.txt b/requirements.txt index e6ad80531..003f25c8d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,10 @@ litellm==1.83.7 # openai-agents # optional: required for examples/agentic_vectorless_rag_demo.py pymupdf==1.26.4 PyPDF2==3.0.1 -python-dotenv==1.2.2 +python-dotenv>=1.0.1 pyyaml==6.0.2 + +# API server +fastapi>=0.111.0 +uvicorn[standard]>=0.29.0 +aioboto3>=13.0.0