diff --git a/libs/core/kiln_ai/utils/pdf_utils.py b/libs/core/kiln_ai/utils/pdf_utils.py index a29e51d0e..dda2e6f06 100644 --- a/libs/core/kiln_ai/utils/pdf_utils.py +++ b/libs/core/kiln_ai/utils/pdf_utils.py @@ -4,8 +4,9 @@ import asyncio import atexit +import logging import tempfile -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import BrokenExecutor, ProcessPoolExecutor from contextlib import asynccontextmanager from pathlib import Path from typing import AsyncGenerator @@ -13,9 +14,22 @@ import pypdfium2 from pypdf import PdfReader, PdfWriter +logger = logging.getLogger(__name__) + _pdf_conversion_executor: ProcessPoolExecutor | None = None +def _reset_pdf_conversion_executor(): + """Reset the PDF conversion executor, shutting down the old one if it exists.""" + global _pdf_conversion_executor + if _pdf_conversion_executor is not None: + try: + _pdf_conversion_executor.shutdown(wait=False) + except Exception: + pass + _pdf_conversion_executor = None + + # Lazy load for speed, singleton so dev-server reloading doesn't recreate the executor def get_pdf_conversion_executor() -> ProcessPoolExecutor: global _pdf_conversion_executor @@ -72,13 +86,35 @@ def _convert_pdf_to_images_sync(pdf_path: Path, output_dir: Path) -> list[Path]: async def convert_pdf_to_images(pdf_path: Path, output_dir: Path) -> list[Path]: loop = asyncio.get_running_loop() - result = await loop.run_in_executor( - get_pdf_conversion_executor(), - _convert_pdf_to_images_sync, - pdf_path, - output_dir, - ) - return result + executor = get_pdf_conversion_executor() + try: + result = await loop.run_in_executor( + executor, + _convert_pdf_to_images_sync, + pdf_path, + output_dir, + ) + return result + except (BrokenExecutor, RuntimeError) as e: + error_msg = str(e).lower() + if ( + "process pool" in error_msg + or "child process" in error_msg + or isinstance(e, BrokenExecutor) + ): + logger.warning( + f"Process pool became unusable, recreating executor. Error: {e}" + ) + _reset_pdf_conversion_executor() + executor = get_pdf_conversion_executor() + result = await loop.run_in_executor( + executor, + _convert_pdf_to_images_sync, + pdf_path, + output_dir, + ) + return result + raise def _shutdown_pdf_conversion_executor():