Optimize parallel file processing in folder scan and fix deadlock problem in Gunicorn multi-worker mode #1145

Merged · 7 commits · Mar 21, 2025
lightrag/api/auth.py (3 additions, 0 deletions)
@@ -3,6 +3,9 @@
 import jwt
 from fastapi import HTTPException, status
 from pydantic import BaseModel
+from dotenv import load_dotenv
+
+load_dotenv()


 class TokenPayload(BaseModel):
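Note on the plain load_dotenv() calls this PR standardizes on: python-dotenv defaults to override=False, so variables already present in the process environment win over the .env file. A minimal sketch of that behavior (assumes a .env file in the working directory containing TOKEN_SECRET=from-dotenv):

import os
from dotenv import load_dotenv

os.environ["TOKEN_SECRET"] = "from-process"
load_dotenv()  # default override=False: existing env vars are kept
print(os.environ["TOKEN_SECRET"])  # -> "from-process"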
lightrag/api/gunicorn_config.py (3 additions, 1 deletion)
@@ -29,7 +29,9 @@
 worker_class = "uvicorn.workers.UvicornWorker"

 # Other Gunicorn configurations
-timeout = int(os.getenv("TIMEOUT", 150))  # Default 150s to match run_with_gunicorn.py
+timeout = int(
+    os.getenv("TIMEOUT", 150 * 2)
+)  # Default 150s * 2 to match run_with_gunicorn.py
 keepalive = int(os.getenv("KEEPALIVE", 5))  # Default 5s

 # Logging configuration
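For illustration, how the new default resolves (a standalone sketch, not project code): when TIMEOUT is unset, os.getenv returns the 150 * 2 fallback directly; when it is set, os.getenv returns a string that int() parses:

import os

# Unset: the integer fallback 150 * 2 is returned as-is -> 300
os.environ.pop("TIMEOUT", None)
assert int(os.getenv("TIMEOUT", 150 * 2)) == 300

# Set: os.getenv returns a string, which int() parses
os.environ["TIMEOUT"] = "450"
assert int(os.getenv("TIMEOUT", 150 * 2)) == 450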
lightrag/api/lightrag_server.py (1 addition, 1 deletion)
@@ -49,7 +49,7 @@
 # Load environment variables
 # Updated to use the .env that is inside the current folder
 # This update allows the user to put a different .env file for each lightrag folder
-load_dotenv(".env", override=True)
+load_dotenv()

 # Initialize config parser
 config = configparser.ConfigParser()
lightrag/api/routers/document_routes.py (31 additions, 11 deletions)
@@ -405,7 +405,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path):


 async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
-    """Index multiple files concurrently
+    """Index multiple files sequentially to avoid high CPU load

     Args:
         rag: LightRAG instance
@@ -416,12 +416,12 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
     try:
         enqueued = False

-        if len(file_paths) == 1:
-            enqueued = await pipeline_enqueue_file(rag, file_paths[0])
-        else:
-            tasks = [pipeline_enqueue_file(rag, path) for path in file_paths]
-            enqueued = any(await asyncio.gather(*tasks))
+        # Process files sequentially
+        for file_path in file_paths:
+            if await pipeline_enqueue_file(rag, file_path):
+                enqueued = True

+        # Process the queue only if at least one file was successfully enqueued
         if enqueued:
             await rag.apipeline_process_enqueue_documents()
     except Exception as e:
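The pattern this hunk switches to, reduced to a self-contained sketch (enqueue here is a hypothetical stand-in for pipeline_enqueue_file): asyncio.gather launches every coroutine at once, while a plain for loop awaits one file at a time, capping peak CPU and I/O load:

import asyncio

async def enqueue(path: str) -> bool:
    await asyncio.sleep(0.01)  # stand-in for file parsing / queueing work
    return True

async def enqueue_sequentially(paths: list[str]) -> bool:
    enqueued = False
    for path in paths:  # only one file in flight at a time
        if await enqueue(path):
            enqueued = True
    return enqueued

print(asyncio.run(enqueue_sequentially(["a.txt", "b.txt", "c.txt"])))  # True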
@@ -472,14 +472,34 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
         total_files = len(new_files)
         logger.info(f"Found {total_files} new files to index.")

-        for idx, file_path in enumerate(new_files):
-            try:
-                await pipeline_index_file(rag, file_path)
-            except Exception as e:
-                logger.error(f"Error indexing file {file_path}: {str(e)}")
+        if not new_files:
+            return
+
+        # Get MAX_PARALLEL_INSERT from global_args
+        max_parallel = global_args["max_parallel_insert"]
+        # Calculate batch size as 2 * MAX_PARALLEL_INSERT
+        batch_size = 2 * max_parallel
+
+        # Process files in batches
+        for i in range(0, total_files, batch_size):
+            batch_files = new_files[i : i + batch_size]
+            batch_num = i // batch_size + 1
+            total_batches = (total_files + batch_size - 1) // batch_size
+
+            logger.info(
+                f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files"
+            )
+            await pipeline_index_files(rag, batch_files)
+
+            # Log progress
+            processed = min(i + batch_size, total_files)
+            logger.info(
+                f"Processed {processed}/{total_files} files ({processed / total_files * 100:.1f}%)"
+            )

     except Exception as e:
         logger.error(f"Error during scanning process: {str(e)}")
         logger.error(traceback.format_exc())


 def create_document_routes(
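The batch bookkeeping above, isolated into a runnable sketch with illustrative numbers (MAX_PARALLEL_INSERT = 2, ten files): batch_size is 2 * MAX_PARALLEL_INSERT, and total_batches uses the ceiling-division idiom:

max_parallel = 2               # illustrative MAX_PARALLEL_INSERT
batch_size = 2 * max_parallel  # -> 4 files per batch
total_files = 10

total_batches = (total_files + batch_size - 1) // batch_size  # ceil(10 / 4) = 3
for i in range(0, total_files, batch_size):
    batch_num = i // batch_size + 1
    processed = min(i + batch_size, total_files)
    print(f"batch {batch_num}/{total_batches}: {processed}/{total_files} "
          f"({processed / total_files * 100:.1f}%)")
# batch 1/3: 4/10 (40.0%)
# batch 2/3: 8/10 (80.0%)
# batch 3/3: 10/10 (100.0%)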
lightrag/api/run_with_gunicorn.py (2 additions, 2 deletions)
@@ -13,7 +13,7 @@

 # Updated to use the .env that is inside the current folder
 # This update allows the user to put a different .env file for each lightrag folder
-load_dotenv(".env")
+load_dotenv()


 def check_and_install_dependencies():
@@ -140,7 +140,7 @@ def load_config(self):

         # Timeout configuration prioritizes command line arguments
         gunicorn_config.timeout = (
-            args.timeout if args.timeout else int(os.getenv("TIMEOUT", 150))
+            args.timeout * 2 if args.timeout else int(os.getenv("TIMEOUT", 150 * 2))
         )

         # Keepalive configuration
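The timeout precedence above, restated as a standalone sketch (resolve_timeout is a hypothetical helper, not the project's API): a CLI value takes priority and is doubled, otherwise the TIMEOUT env var applies, otherwise the 150 * 2 default:

import os
from typing import Optional

def resolve_timeout(cli_timeout: Optional[int]) -> int:
    # CLI argument wins and is doubled; env var or default used otherwise
    return cli_timeout * 2 if cli_timeout else int(os.getenv("TIMEOUT", 150 * 2))

os.environ.pop("TIMEOUT", None)
assert resolve_timeout(100) == 200   # CLI value doubled
assert resolve_timeout(None) == 300  # default 150 * 2
os.environ["TIMEOUT"] = "240"
assert resolve_timeout(None) == 240  # env var wins when no CLI value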
lightrag/api/utils_api.py (12 additions, 7 deletions)
@@ -16,7 +16,7 @@
 from .auth import auth_handler

 # Load environment variables
-load_dotenv(override=True)
+load_dotenv()

 global_args = {"main_args": None}

@@ -365,6 +365,9 @@ def timeout_type(value):
         "LIGHTRAG_VECTOR_STORAGE", DefaultRAGStorageConfig.VECTOR_STORAGE
     )

+    # Get MAX_PARALLEL_INSERT from environment
+    global_args["max_parallel_insert"] = get_env_value("MAX_PARALLEL_INSERT", 2, int)
+
     # Handle openai-ollama special case
     if args.llm_binding == "openai-ollama":
        args.llm_binding = "openai"
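get_env_value is LightRAG's own helper; a plausible minimal stand-in, with its behavior inferred from the call site only (read the variable, fall back to the default, cast with the given type):

import os

def get_env_value(name: str, default, value_type=str):
    # Hypothetical reimplementation for illustration only
    raw = os.getenv(name)
    if raw is None:
        return default
    return value_type(raw)

# MAX_PARALLEL_INSERT unset -> 2; MAX_PARALLEL_INSERT=8 -> 8 (as int)
print(get_env_value("MAX_PARALLEL_INSERT", 2, int))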
Expand Down Expand Up @@ -441,8 +444,8 @@ def display_splash_screen(args: argparse.Namespace) -> None:
ASCIIColors.yellow(f"{args.log_level}")
ASCIIColors.white(" ├─ Verbose Debug: ", end="")
ASCIIColors.yellow(f"{args.verbose}")
ASCIIColors.white(" ├─ Timeout: ", end="")
ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}")
ASCIIColors.white(" ├─ History Turns: ", end="")
ASCIIColors.yellow(f"{args.history_turns}")
ASCIIColors.white(" └─ API Key: ", end="")
ASCIIColors.yellow("Set" if args.key else "Not Set")

Expand All @@ -459,8 +462,10 @@ def display_splash_screen(args: argparse.Namespace) -> None:
ASCIIColors.yellow(f"{args.llm_binding}")
ASCIIColors.white(" ├─ Host: ", end="")
ASCIIColors.yellow(f"{args.llm_binding_host}")
ASCIIColors.white(" ─ Model: ", end="")
ASCIIColors.white(" ─ Model: ", end="")
ASCIIColors.yellow(f"{args.llm_model}")
ASCIIColors.white(" └─ Timeout: ", end="")
ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}")

# Embedding Configuration
ASCIIColors.magenta("\n📊 Embedding Configuration:")
Expand All @@ -475,8 +480,10 @@ def display_splash_screen(args: argparse.Namespace) -> None:

# RAG Configuration
ASCIIColors.magenta("\n⚙️ RAG Configuration:")
ASCIIColors.white(" ├─ Max Async Operations: ", end="")
ASCIIColors.white(" ├─ Max Async for LLM: ", end="")
ASCIIColors.yellow(f"{args.max_async}")
ASCIIColors.white(" ├─ Max Parallel Insert: ", end="")
ASCIIColors.yellow(f"{global_args['max_parallel_insert']}")
ASCIIColors.white(" ├─ Max Tokens: ", end="")
ASCIIColors.yellow(f"{args.max_tokens}")
ASCIIColors.white(" ├─ Max Embed Tokens: ", end="")
Expand All @@ -485,8 +492,6 @@ def display_splash_screen(args: argparse.Namespace) -> None:
ASCIIColors.yellow(f"{args.chunk_size}")
ASCIIColors.white(" ├─ Chunk Overlap Size: ", end="")
ASCIIColors.yellow(f"{args.chunk_overlap_size}")
ASCIIColors.white(" ├─ History Turns: ", end="")
ASCIIColors.yellow(f"{args.history_turns}")
ASCIIColors.white(" ├─ Cosine Threshold: ", end="")
ASCIIColors.yellow(f"{args.cosine_threshold}")
ASCIIColors.white(" ├─ Top-K: ", end="")
Expand Down