feat(ingestion): async background processing with progressive UI#7
jeremylongshore merged 3 commits into main
Conversation
Replace inline HTTP-blocking ingestion with async background pipeline:
- POST /trigger/ingestion returns 202 Accepted in <1s with run_id
- GET /trigger/ingestion/{run_id} for polling run status from Firestore
- Background task updates Firestore doc through phases progressively
- Idempotency guard prevents concurrent runs (409), auto-cleans stale >10min
- Dashboard polls every 3s with phase labels, stuck detection, auto-refresh
- logging.py upgraded from Phase 4 stubs to real Firestore writes
- Cloud Run deploy gets --no-cpu-throttling and --cpu-boost flags
- Composite Firestore index for active run query
- 27 new tests (unit + integration), debug runbook in 000-docs/
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
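The accept-then-poll lifecycle and idempotency guard described above can be sketched in miniature. This is an illustrative sketch, not the PR's code: the `RUNS` dict stands in for the Firestore `ingestion_runs` collection, and the function and field names are assumptions.

```python
import asyncio
import time
import uuid

# In-memory stand-in for the Firestore `ingestion_runs` collection.
RUNS: dict[str, dict] = {}
STALE_AFTER_SECONDS = 600  # runs stuck longer than 10 minutes are treated as stale

def trigger_ingestion() -> dict:
    """Return a 202-style payload, or a 409-style conflict while a run is active."""
    for run_id, run in RUNS.items():
        age = time.time() - run["started_at"]
        if run["status"] == "running" and age <= STALE_AFTER_SECONDS:
            return {"code": 409, "active_run_id": run_id}
    run_id = uuid.uuid4().hex
    RUNS[run_id] = {"status": "running", "phase": "accepted", "started_at": time.time()}
    return {"code": 202, "run_id": run_id, "poll_url": f"/trigger/ingestion/{run_id}"}

async def run_pipeline(run_id: str) -> None:
    """Advance the run through its phases, as the background task does in Firestore."""
    for phase in ("loading_sources", "fetching_feeds", "storing_articles", "done"):
        RUNS[run_id]["phase"] = phase
        await asyncio.sleep(0)  # yield control, as real network I/O would
    RUNS[run_id]["status"] = "succeeded"

async def demo() -> dict:
    first = trigger_ingestion()                       # 202 Accepted
    task = asyncio.create_task(run_pipeline(first["run_id"]))
    second = trigger_ingestion()                      # concurrent trigger
    assert second["code"] == 409
    await task
    return RUNS[first["run_id"]]
```

Running `demo()` shows both paths: the first trigger is accepted, a concurrent trigger conflicts with 409, and once the background task finishes a new trigger is accepted again.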
📝 Walkthrough

Adds an asynchronous ingestion trigger: POST returns 202 with a run_id and poll_url, a background multi‑phase ingestion pipeline updates run state in Firestore, GET /ingestion/{run_id} reports phase/stats/success, the frontend polls and reacts to completion; logging, indexes, tests, docs, and a new RSS feed were also added.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor User
    participant Frontend as IngestionButton
    participant Backend as MCP_Service
    participant Firestore as Firestore_DB
    participant Pipeline as Ingestion_Pipeline
    User->>Frontend: Click "Run Ingestion"
    Frontend->>Backend: POST /trigger/ingestion
    Backend->>Firestore: check_active_run()
    Firestore-->>Backend: active run? (no/yes)
    alt no active run
        Backend->>Firestore: create ingestion_runs/{run_id} (status=accepted)
        Backend->>Pipeline: start async run_ingestion_pipeline(run_id)
        Backend-->>Frontend: 202 Accepted + run_id + poll_url
        Frontend->>Frontend: start polling GET /ingestion/{run_id}
        Pipeline->>Firestore: update phases and stats (loading_sources → fetching_feeds → storing_articles → done)
        Frontend->>Backend: GET /ingestion/{run_id}
        Backend->>Firestore: read ingestion_runs/{run_id}
        Firestore-->>Backend: current run state
        Backend-->>Frontend: IngestionRunStatus (is_successful computed)
        Frontend->>Frontend: stop polling, handleComplete(), emit ingestion-complete
        Frontend->>Frontend: SystemActivityCard receives ingestion-complete -> refresh runs
    else active run
        Backend-->>Frontend: 409 Conflict (active_run_id)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
Summary of Changes

Hello @jeremylongshore, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the ingestion pipeline by introducing asynchronous background processing, providing a more responsive user experience. It integrates real-time status updates via Firestore, improves system reliability with idempotency checks and stale-run cleanup, and adds comprehensive documentation for debugging. These changes ensure that ingestion operations are non-blocking, transparent, and easier to monitor and troubleshoot.
Code Review
This pull request is a significant improvement, introducing asynchronous background processing for the ingestion pipeline. This greatly enhances user experience by providing immediate feedback and progressive updates. The backend changes, including the idempotency guard and stale run cleanup, add crucial reliability. My review highlights a few critical and high-severity issues: an architectural concern with duplicated database logic in the backend, a potential memory leak on the frontend, and a bug in the stale run cleanup logic. Addressing these will make the implementation more robust and maintainable.
```tsx
const pollStatus = useCallback(
  (runId: string) => {
    pollRef.current = setInterval(async () => {
```
The setInterval created for polling is not cleared if the component unmounts while a poll is in progress (e.g., if the user navigates away). This will cause a memory leak and may lead to React attempting to update state on an unmounted component. You should use a useEffect hook to call stopPolling during cleanup.
```tsx
useEffect(() => {
  return () => {
    stopPolling();
  };
}, [stopPolling]);
```

This should be added inside the IngestionButton component.
```python
async def run_ingestion_pipeline(
    run_id: str,
    request: TriggerIngestionRequest,
    db: firestore.Client,
) -> None:
    """Run the full ingestion pipeline as a background task.

    Updates Firestore doc progressively through phases:
    loading_sources → fetching_feeds → storing_articles → upserting_authors → done
    """
```
This run_ingestion_pipeline function implements its own logic for writing to Firestore, and the _get_db helper is duplicated from routers/logging.py. However, a dedicated /mcp/tools/log_ingestion_run tool exists for this purpose. To maintain a clear separation of concerns and adhere to the MCP architecture where orchestrators use tools, this trigger router should call the logging tool instead of directly accessing the database. This will centralize the database logic, improve modularity, and reduce code duplication.
```python
if hasattr(started_at, "timestamp"):
    age_seconds = time.time() - started_at.timestamp()
else:
    age_seconds = 0
```
The else: age_seconds = 0 branch is a potential bug. If started_at is not a valid datetime object for some reason (e.g., data corruption), this code will silently treat the run as brand new, preventing it from ever being cleaned up as stale. This could block all future ingestion runs. It would be more robust to log an error and treat the run as stale to avoid a deadlock.
```diff
 if hasattr(started_at, "timestamp"):
     age_seconds = time.time() - started_at.timestamp()
 else:
-    age_seconds = 0
+    logger.error(f"Invalid startedAt type for run {doc.id}: {type(started_at)}")
+    age_seconds = 601  # Mark as stale to be safe
```
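The fixed branch can be exercised in isolation. A minimal sketch, assuming a hypothetical `run_age_seconds` helper and the 10-minute threshold; real Firestore timestamps expose `.timestamp()` just as `datetime` does.

```python
import logging
import time
from datetime import datetime, timezone

logger = logging.getLogger("ingestion")
STALE_THRESHOLD_SECONDS = 600  # runs older than 10 minutes are stale

def run_age_seconds(started_at: object, run_id: str) -> float:
    """Compute a run's age, treating unreadable timestamps as already stale."""
    if hasattr(started_at, "timestamp"):
        return time.time() - started_at.timestamp()
    # A corrupt startedAt must not block cleanup forever: log it and mark stale.
    logger.error("Invalid startedAt type for run %s: %s", run_id, type(started_at))
    return STALE_THRESHOLD_SECONDS + 1
```

A fresh timestamp yields a near-zero age, while a corrupt value (e.g. `None`) now exceeds the stale threshold instead of silently resetting to zero.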
```text
- **Cause**: POST failed (network, CORS, Cloud Run cold start timeout)
- **Debug**:
  curl -X POST https://perception-mcp-w53xszfqnq-uc.a.run.app/trigger/ingestion \
```
The curl commands in this document use a hardcoded URL (https://perception-mcp-w53xszfqnq-uc.a.run.app). It's a better practice to use a placeholder like <SERVICE_URL> to make the documentation more generic and reusable across different environments (e.g., staging, local development).
```diff
-curl -X POST https://perception-mcp-w53xszfqnq-uc.a.run.app/trigger/ingestion \
+curl -X POST <SERVICE_URL>/trigger/ingestion \
```
```tsx
const pollStatus = useCallback(
  (runId: string) => {
    pollRef.current = setInterval(async () => {
```
Using setInterval for polling can be problematic if a network request takes longer than the interval, as new requests can stack up. A more robust pattern is to use a recursive setTimeout, which ensures that the next poll is only scheduled after the current one has completed or failed.
Example:

```tsx
const poll = async () => {
  // ... fetch logic ...
  // If not in a terminal state, schedule the next poll
  if (shouldContinuePolling) {
    setTimeout(poll, POLL_INTERVAL_MS);
  }
};
poll();
```

dashboard/src/components/SystemActivityCard.tsx

```tsx
const fetchRuns = async () => {
  try {
    const runsRef = collection(db, 'ingestion_runs')
    const q = query(runsRef, orderBy('startedAt', 'desc'), limit(10))
    const snapshot = await getDocs(q)

    const runsList = snapshot.docs.map((doc) => ({
      id: doc.id,
      ...doc.data()
    })) as IngestionRun[]

    setRuns(runsList)
  } catch (err) {
    console.error('Error fetching ingestion runs:', err)
    setError(err instanceof Error ? err.message : 'Failed to load activity')
  } finally {
    setLoading(false)
  }
}

useEffect(() => {
  fetchRuns()
}, [])

// Auto-refresh when ingestion completes
useEffect(() => {
  const handler = () => fetchRuns()
  window.addEventListener('ingestion-complete', handler)
  return () => window.removeEventListener('ingestion-complete', handler)
}, [])
```
The fetchRuns function is redefined on every render. This can lead to stale closures in the useEffect that listens for the ingestion-complete event and can cause unnecessary re-executions of effects. To ensure correctness and improve performance, you should wrap fetchRuns in useCallback and add it as a dependency to the useEffect hooks that use it.
```tsx
const fetchRuns = useCallback(async () => {
  try {
    const runsRef = collection(db, 'ingestion_runs')
    const q = query(runsRef, orderBy('startedAt', 'desc'), limit(10))
    const snapshot = await getDocs(q)
    const runsList = snapshot.docs.map((doc) => ({
      id: doc.id,
      ...doc.data()
    })) as IngestionRun[]
    setRuns(runsList)
  } catch (err) {
    console.error('Error fetching ingestion runs:', err)
    setError(err instanceof Error ? err.message : 'Failed to load activity')
  } finally {
    setLoading(false)
  }
}, [])

useEffect(() => {
  fetchRuns()
}, [fetchRuns])

// Auto-refresh when ingestion completes
useEffect(() => {
  const handler = () => fetchRuns()
  window.addEventListener('ingestion-complete', handler)
  return () => window.removeEventListener('ingestion-complete', handler)
}, [fetchRuns])
```
tests/mcp/test_trigger_router.py (Outdated)
```python
class TestStorageIdempotency:
    """Tests for deterministic document IDs."""

    def test_same_url_same_hash(self):
        """Same URL should produce same hash for deduplication."""
        import hashlib

        url = "https://example.com/article/123"
        hash1 = hashlib.sha256(url.encode()).hexdigest()[:16]
        hash2 = hashlib.sha256(url.encode()).hexdigest()[:16]
        assert hash1 == hash2

    def test_different_urls_different_hashes(self):
        """Different URLs should produce different hashes."""
        import hashlib

        url1 = "https://example.com/article/123"
        url2 = "https://example.com/article/456"
        hash1 = hashlib.sha256(url1.encode()).hexdigest()[:16]
        hash2 = hashlib.sha256(url2.encode()).hexdigest()[:16]
        assert hash1 != hash2
```
The TestStorageIdempotency class appears to be testing the behavior of Python's standard hashlib library, rather than any application code within the trigger router. Tests should focus on your application's logic. This test class doesn't add value and should be removed to reduce noise in the test suite.
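If the truncated-hash scheme lives in an application helper, the tests could target that helper instead. A sketch assuming a hypothetical `article_doc_id` function, not the PR's actual code:

```python
import hashlib

def article_doc_id(url: str) -> str:
    """Hypothetical app-level helper: deterministic 16-char doc ID for a URL."""
    return hashlib.sha256(url.encode()).hexdigest()[:16]

def test_doc_id_is_deterministic():
    # Same URL must always map to the same document ID for deduplication.
    assert article_doc_id("https://example.com/article/123") == article_doc_id("https://example.com/article/123")

def test_doc_id_distinguishes_urls():
    # Different URLs must map to different document IDs.
    assert article_doc_id("https://example.com/article/123") != article_doc_id("https://example.com/article/456")
```

This way the tests pin down the application's contract (ID length, determinism) rather than re-verifying `hashlib` itself.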
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@000-docs/ingestion-run-button-debug.md`:
- Around line 5-20: The fenced code block in the markdown snippet (the
triple-backtick block showing the Ingestion flow) is missing a language tag
which triggers MD040; update the opening fence from ``` to ```text so the block
becomes a fenced "text" code block, ensuring tools/markdownlint no longer flag
it and preserving the exact content inside the block (refer to the fenced block
that contains "Dashboard (IngestionButton.tsx)" and the POST/Poll/On completion
lines).
In `@dashboard/src/components/IngestionButton.tsx`:
- Around line 1-41: The component IngestionButton currently sets up polling
using pollRef but never clears it on unmount; add a useEffect in the
IngestionButton component that returns a cleanup function which calls
stopPolling() (and optionally clears startTimeRef) to ensure clearInterval is
called and pollRef.current is nulled when the component unmounts, referencing
pollRef and the existing stopPolling callback so the interval cannot fire after
unmount.
In `@dashboard/src/components/SystemActivityCard.tsx`:
- Around line 60-77: The fetchRuns function doesn't clear the previous error
when a refresh succeeds, so after a successful fetch the component may still
show the error state; update fetchRuns to reset the error on success by calling
setError(null) (or empty string depending on your error state type) immediately
after setRuns (or before setLoading(false)), ensuring the state variable
referenced by error is cleared on successful fetch.
In `@tests/api/test_trigger_integration.py`:
- Around line 43-52: The fixture client currently yields TestClient(app)
directly which skips FastAPI lifespan/startup/shutdown events; update the pytest
fixture (the client fixture that calls _mock_firestore and patches
routers.trigger._get_db) to instantiate TestClient(app) inside a with context
(e.g., with TestClient(app) as client:) and yield that client so the context
manager runs lifespan cleanup; apply the same change to the other similar
fixture/blocks around lines 54-78 that create TestClient without a with
statement.
🧹 Nitpick comments (2)
dashboard/src/components/IngestionButton.tsx (1)
7-25: Align `RunPhase` with backend phases.

`PHASE_LABELS` includes `accepted` and `initializing`, and the backend can emit those values; the union should include them for type safety.

♻️ Suggested fix

```diff
 type RunPhase =
   | 'idle'
   | 'starting'
+  | 'accepted'
+  | 'initializing'
   | 'loading_sources'
   | 'fetching_feeds'
   | 'storing_articles'
   | 'upserting_authors'
   | 'done'
```

perception_app/mcp_service/routers/logging.py (1)
81-108: Use `firestore.AsyncClient` to avoid blocking the async event loop.

The handler is async (line 63) but uses the synchronous `firestore.Client` for I/O at line 107 (`doc_ref.set()`), which blocks the event loop under load. Replace with `firestore.AsyncClient` and `await` the Firestore operations, or use `await run_in_threadpool(doc_ref.set, ...)` if the client must remain synchronous.
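If the synchronous client must remain, the threadpool offload can be sketched with stdlib `asyncio.to_thread` (same spirit as Starlette's `run_in_threadpool`); `blocking_set` below is a stand-in for the real `doc_ref.set()` call, and the names are illustrative.

```python
import asyncio
import time

def blocking_set(doc: dict, payload: dict) -> dict:
    """Stand-in for the synchronous Firestore doc_ref.set() call."""
    time.sleep(0.01)  # simulate network I/O that would otherwise block the loop
    doc.update(payload)
    return doc

async def log_ingestion_run(doc: dict, payload: dict) -> dict:
    # Offload the blocking write to a worker thread so the event loop stays free
    # to serve other requests while the write is in flight.
    return await asyncio.to_thread(blocking_set, doc, payload)
```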
- Fix memory leak: add useEffect cleanup to stop polling on unmount
- Fix stale run deadlock: invalid startedAt now defaults to stale (601s) instead of age_seconds=0, which blocked all future runs
- Switch from setInterval to recursive setTimeout to prevent stacked polls
- Align RunPhase type with backend phases (add accepted, initializing)
- Wrap fetchRuns in useCallback, add fetchRuns to useEffect deps
- Clear error state on successful refresh in SystemActivityCard
- Use TestClient context manager for proper lifespan cleanup in tests
- Remove TestStorageIdempotency (tested stdlib hashlib, not app code)
- Add text language tag to fenced code block (MD040)
- Replace hardcoded service URL with <SERVICE_URL> placeholder in runbook

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@perception_app/mcp_service/config/rss_sources.yaml`:
- Around line 148-152: Remove the duplicate RSS entry that points to
"https://blog.langchain.dev/rss/" (the older redirect) and keep the existing
"LangChain Blog" entry that uses "https://blog.langchain.com/rss/"; locate the
duplicate by matching the name or URL ("LangChain Blog" /
"https://blog.langchain.dev/rss/") in the rss_sources.yaml and delete that block
so only the canonical "https://blog.langchain.com/rss/" entry remains active.
```yaml
- name: "LangChain Blog"
  url: "https://blog.langchain.com/rss/"
  category: "ai"
  active: true
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Description: Check if both LangChain RSS URLs are accessible and compare their content
echo "=== Checking existing LangChain feed (blog.langchain.dev) ==="
curl -sI "https://blog.langchain.dev/rss/" | head -10
echo ""
echo "=== Checking new LangChain Blog feed (blog.langchain.com) ==="
curl -sI "https://blog.langchain.com/rss/" | head -10
echo ""
echo "=== Comparing first few article titles from each feed ==="
echo "--- blog.langchain.dev ---"
curl -s "https://blog.langchain.dev/rss/" 2>/dev/null | grep -oP '(?<=<title>).*?(?=</title>)' | head -5
echo ""
echo "--- blog.langchain.com ---"
curl -s "https://blog.langchain.com/rss/" 2>/dev/null | grep -oP '(?<=<title>).*?(?=</title>)' | head -5
```

Repository: intent-solutions-io/perception-with-intent
Length of output: 1114
🏁 Script executed:
#!/bin/bash
# Check if the YAML file exists and verify lines 502-505
echo "=== Checking YAML file for lines 502-505 ==="
if [ -f "perception_app/mcp_service/config/rss_sources.yaml" ]; then
sed -n '500,510p' perception_app/mcp_service/config/rss_sources.yaml
else
echo "File not found at expected path"
fi
echo ""
echo "=== Following redirect from blog.langchain.dev/rss/ ==="
curl -sL -I "https://blog.langchain.dev/rss/" 2>/dev/null | head -15
echo ""
echo "=== Getting actual content from blog.langchain.dev following redirects ==="
curl -sL "https://blog.langchain.dev/rss/" 2>/dev/null | grep -oP '(?<=<title>).*?(?=</title>)' | head -5Repository: intent-solutions-io/perception-with-intent
Length of output: 1157
Remove the duplicate "LangChain" entry at lines 502-505.
The entry at lines 148-152 (https://blog.langchain.com/rss/) and the entry at lines 502-505 (https://blog.langchain.dev/rss/) serve identical RSS content. The blog.langchain.dev URL redirects to blog.langchain.com, making it an outdated reference to the same feed. Keep the new entry at lines 148-152 and remove the older one at lines 502-505 to prevent duplicate article fetches.
🤖 Prompt for AI Agents
In `@perception_app/mcp_service/config/rss_sources.yaml` around lines 148 - 152,
Remove the duplicate RSS entry that points to "https://blog.langchain.dev/rss/"
(the older redirect) and keep the existing "LangChain Blog" entry that uses
"https://blog.langchain.com/rss/"; locate the duplicate by matching the name or
URL ("LangChain Blog" / "https://blog.langchain.dev/rss/") in the
rss_sources.yaml and delete that block so only the canonical
"https://blog.langchain.com/rss/" entry remains active.
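Beyond removing this one entry, a dedup pass at load time could guard against redirect aliases in general. A sketch, not code from the PR; the alias table and helper names are assumptions (the dev→com redirect is the one verified above):

```python
from urllib.parse import urlparse

# Known host aliases; blog.langchain.dev redirects to blog.langchain.com.
HOST_ALIASES = {"blog.langchain.dev": "blog.langchain.com"}

def canonical_feed_url(url: str) -> str:
    """Normalize a feed URL so redirect aliases compare equal."""
    parts = urlparse(url)
    host = HOST_ALIASES.get(parts.netloc, parts.netloc)
    return f"{parts.scheme}://{host}{parts.path}"

def dedupe_feeds(feeds: list[dict]) -> list[dict]:
    """Keep the first feed per canonical URL, dropping redirect duplicates."""
    seen: set[str] = set()
    kept: list[dict] = []
    for feed in feeds:
        key = canonical_feed_url(feed["url"])
        if key not in seen:
            seen.add(key)
            kept.append(feed)
    return kept
```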
Summary
- `asyncio.create_task()` runs the full pipeline and updates Firestore progressively through phases (loading_sources → fetching_feeds → storing_articles → upserting_authors → done).
- `evaluate_run_success()` scores terminal runs.
- `CustomEvent("ingestion-complete")` drives SystemActivityCard auto-refresh.
- `ingestion_runs` writes use `set(merge=True)`.
- `--no-cpu-throttling` and `--cpu-boost` flags support background task execution.
- Composite `[status ASC, startedAt DESC]` index for the active run query.
- `000-docs/ingestion-run-button-debug.md` with failure taxonomy, diagnostic queries, and SLO targets.

Test plan
- `pytest tests/mcp/ tests/api/ -v --no-cov` — 78 tests pass (27 new + 51 existing)
- `black --check` on modified Python files — clean
- `tsc --noEmit` on dashboard — compiles cleanly
- Cloud Run deployed with `--no-cpu-throttling`
- `curl -X POST .../trigger/ingestion` returns 202 in <1s with run_id
- `curl .../trigger/ingestion/{run_id}` returns progressive status
- `ingestion_runs` collection has docs with status transitions

🤖 Generated with Claude Code