feat: async background indexing with job queue #8
Conversation
Indexing large codebases can block the MCP call for extended periods, causing client timeouts. This introduces background job processing:

- `llmx_index` now returns immediately with a `job_id` for async tracking
- New job queue with status tracking (queued/running/complete/error)
- Rate limiting via `MAX_CONCURRENT_JOBS` (4) to prevent resource exhaustion
- `llmx_manage` gains `job_status` action to poll completion
- `getrandom` dependency for secure job ID generation
- Version bump to 0.1.2

Callers poll `job_status` until complete, then use the returned `index_id`.
@codex code review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2210c014e8
```rust
loop {
    tokio::time::sleep(Duration::from_secs(60)).await;
    if let Ok(mut jobs) = cleanup_jobs.lock() {
        jobs.retain(|_, state| state.started_at.elapsed().as_secs() < 600);
```
Retain active jobs during periodic cleanup
The cleanup loop currently removes any job older than 10 minutes via started_at.elapsed() without checking its status, so long-running or queued indexing jobs can disappear before completion. In that case llmx_manage(action="job_status") starts returning "Unknown job_id" for work that is still in progress, and those jobs also stop counting toward MAX_CONCURRENT_JOBS, which can lead to uncontrolled concurrency under heavy indexing load.
```rust
let output = llmx_index_handler(&mut store, input)
    .map_err(|e| McpError::internal_error(e.to_string(), None))?;
```

```rust
// Reject if too many jobs are already active
if active_job_count(&self.jobs) >= MAX_CONCURRENT_JOBS {
```
Make job-limit check and enqueue atomic
The rate-limit check is done before the new job is inserted, using a separate lock acquisition, so concurrent llmx_index requests can both observe available capacity and both enqueue, exceeding MAX_CONCURRENT_JOBS. This race is reproducible when requests arrive at nearly the same time (for example with 3 active jobs and two callers), and it defeats the intended resource cap.
Pull request overview
This PR introduces asynchronous/background indexing for the MCP server so llmx_index returns immediately with a job_id, and adds supporting job-status polling plus some hardening around storage and search limits.
Changes:
- Add in-memory job store + job status API (`job_status`) and convert MCP server indexing to background `spawn_blocking`.
- Harden storage/search behavior: cap `limit` via `MAX_SEARCH_LIMIT`, validate `index_id` for path safety, add LRU caching and file-size checks on index load.
- Extend MCP tests to cover job-status serialization, job-store lifecycle, path traversal rejection, and search limit cap.
Reviewed changes
Copilot reviewed 8 out of 10 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| ingestor-core/tests/mcp_tests.rs | Adds tests for job status/store behavior, storage ID validation, and search limit cap. |
| ingestor-core/src/mcp/tools.rs | Refactors indexing work into run_index_work; updates MCP search output fields and limit capping. |
| ingestor-core/src/mcp/storage.rs | Adds index_id validation, LRU cache, and max index file size checks; renames metadata field. |
| ingestor-core/src/mcp/mod.rs | Exposes new jobs module and re-exports. |
| ingestor-core/src/mcp/jobs.rs | Implements job store types, status enum, concurrency cap constant, and job ID generation. |
| ingestor-core/src/handlers/storage.rs | Mirrors storage hardening: LRU cache + index ID validation + file-size guard. |
| ingestor-core/src/handlers/mod.rs | Adds search limit cap and improves directory walking (depth cap, symlink handling, allowed dotfiles). |
| ingestor-core/src/bin/mcp_server.rs | Implements async/background indexing, job tracking, and polling via llmx_manage(job_status). |
| ingestor-core/Cargo.toml | Adds getrandom dependency behind the mcp feature. |
| Cargo.lock | Bumps llmx-mcp to 0.1.2 and records new dependency. |
```rust
pub fn new_job_id() -> String {
    let mut buf = [0u8; 16];
    getrandom::fill(&mut buf).expect("getrandom failed");
    buf.iter().map(|b| format!("{:02x}", b)).collect()
}
```
new_job_id() panics on RNG failure (expect("getrandom failed")). Since this runs in the request path, it can crash the whole MCP server. Prefer returning a Result<String> (or using a fallible generator) and converting failures into a JobStatus::Error / MCP error response instead of panicking.
```rust
for path_str in &input.paths {
    // Canonicalize to prevent path traversal attacks
    let path = PathBuf::from(path_str)
        .canonicalize()
        .with_context(|| format!("Invalid path: {}", path_str))?;

    // Skip symlinks to prevent traversal outside intended directories
    let metadata = fs::symlink_metadata(&path)?;
    if metadata.is_symlink() {
        continue;
    }
```
run_index_work calls canonicalize() before checking symlink_metadata().is_symlink(). If the user-supplied path is itself a symlink, canonicalization resolves it first, so the subsequent symlink check won’t detect it. If the intent is “don’t follow symlink roots”, check symlink_metadata on the original path (pre-canonicalize) and only canonicalize after rejecting symlinks (or ensure the resolved path stays within an allowed root).
```rust
let path = PathBuf::from(path_str)
    .canonicalize()
    .with_context(|| format!("Invalid path: {}", path_str))?;
let metadata = fs::symlink_metadata(&path)?;
if metadata.is_symlink() {
    continue;
}
```
Symlink skipping is ineffective here because the code canonicalize()s first (which resolves symlinks), then checks symlink_metadata().is_symlink(). If a user passes a symlink as an input path, it will be followed. If the goal is to avoid following symlink roots, check symlink_metadata on the original path before canonicalizing (or otherwise constrain the resolved path).
Suggested change:

```diff
-let path = PathBuf::from(path_str)
-    .canonicalize()
-    .with_context(|| format!("Invalid path: {}", path_str))?;
-let metadata = fs::symlink_metadata(&path)?;
-if metadata.is_symlink() {
-    continue;
-}
+let original_path = PathBuf::from(path_str);
+let metadata = fs::symlink_metadata(&original_path)
+    .with_context(|| format!("Invalid path: {}", path_str))?;
+if metadata.is_symlink() {
+    continue;
+}
+let path = original_path
+    .canonicalize()
+    .with_context(|| format!("Invalid path: {}", path_str))?;
```
```rust
/// Validate that an index_id is safe to use in file paths.
///
/// Index IDs must be non-empty, ≤128 chars, and contain only ASCII
/// alphanumerics, hyphens, and underscores to prevent path traversal.
fn validate_index_id(id: &str) -> Result<()> {
    if id.is_empty() || id.len() > 128 {
        anyhow::bail!("Invalid index_id length");
    }
    if !id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') {
        anyhow::bail!("Invalid index_id: must contain only alphanumerics, hyphens, and underscores");
    }
    Ok(())
}
```
validate_index_id is duplicated here and in ingestor-core/src/handlers/storage.rs. To reduce the chance of inconsistent validation rules (security-sensitive), consider moving the validator into a shared function/module and calling it from both places.
```diff
 #[derive(Debug, Deserialize)]
 #[cfg_attr(feature = "mcp", derive(JsonSchema))]
 pub struct ManageInput {
-    #[cfg_attr(feature = "mcp", schemars(description = "Action: 'list' or 'delete'"))]
+    #[cfg_attr(feature = "mcp", schemars(description = "Action: 'list', 'delete', or 'job_status'"))]
     pub action: String,
     #[serde(default)]
-    #[cfg_attr(feature = "mcp", schemars(description = "Index ID (required for delete)"))]
+    #[cfg_attr(feature = "mcp", schemars(description = "Index ID (required for delete) or job ID (required for job_status)"))]
     pub index_id: Option<String>,
 }
```
ManageInput now documents/accepts action = 'job_status', but llmx_manage_handler (in this module) still only supports list/delete and will error on job_status if called directly. If job_status is intended to be part of the reusable handler API, consider adding a handler variant that can access the JobStore (or separate the MCP-server-only action so the schema and handler stay consistent).
```rust
fn test_new_job_id_uniqueness() {
    let ids: Vec<String> = (0..100).map(|_| new_job_id()).collect();
    let unique: std::collections::HashSet<_> = ids.iter().collect();
    // With nanosecond precision XOR pid, collisions should be near-impossible
```
The comment claims job IDs use “nanosecond precision XOR pid”, but new_job_id() now uses 16 bytes from getrandom. Consider updating the comment to match the actual implementation to avoid confusion when debugging collisions/entropy sources.
Suggested change:

```diff
-    // With nanosecond precision XOR pid, collisions should be near-impossible
+    // With 16 bytes from getrandom, collisions for 100 IDs should be effectively impossible
```
```rust
#[derive(Debug, Serialize)]
pub struct SearchOutput {
    pub results: Vec<SearchResultOutput>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub truncated_ids: Option<Vec<String>>,
    /// Number of matches excluded from results due to token budget.
    #[serde(skip_serializing_if = "is_zero")]
    pub truncated_count: usize,
    pub total_matches: usize,
}

fn is_zero(n: &usize) -> bool { *n == 0 }

#[derive(Debug, Serialize)]
pub struct SearchResultOutput {
    #[serde(skip)]
    pub chunk_id: String,
    #[serde(skip)]
    pub score: f32,
    pub path: String,
```
SearchOutput and SearchResultOutput serialization looks like a breaking MCP API change: truncated_ids is replaced by truncated_count, and chunk_id/score are now #[serde(skip)] (so clients can’t reference a specific chunk for follow-up calls, and can’t see ranking). If backward compatibility matters, consider keeping the old fields (e.g., with aliases) or providing both representations while deprecating the old one.
```diff
 use std::path::{Path, PathBuf};

-const DEFAULT_MAX_TOKENS: usize = 16000;
+const DEFAULT_MAX_TOKENS: usize = 8000;
```
DEFAULT_MAX_TOKENS was changed to 8000, but the SearchInput schema description for max_tokens still says “default 16000” (and the llmx_search docs elsewhere mention 16K). This makes the generated MCP schema/docs inaccurate; please update the descriptions (or revert the default) so clients see the correct default budget.
Suggested change:

```diff
-const DEFAULT_MAX_TOKENS: usize = 8000;
+const DEFAULT_MAX_TOKENS: usize = 16000;
```
```rust
/// Validate that an index_id is safe to use in file paths.
///
/// Index IDs must be non-empty, ≤128 chars, and contain only ASCII
/// alphanumerics, hyphens, and underscores to prevent path traversal.
fn validate_index_id(id: &str) -> Result<()> {
    if id.is_empty() || id.len() > 128 {
        anyhow::bail!("Invalid index_id length");
    }
    if !id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') {
        anyhow::bail!("Invalid index_id: must contain only alphanumerics, hyphens, and underscores");
    }
    Ok(())
}
```
validate_index_id is duplicated here and in ingestor-core/src/mcp/storage.rs. Keeping two copies risks the security rules drifting over time. Consider factoring this into a shared helper (e.g., crate::util or a small storage::validate_* module) and reusing it from both storage implementations.
```diff
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct IndexMetadata {
-    pub id: String,
+    #[serde(alias = "id")]
+    pub index_id: String,
     pub root_path: String,
     pub created_at: u64,
     pub file_count: usize,
     pub chunk_count: usize,
 }
```
IndexMetadata changed its serialized field from id to index_id (the alias only helps deserialization). Since llmx_manage(list) returns this type, this is a breaking JSON/API change for MCP clients expecting id. If you need backward compatibility, consider serializing with the old field name (e.g., #[serde(rename = "id", alias = "index_id")]) or emitting both fields during a transition.
Summary

- `llmx_index` returns immediately with a `job_id`
- `MAX_CONCURRENT_JOBS` rate limiting
- `llmx_manage` gains `job_status` action to poll for completion and retrieve results

Test plan

- `cargo test -q --features mcp` to verify new job tests pass
- Poll `job_status` until complete, verify the `index_id` is usable with `llmx_search`