diff --git a/.github/workflows/deploy-mcp.yml b/.github/workflows/deploy-mcp.yml index 57e2d65..45d1090 100644 --- a/.github/workflows/deploy-mcp.yml +++ b/.github/workflows/deploy-mcp.yml @@ -75,6 +75,8 @@ jobs: --cpu 1 \ --min-instances 0 \ --max-instances 10 \ + --no-cpu-throttling \ + --cpu-boost \ --timeout 300 \ --port 8080 \ --set-env-vars "ENVIRONMENT=${{ github.event.inputs.environment }},MCP_SERVICE_NAME=${{ env.SERVICE_NAME }}" \ diff --git a/000-docs/ingestion-run-button-debug.md b/000-docs/ingestion-run-button-debug.md new file mode 100644 index 0000000..8ed998d --- /dev/null +++ b/000-docs/ingestion-run-button-debug.md @@ -0,0 +1,149 @@ +# Ingestion Run Button - Debug Runbook + +## System Map + +```text +Dashboard (IngestionButton.tsx) + │ + ├── POST /trigger/ingestion → 202 Accepted {run_id, poll_url} + │ └── Background: run_ingestion_pipeline() + │ ├── Phase: loading_sources (rss_sources.yaml) + │ ├── Phase: fetching_feeds (129x /mcp/tools/fetch_rss_feed) + │ ├── Phase: storing_articles (batches → /mcp/tools/store_articles) + │ └── Phase: upserting_authors (/mcp/tools/upsert_author) + │ + ├── Poll: GET /trigger/ingestion/{run_id} (every 3s) + │ └── Reads Firestore ingestion_runs/{run_id} + │ + └── On completion: CustomEvent('ingestion-complete') + └── SystemActivityCard auto-refreshes +``` + +## Failure Taxonomy + +### F1: No toast appears after clicking Run Ingestion +- **Cause**: POST failed (network, CORS, Cloud Run cold start timeout) +- **Debug**: + ```bash + curl -X POST <MCP_URL>/trigger/ingestion \ + -H "Content-Type: application/json" \ + -d '{"trigger":"manual"}' + ``` +- **Fix**: Check Cloud Run logs, verify the service is deployed, check the CORS config + +### F2: "Already in progress" when nothing is running +- **Cause**: Previous run stuck in `accepted` or `running` status +- **Debug**: + ```bash + # Check Firestore for stuck runs + gcloud firestore documents list \ + --project=perception-with-intent \ + --database=perception-db \ +
--collection=ingestion_runs \ + --filter='status="running"' + ``` +- **Fix**: Stale runs (>10 min) are auto-cleaned. Manual fix: update the status to `failed` in the Firestore console + +### F3: Stuck forever (spinner never stops) +- **Cause**: Background task crashed, Firestore never updated to terminal state +- **Debug**: Check Cloud Run logs for unhandled exceptions + ```bash + gcloud logging read 'resource.type="cloud_run_revision" AND resource.labels.service_name="perception-mcp" AND severity>=ERROR' \ + --project=perception-with-intent --limit=20 --format=json + ``` +- **Fix**: Dashboard has 5-minute stuck detection. Add a catch-all exception handler in `run_ingestion_pipeline` that writes a terminal status + +### F4: No data in SystemActivityCard +- **Cause**: Firestore docs not being written, or Firebase client SDK can't read them +- **Debug**: + ```bash + # Check if docs exist + gcloud firestore documents list \ + --project=perception-with-intent \ + --database=perception-db \ + --collection=ingestion_runs --limit=5 + ``` +- **Fix**: Verify `ingestion_runs` collection exists, check Firestore security rules + +### F5: High error count in completed run +- **Cause**: RSS feeds returning errors (timeouts, 403s, DNS failures) +- **Debug**: Check `errors` array in Firestore doc +- **Fix**: Review error patterns, update/remove broken feeds from `rss_sources.yaml` + +### F6: 0 articles stored +- **Cause**: All feeds failed, or `store_articles` endpoint broken +- **Debug**: + ```bash + curl -X POST <MCP_URL>/mcp/tools/store_articles \ + -H "Content-Type: application/json" \ + -d '{"run_id":"test","articles":[{"title":"t","url":"https://x.com","source_id":"s","published_at":"2025-01-01"}]}' + ``` +- **Fix**: Check Firestore permissions for MCP service account + +### F7: Cold start timeout on POST +- **Cause**: Cloud Run container starting up, POST times out before ready +- **Debug**: Check Cloud Run startup latency in metrics +- **Fix**: `--cpu-boost` flag enables faster cold starts; consider `--min-instances 1` + +## Key
Diagnostic Queries + +### Cloud Logging +```bash +# All ingestion run logs +gcloud logging read 'resource.type="cloud_run_revision" AND jsonPayload.router="trigger"' \ + --project=perception-with-intent --limit=50 + +# Specific run +gcloud logging read 'resource.type="cloud_run_revision" AND jsonPayload.run_id="run-XXXXX"' \ + --project=perception-with-intent --limit=50 + +# Errors only +gcloud logging read 'resource.type="cloud_run_revision" AND severity>=ERROR AND resource.labels.service_name="perception-mcp"' \ + --project=perception-with-intent --limit=20 +``` + +### Firestore +```bash +# List recent runs +gcloud firestore documents list \ + --project=perception-with-intent \ + --database=perception-db \ + --collection=ingestion_runs --limit=10 + +# Get specific run +gcloud firestore documents get \ + --project=perception-with-intent \ + --database=perception-db \ + ingestion_runs/run-XXXXX +``` + +### curl Commands +```bash +# Health check +curl <MCP_URL>/health + +# Trigger ingestion +curl -X POST <MCP_URL>/trigger/ingestion \ + -H "Content-Type: application/json" \ + -d '{"trigger":"manual","time_window_hours":24,"max_items_per_source":50}' + +# Check run status +curl <MCP_URL>/trigger/ingestion/run-XXXXX +``` + +## SLO Targets + +| Metric | Target | Alert Threshold | +|--------|--------|-----------------| +| POST response time | < 1s | > 3s | +| End-to-end success rate | > 95% | < 90% | +| Source error rate | < 10% | > 25% | +| Total duration | < 300s | > 600s | +| Articles stored per run | > 100 | < 10 | + +## Architecture Notes + +- Background task uses `asyncio.create_task()` - requires `--no-cpu-throttling` on Cloud Run +- `--cpu-boost` gives extra CPU during cold starts +- Idempotency guard prevents concurrent runs (10 min stale timeout) +- Firestore composite index on `[status ASC, startedAt DESC]` for active run query diff --git a/dashboard/src/components/IngestionButton.tsx b/dashboard/src/components/IngestionButton.tsx index 3fba2d8..1667b8b 100644 ---
a/dashboard/src/components/IngestionButton.tsx +++ b/dashboard/src/components/IngestionButton.tsx @@ -1,14 +1,153 @@ -import { useState } from 'react' +import { useState, useRef, useCallback, useEffect } from 'react' import { toast } from 'sonner' import { Button } from '@/components/ui/button' const MCP_URL = 'https://perception-mcp-w53xszfqnq-uc.a.run.app' +type RunPhase = + | 'idle' + | 'starting' + | 'accepted' + | 'initializing' + | 'loading_sources' + | 'fetching_feeds' + | 'storing_articles' + | 'upserting_authors' + | 'done' + +const PHASE_LABELS: Partial<Record<RunPhase, string>> = { + starting: 'Starting...', + initializing: 'Starting...', + accepted: 'Starting...', + loading_sources: 'Loading sources...', + fetching_feeds: 'Fetching feeds...', + storing_articles: 'Storing articles...', + upserting_authors: 'Upserting authors...', + done: 'Finishing up...', +} + +const POLL_INTERVAL_MS = 3000 +const STUCK_TIMEOUT_MS = 5 * 60 * 1000 // 5 minutes + export default function IngestionButton() { - const [ingesting, setIngesting] = useState(false) + const [phase, setPhase] = useState<RunPhase>('idle') + const [stats, setStats] = useState<Record<string, number> | null>(null) + const pollRef = useRef<ReturnType<typeof setTimeout> | null>(null) + const startTimeRef = useRef(0) + + const stopPolling = useCallback(() => { + if (pollRef.current) { + clearTimeout(pollRef.current) + pollRef.current = null + } + }, []) + + // Cleanup on unmount + useEffect(() => { + return () => stopPolling() + }, [stopPolling]) + + const handleComplete = useCallback( + (data: { + status: string + stats?: Record<string, number> + errors?: Array<{ message: string }> + is_successful?: boolean + }) => { + stopPolling() + setPhase('idle') + setStats(null) + + const stored = data.stats?.articlesStored || 0 + const sources = data.stats?.sourcesChecked || 0 + const errorCount = data.errors?.length || 0 + + if (data.status === 'failed') { + toast.error('Ingestion failed', { + description: data.errors?.[0]?.message || 'Check logs for details', + }) + } else if (errorCount > 0) { + toast.success(
`Stored ${stored} articles from ${sources} sources`, + { + description: `${errorCount} source${errorCount > 1 ? 's' : ''} had errors`, + } + ) + } else { + toast.success( + `Stored ${stored} articles from ${sources} sources`, + { + description: `Completed successfully`, + } + ) + } + + // Notify SystemActivityCard to refresh + window.dispatchEvent(new CustomEvent('ingestion-complete')) + }, + [stopPolling] + ) + + const pollStatus = useCallback( + (runId: string) => { + const poll = async () => { + // Stuck run detection + if (Date.now() - startTimeRef.current > STUCK_TIMEOUT_MS) { + stopPolling() + setPhase('idle') + setStats(null) + toast.warning('Ingestion may be stuck', { + description: 'The run has been active for over 5 minutes. Check Cloud Run logs.', + }) + return + } + + try { + const res = await fetch(`${MCP_URL}/trigger/ingestion/${runId}`) + if (!res.ok) { + // Retry on next poll + pollRef.current = setTimeout(poll, POLL_INTERVAL_MS) + return + } + + const data = await res.json() + + // Update phase display + if (data.phase && data.phase !== 'done') { + setPhase(data.phase as RunPhase) + } + + // Update stats for display + if (data.stats) { + setStats(data.stats) + } + + // Check for terminal state + if ( + data.status === 'completed' || + data.status === 'completed_with_errors' || + data.status === 'failed' + ) { + handleComplete(data) + return + } + } catch { + // Network error - keep polling, might recover + } + + // Schedule next poll only after current one completes + pollRef.current = setTimeout(poll, POLL_INTERVAL_MS) + } + + // Start first poll after delay + pollRef.current = setTimeout(poll, POLL_INTERVAL_MS) + }, + [stopPolling, handleComplete] + ) const handleRunIngestion = async () => { - setIngesting(true) + setPhase('starting') + startTimeRef.current = Date.now() try { const response = await fetch(`${MCP_URL}/trigger/ingestion`, { @@ -21,53 +160,84 @@ export default function IngestionButton() { }), }) - if (response.ok) { + if 
(response.status === 409) { const data = await response.json() - const stored = data.articles_stored || 0 - const sources = data.sources_processed || 0 - const duration = data.duration_seconds || 0 - const errorCount = data.errors?.length || 0 + setPhase('idle') + toast.warning('Ingestion already in progress', { + description: `Run ${data.detail?.active_run_id || ''} is still active. Wait for it to finish.`, + }) + return + } - if (errorCount > 0) { - toast.success(`Stored ${stored} articles from ${sources} sources (${duration.toFixed(1)}s)`, { - description: `${errorCount} source${errorCount > 1 ? 's' : ''} had errors`, - }) - } else { - toast.success(`Stored ${stored} articles from ${sources} sources`, { - description: `Completed in ${duration.toFixed(1)} seconds`, - }) - } - } else { - toast.warning('Ingestion service unavailable', { - description: 'The MCP service may be restarting. Try again in a moment.', + if (response.status === 202) { + const data = await response.json() + toast.info('Ingestion started', { + description: `Run ID: ${data.run_id}`, }) + pollStatus(data.run_id) + return } + + // Unexpected status + setPhase('idle') + toast.warning('Ingestion service unavailable', { + description: 'The MCP service may be restarting. Try again in a moment.', + }) } catch { + setPhase('idle') toast.error('Failed to connect to ingestion service', { description: 'Check your network connection and try again.', }) - } finally { - setIngesting(false) } } + const isRunning = phase !== 'idle' + const phaseLabel = PHASE_LABELS[phase] || 'Processing...' 
+ return ( - <Button onClick={handleRunIngestion} disabled={ingesting}> + <div> + <Button onClick={handleRunIngestion} disabled={isRunning}> + {isRunning ? phaseLabel : 'Run Ingestion'} + </Button> + {isRunning && stats && ( + <span> + {stats.articlesFetched !== undefined && + `${stats.articlesFetched} articles fetched`} + {stats.articlesStored !== undefined && + stats.articlesStored > 0 && + ` · ${stats.articlesStored} stored`} + </span> + )} + </div> - </Button> + ) } diff --git a/dashboard/src/components/SystemActivityCard.tsx b/dashboard/src/components/SystemActivityCard.tsx index 591d857..5451a77 100644 --- a/dashboard/src/components/SystemActivityCard.tsx +++ b/dashboard/src/components/SystemActivityCard.tsx @@ -1,4 +1,4 @@ -import { useEffect, useState } from 'react' +import { useEffect, useState, useCallback } from 'react' import { collection, query, orderBy, limit, getDocs } from 'firebase/firestore' import { db } from '../firebase' import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card' @@ -57,29 +57,37 @@ export default function SystemActivityCard() { const [loading, setLoading] = useState(true) const [error, setError] = useState<string | null>(null) - useEffect(() => { - const fetchRuns = async () => { - try { - const runsRef = collection(db, 'ingestion_runs') - const q = query(runsRef, orderBy('startedAt', 'desc'), limit(10)) - const snapshot = await getDocs(q) - - const runsList = snapshot.docs.map((doc) => ({ - id: doc.id, - ...doc.data() - })) as IngestionRun[] - - setRuns(runsList) - } catch (err) { - console.error('Error fetching ingestion runs:', err) - setError(err instanceof Error ? err.message : 'Failed to load activity') - } finally { - setLoading(false) - } + const fetchRuns = useCallback(async () => { + try { + const runsRef = collection(db, 'ingestion_runs') + const q = query(runsRef, orderBy('startedAt', 'desc'), limit(10)) + const snapshot = await getDocs(q) + + setError(null) + const runsList = snapshot.docs.map((doc) => ({ + id: doc.id, + ...doc.data() + })) as IngestionRun[] + + setRuns(runsList) + } catch (err) { + console.error('Error fetching ingestion runs:', err) + setError(err instanceof Error ?
err.message : 'Failed to load activity') + } finally { + setLoading(false) } + }, []) + useEffect(() => { fetchRuns() - }, []) + }, [fetchRuns]) + + // Auto-refresh when ingestion completes + useEffect(() => { + const handler = () => fetchRuns() + window.addEventListener('ingestion-complete', handler) + return () => window.removeEventListener('ingestion-complete', handler) + }, [fetchRuns]) if (loading) { return diff --git a/firestore.indexes.json b/firestore.indexes.json index 61d30b2..83b7957 100644 --- a/firestore.indexes.json +++ b/firestore.indexes.json @@ -42,6 +42,21 @@ } ] } + , + { + "collectionGroup": "ingestion_runs", + "queryScope": "COLLECTION", + "fields": [ + { + "fieldPath": "status", + "order": "ASCENDING" + }, + { + "fieldPath": "startedAt", + "order": "DESCENDING" + } + ] + } ], "fieldOverrides": [] } diff --git a/perception_app/mcp_service/config/rss_sources.yaml b/perception_app/mcp_service/config/rss_sources.yaml index c267d67..011100b 100644 --- a/perception_app/mcp_service/config/rss_sources.yaml +++ b/perception_app/mcp_service/config/rss_sources.yaml @@ -1,6 +1,6 @@ # RSS Feed Sources for Perception News Intelligence # Generated: 2025-12-29 -# Total: 128 verified active feeds across 22 categories +# Total: 129 verified active feeds across 22 categories # Source: Comprehensive feed testing + GitHub RSS collections + Brex Top 100 SaaS sources: @@ -129,7 +129,7 @@ sources: category: "heavy_equipment" active: true - # ===== AI/ML RESEARCH (3 feeds) ===== + # ===== AI/ML RESEARCH (4 feeds) ===== - name: "MIT AI News" url: "https://news.mit.edu/rss/topic/artificial-intelligence2" category: "ai" @@ -145,6 +145,11 @@ sources: category: "ai" active: true + - name: "LangChain Blog" + url: "https://blog.langchain.com/rss/" + category: "ai" + active: true + # ===== TECH NEWS (10 feeds) ===== - name: "TechCrunch" url: "https://techcrunch.com/feed/" diff --git a/perception_app/mcp_service/main.py b/perception_app/mcp_service/main.py index 
738fddd..e56d3a1 100644 --- a/perception_app/mcp_service/main.py +++ b/perception_app/mcp_service/main.py @@ -87,7 +87,7 @@ async def root(): "/mcp/tools/log_ingestion_run", "/mcp/tools/send_notification", ], - "triggers": ["/trigger/ingestion"], + "triggers": ["/trigger/ingestion", "/trigger/ingestion/{run_id}"], } diff --git a/perception_app/mcp_service/routers/logging.py b/perception_app/mcp_service/routers/logging.py index fb6db11..c3b48d8 100644 --- a/perception_app/mcp_service/routers/logging.py +++ b/perception_app/mcp_service/routers/logging.py @@ -2,25 +2,36 @@ Logging Tool Router Handles ingestion run logging to Firestore. - -Phase 4: Returns fake but structurally correct responses. -Phase 5: Wire up real Firestore /ingestion_runs writes. +Writes real documents to ingestion_runs collection. """ import logging as python_logging import json from datetime import datetime, timezone from typing import List, Dict, Any, Optional -from fastapi import APIRouter +from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field +from google.cloud import firestore logger = python_logging.getLogger(__name__) router = APIRouter() +# Firestore client (lazy init) +_db: Optional[firestore.Client] = None + + +def _get_db() -> firestore.Client: + """Get or create Firestore client.""" + global _db + if _db is None: + _db = firestore.Client(project="perception-with-intent", database="perception-db") + return _db + # Pydantic Models class IngestionStats(BaseModel): """Statistics for an ingestion run.""" + sources_checked: int articles_fetched: int articles_stored: int @@ -31,6 +42,7 @@ class IngestionStats(BaseModel): class LogIngestionRunRequest(BaseModel): """Request schema for log_ingestion_run tool.""" + run_id: str = Field(..., description="Ingestion run ID") status: str = Field(..., description="Run status: running | completed | failed") stats: IngestionStats @@ -40,6 +52,7 @@ class LogIngestionRunRequest(BaseModel): class 
LogIngestionRunResponse(BaseModel): """Response schema for log_ingestion_run tool.""" + run_id: str logged_at: str firestore_path: str @@ -51,34 +64,76 @@ async def log_ingestion_run(request: LogIngestionRunRequest): """ Create or update an ingestion run record in Firestore. - Phase 4: Returns fake data. - Phase 5 TODO: - - Initialize Firestore client - - Write/update to /ingestion_runs/{run_id} - - Store all stats and timestamps - - Handle status transitions (running → completed/failed) + Writes to ingestion_runs/{run_id} with merge semantics. """ - logger.info(json.dumps({ - "severity": "INFO", - "message": "Logging ingestion run", - "mcp_tool": "log_ingestion_run", - "run_id": request.run_id, - "status": request.status - })) - - # PHASE 4: Fake response - response = LogIngestionRunResponse( - run_id=request.run_id, - logged_at=datetime.now(tz=timezone.utc).isoformat(), - firestore_path=f"/ingestion_runs/{request.run_id}" + logger.info( + json.dumps( + { + "severity": "INFO", + "message": "Logging ingestion run", + "mcp_tool": "log_ingestion_run", + "run_id": request.run_id, + "status": request.status, + } + ) ) - logger.info(json.dumps({ - "severity": "INFO", - "message": "Ingestion run logged successfully", - "mcp_tool": "log_ingestion_run", - "run_id": request.run_id, - "status": request.status - })) + db = _get_db() + doc_ref = db.collection("ingestion_runs").document(request.run_id) + + doc_data = { + "status": request.status, + "startedAt": datetime.fromisoformat(request.started_at), + "stats": { + "sourcesChecked": request.stats.sources_checked, + "articlesFetched": request.stats.articles_fetched, + "articlesStored": request.stats.articles_stored, + "articlesIngested": request.stats.articles_stored, + "articlesDeduplicated": request.stats.duplicates_skipped, + "briefGenerated": request.stats.brief_generated, + "errors": request.stats.errors, + }, + "updatedAt": firestore.SERVER_TIMESTAMP, + } + + if request.completed_at: + doc_data["completedAt"] = 
datetime.fromisoformat(request.completed_at) + # Calculate duration + started = datetime.fromisoformat(request.started_at) + completed = datetime.fromisoformat(request.completed_at) + doc_data["duration"] = round((completed - started).total_seconds(), 2) + + try: + doc_ref.set(doc_data, merge=True) + except Exception as e: + logger.error( + json.dumps( + { + "severity": "ERROR", + "message": f"Failed to write ingestion run to Firestore: {e}", + "mcp_tool": "log_ingestion_run", + "run_id": request.run_id, + } + ) + ) + raise HTTPException(status_code=500, detail=f"Firestore write failed: {e}") + + logged_at = datetime.now(tz=timezone.utc).isoformat() + + logger.info( + json.dumps( + { + "severity": "INFO", + "message": "Ingestion run logged successfully", + "mcp_tool": "log_ingestion_run", + "run_id": request.run_id, + "status": request.status, + } + ) + ) - return response + return LogIngestionRunResponse( + run_id=request.run_id, + logged_at=logged_at, + firestore_path=f"ingestion_runs/{request.run_id}", + ) diff --git a/perception_app/mcp_service/routers/trigger.py b/perception_app/mcp_service/routers/trigger.py index 785a6fc..42c2a9f 100644 --- a/perception_app/mcp_service/routers/trigger.py +++ b/perception_app/mcp_service/routers/trigger.py @@ -3,6 +3,9 @@ Orchestration endpoints for running full ingestion pipelines. Not an MCP tool - this is a higher-level coordinator that calls MCP tools internally. 
+ +POST /ingestion → 202 Accepted, creates Firestore run doc, fires background task +GET /ingestion/{run_id} → reads run status from Firestore """ import asyncio @@ -17,12 +20,27 @@ import yaml import httpx -from fastapi import APIRouter +from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field +from google.cloud import firestore logger = logging.getLogger(__name__) router = APIRouter() +# Firestore client (lazy init) +_db: Optional[firestore.Client] = None + + +def _get_db() -> firestore.Client: + """Get or create Firestore client.""" + global _db + if _db is None: + _db = firestore.Client(project="perception-with-intent", database="perception-db") + return _db + + +# --- Request/Response Models --- + class TriggerIngestionRequest(BaseModel): """Request schema for trigger ingestion endpoint.""" @@ -32,17 +50,113 @@ class TriggerIngestionRequest(BaseModel): max_items_per_source: int = Field(50, description="Max articles per source", ge=1, le=500) -class TriggerIngestionResponse(BaseModel): - """Response schema for trigger ingestion endpoint.""" +class TriggerAcceptedResponse(BaseModel): + """Response returned immediately when ingestion is accepted.""" + + run_id: str + status: str = "accepted" + message: str + poll_url: str + + +class IngestionRunStatus(BaseModel): + """Response for GET /ingestion/{run_id}.""" run_id: str status: str - sources_processed: int - articles_fetched: int - articles_stored: int - authors_upserted: int - errors: List[Dict[str, Any]] = Field(default_factory=list) - duration_seconds: float + phase: Optional[str] = None + started_at: Optional[str] = None + completed_at: Optional[str] = None + duration_seconds: Optional[float] = None + stats: Optional[Dict[str, Any]] = None + errors: Optional[List[Dict[str, Any]]] = None + is_successful: Optional[bool] = None + + +# --- Helper Functions --- + + +def evaluate_run_success(run_data: Dict[str, Any]) -> bool: + """Evaluate whether an ingestion run was successful. 
+ + Criteria: + - articles_stored > 0 + - error_rate < 50% (sourcesFailed / sourcesChecked) + - duration < 300s + """ + stats = run_data.get("stats", {}) + articles_stored = stats.get("articlesStored", 0) + sources_checked = stats.get("sourcesChecked", 0) + sources_failed = stats.get("sourcesFailed", 0) + duration = run_data.get("duration", 0) + + if articles_stored <= 0: + return False + + if sources_checked > 0 and (sources_failed / sources_checked) >= 0.5: + return False + + if duration and duration > 300: + return False + + return True + + +async def check_active_run(db: firestore.Client) -> Optional[str]: + """Check if there's an active ingestion run (idempotency guard). + + Returns run_id if an active run exists (blocks new run). + Marks stale runs (>10 min) as failed and allows new run. + """ + runs_ref = db.collection("ingestion_runs") + query = ( + runs_ref.where(filter=firestore.FieldFilter("status", "in", ["accepted", "running"])) + .order_by("startedAt", direction=firestore.Query.DESCENDING) + .limit(1) + ) + + docs = query.stream() + for doc in docs: + data = doc.to_dict() + started_at = data.get("startedAt") + if started_at: + if hasattr(started_at, "timestamp"): + age_seconds = time.time() - started_at.timestamp() + else: + logger.error( + json.dumps( + { + "severity": "ERROR", + "message": f"Invalid startedAt type for run {doc.id}: {type(started_at)}", + "router": "trigger", + } + ) + ) + age_seconds = 601 # Treat as stale to avoid blocking all future runs + + if age_seconds < 600: # 10 minutes + return doc.id + else: + # Stale run - mark as failed + doc.reference.update( + { + "status": "failed", + "phase": "stale_cleanup", + "completedAt": firestore.SERVER_TIMESTAMP, + "errors": [{"message": f"Marked as failed: stale after {int(age_seconds)}s"}], + } + ) + logger.warning( + json.dumps( + { + "severity": "WARNING", + "message": f"Cleaned up stale run {doc.id} after {int(age_seconds)}s", + "router": "trigger", + } + ) + ) + + return None def 
load_sources() -> List[Dict[str, Any]]: @@ -117,200 +231,376 @@ async def fetch_single_feed( } -@router.post("/ingestion", response_model=TriggerIngestionResponse) -async def trigger_ingestion(request: TriggerIngestionRequest): - """ - Run a full ingestion pipeline across all configured sources. +# --- Background Pipeline --- + + +async def run_ingestion_pipeline( + run_id: str, + request: TriggerIngestionRequest, + db: firestore.Client, +) -> None: + """Run the full ingestion pipeline as a background task. - Loads sources from rss_sources.yaml, fetches each feed, - stores articles to Firestore, and upserts author records. + Updates Firestore doc progressively through phases: + loading_sources → fetching_feeds → storing_articles → upserting_authors → done """ - run_id = f"run-{uuid.uuid4().hex[:12]}" + doc_ref = db.collection("ingestion_runs").document(run_id) start_time = time.time() - logger.info( - json.dumps( + try: + # Phase: loading_sources + doc_ref.update({"status": "running", "phase": "loading_sources"}) + + sources = load_sources() + if not sources: + doc_ref.update( + { + "status": "failed", + "phase": "loading_sources", + "completedAt": firestore.SERVER_TIMESTAMP, + "duration": round(time.time() - start_time, 2), + "errors": [{"message": "No sources loaded from config"}], + } + ) + return + + doc_ref.update( { - "severity": "INFO", - "message": "Starting ingestion run", - "router": "trigger", - "run_id": run_id, - "trigger": request.trigger, - "time_window_hours": request.time_window_hours, - "max_items_per_source": request.max_items_per_source, + "phase": "fetching_feeds", + "stats": {"sourcesChecked": len(sources), "sourcesFailed": 0, "articlesFetched": 0}, } ) - ) - sources = load_sources() - if not sources: - return TriggerIngestionResponse( - run_id=run_id, - status="failed", - sources_processed=0, - articles_fetched=0, - articles_stored=0, - authors_upserted=0, - errors=[{"message": "No sources loaded from config"}], - duration_seconds=time.time() 
- start_time, - ) + # Phase: fetching_feeds + semaphore = asyncio.Semaphore(10) + errors: List[Dict[str, Any]] = [] + all_articles: List[Dict[str, Any]] = [] + sources_processed = 0 - # Fetch all feeds with concurrency control - semaphore = asyncio.Semaphore(10) - errors: List[Dict[str, Any]] = [] - all_articles: List[Dict[str, Any]] = [] - sources_processed = 0 + async def fetch_with_semaphore(client, source): + async with semaphore: + return await fetch_single_feed( + client, source, request.time_window_hours, request.max_items_per_source, run_id + ) - async def fetch_with_semaphore(client, source): - async with semaphore: - return await fetch_single_feed( - client, source, request.time_window_hours, request.max_items_per_source, run_id - ) + async with httpx.AsyncClient(timeout=60.0) as client: + tasks = [fetch_with_semaphore(client, s) for s in sources] + results = await asyncio.gather(*tasks, return_exceptions=True) - async with httpx.AsyncClient(timeout=60.0) as client: - tasks = [fetch_with_semaphore(client, s) for s in sources] - results = await asyncio.gather(*tasks, return_exceptions=True) + sources_failed = 0 + for result in results: + if isinstance(result, Exception): + errors.append({"message": str(result)}) + sources_failed += 1 + continue - for result in results: - if isinstance(result, Exception): - errors.append({"message": str(result)}) - continue + sources_processed += 1 + source = result["source"] - sources_processed += 1 - source = result["source"] + if "error" in result: + errors.append( + { + "source": source["name"], + "url": source["url"], + "message": result["error"], + } + ) + sources_failed += 1 + continue - if "error" in result: - errors.append( - { - "source": source["name"], - "url": source["url"], - "message": result["error"], - } - ) - continue + for raw_article in result["articles"]: + all_articles.append( + { + "title": raw_article.get("title", "Untitled"), + "url": raw_article.get("url", ""), + "source_id": source["source_id"], + 
"source_name": source["name"], + "category": source["category"], + "published_at": raw_article.get("published_at", ""), + "summary": raw_article.get("summary"), + "content": raw_article.get("raw_content") or raw_article.get("content_snippet"), + "categories": raw_article.get("categories", []), + } + ) - for raw_article in result["articles"]: - all_articles.append( - { - "title": raw_article.get("title", "Untitled"), - "url": raw_article.get("url", ""), - "source_id": source["source_id"], - "source_name": source["name"], - "category": source["category"], - "published_at": raw_article.get("published_at", ""), - "summary": raw_article.get("summary"), - "content": raw_article.get("raw_content") or raw_article.get("content_snippet"), - "categories": raw_article.get("categories", []), - } - ) + total_fetched = len(all_articles) - total_fetched = len(all_articles) + doc_ref.update( + { + "phase": "storing_articles", + "stats": { + "sourcesChecked": len(sources), + "sourcesFailed": sources_failed, + "articlesFetched": total_fetched, + "articlesStored": 0, + }, + } + ) + + # Phase: storing_articles + articles_stored = 0 + if all_articles: + try: + async with httpx.AsyncClient(timeout=120.0) as client: + batch_size = 200 + for i in range(0, len(all_articles), batch_size): + chunk = all_articles[i : i + batch_size] + store_response = await client.post( + "http://localhost:8080/mcp/tools/store_articles", + json={ + "run_id": run_id, + "articles": chunk, + }, + ) + if store_response.status_code == 200: + store_data = store_response.json() + articles_stored += store_data.get("stored_count", 0) + else: + errors.append( + { + "message": f"store_articles returned {store_response.status_code}", + "batch_start": i, + } + ) + except Exception as e: + errors.append({"message": f"store_articles failed: {e}"}) + + doc_ref.update( + { + "phase": "upserting_authors", + "stats": { + "sourcesChecked": len(sources), + "sourcesFailed": sources_failed, + "articlesFetched": total_fetched, + 
"articlesStored": articles_stored, + "authorsUpserted": 0, + }, + } + ) + + # Phase: upserting_authors + authors_upserted = 0 + feed_groups: Dict[str, Dict[str, Any]] = {} + for result in results: + if isinstance(result, Exception) or "error" in result: + continue + source = result["source"] + feed_url = source["url"] + if feed_url not in feed_groups: + feed_groups[feed_url] = { + "articles": [], + "feed_metadata": result.get("feed_metadata"), + } + for raw_article in result["articles"]: + feed_groups[feed_url]["articles"].append( + { + "title": raw_article.get("title", "Untitled"), + "url": raw_article.get("url", ""), + "source_id": source["source_id"], + "published_at": raw_article.get("published_at", ""), + "summary": raw_article.get("summary"), + "categories": raw_article.get("categories", []), + } + ) - # Store articles via MCP store_articles endpoint - articles_stored = 0 - if all_articles: try: - async with httpx.AsyncClient(timeout=120.0) as client: - # Send in batches of 200 to avoid oversized payloads - batch_size = 200 - for i in range(0, len(all_articles), batch_size): - chunk = all_articles[i : i + batch_size] - store_response = await client.post( - "http://localhost:8080/mcp/tools/store_articles", - json={ - "run_id": run_id, - "articles": chunk, - }, - ) - if store_response.status_code == 200: - store_data = store_response.json() - articles_stored += store_data.get("stored_count", 0) - else: - errors.append( - { - "message": f"store_articles returned {store_response.status_code}", - "batch_start": i, - } + async with httpx.AsyncClient(timeout=60.0) as client: + for feed_url, group in feed_groups.items(): + if not group["articles"]: + continue + try: + resp = await client.post( + "http://localhost:8080/mcp/tools/upsert_author", + json={ + "feed_url": feed_url, + "articles": group["articles"], + "feed_metadata": group["feed_metadata"], + }, ) + if resp.status_code == 200: + data = resp.json() + if data.get("status") in ("created", "updated"): + 
authors_upserted += 1 + except Exception as e: + errors.append({"message": f"upsert_author failed for {feed_url}: {e}"}) except Exception as e: - errors.append({"message": f"store_articles failed: {e}"}) - - # Upsert authors (group articles by feed URL) - authors_upserted = 0 - feed_groups: Dict[str, Dict[str, Any]] = {} - for result in results: - if isinstance(result, Exception) or "error" in result: - continue - source = result["source"] - feed_url = source["url"] - if feed_url not in feed_groups: - feed_groups[feed_url] = { - "articles": [], - "feed_metadata": result.get("feed_metadata"), + errors.append({"message": f"Author upsert phase failed: {e}"}) + + # Final update + duration = round(time.time() - start_time, 2) + if errors and articles_stored == 0: + final_status = "failed" + elif errors: + final_status = "completed_with_errors" + else: + final_status = "completed" + + doc_ref.update( + { + "status": final_status, + "phase": "done", + "completedAt": firestore.SERVER_TIMESTAMP, + "duration": duration, + "stats": { + "sourcesChecked": len(sources), + "sourcesFailed": sources_failed, + "articlesFetched": total_fetched, + "articlesStored": articles_stored, + "articlesIngested": articles_stored, + "authorsUpserted": authors_upserted, + }, + "errors": errors[:50], } - for raw_article in result["articles"]: - feed_groups[feed_url]["articles"].append( + ) + + logger.info( + json.dumps( { - "title": raw_article.get("title", "Untitled"), - "url": raw_article.get("url", ""), - "source_id": source["source_id"], - "published_at": raw_article.get("published_at", ""), - "summary": raw_article.get("summary"), - "categories": raw_article.get("categories", []), + "severity": "INFO", + "message": "Ingestion run completed", + "router": "trigger", + "run_id": run_id, + "status": final_status, + "sources_processed": sources_processed, + "articles_fetched": total_fetched, + "articles_stored": articles_stored, + "authors_upserted": authors_upserted, + "error_count": len(errors), 
+ "duration_seconds": duration, } ) + ) - try: - async with httpx.AsyncClient(timeout=60.0) as client: - for feed_url, group in feed_groups.items(): - if not group["articles"]: - continue - try: - resp = await client.post( - "http://localhost:8080/mcp/tools/upsert_author", - json={ - "feed_url": feed_url, - "articles": group["articles"], - "feed_metadata": group["feed_metadata"], - }, - ) - if resp.status_code == 200: - data = resp.json() - if data.get("status") in ("created", "updated"): - authors_upserted += 1 - except Exception as e: - errors.append({"message": f"upsert_author failed for {feed_url}: {e}"}) except Exception as e: - errors.append({"message": f"Author upsert phase failed: {e}"}) + duration = round(time.time() - start_time, 2) + logger.error( + json.dumps( + { + "severity": "ERROR", + "message": f"Ingestion pipeline failed: {e}", + "router": "trigger", + "run_id": run_id, + } + ) + ) + try: + doc_ref.update( + { + "status": "failed", + "completedAt": firestore.SERVER_TIMESTAMP, + "duration": duration, + "errors": [{"message": f"Pipeline exception: {e}"}], + } + ) + except Exception: + pass # If Firestore itself is down, we can't do much - duration = time.time() - start_time - status = "completed" if not errors else "completed_with_errors" + +# --- Endpoints --- + + +@router.post("/ingestion", response_model=TriggerAcceptedResponse, status_code=202) +async def trigger_ingestion(request: TriggerIngestionRequest): + """ + Trigger a full ingestion pipeline across all configured sources. + + Returns 202 Accepted immediately with a run_id. + The pipeline runs in the background, updating Firestore progressively. + Poll GET /trigger/ingestion/{run_id} for status. 
+ """ + db = _get_db() + run_id = f"run-{uuid.uuid4().hex[:12]}" + + # Idempotency guard: reject if another run is active + active_run_id = await check_active_run(db) + if active_run_id: + raise HTTPException( + status_code=409, + detail={ + "message": "An ingestion run is already in progress", + "active_run_id": active_run_id, + "poll_url": f"/trigger/ingestion/{active_run_id}", + }, + ) logger.info( json.dumps( { "severity": "INFO", - "message": "Ingestion run completed", + "message": "Accepting ingestion run", "router": "trigger", "run_id": run_id, - "status": status, - "sources_processed": sources_processed, - "articles_fetched": total_fetched, - "articles_stored": articles_stored, - "authors_upserted": authors_upserted, - "error_count": len(errors), - "duration_seconds": round(duration, 2), + "trigger": request.trigger, } ) ) - return TriggerIngestionResponse( + # Create initial Firestore doc + db.collection("ingestion_runs").document(run_id).set( + { + "status": "accepted", + "phase": "initializing", + "trigger": request.trigger, + "startedAt": firestore.SERVER_TIMESTAMP, + "timeWindowHours": request.time_window_hours, + "maxItemsPerSource": request.max_items_per_source, + "stats": {}, + "errors": [], + } + ) + + # Fire background task + asyncio.create_task(run_ingestion_pipeline(run_id, request, db)) + + return TriggerAcceptedResponse( + run_id=run_id, + status="accepted", + message="Ingestion pipeline started. Poll the poll_url for progress.", + poll_url=f"/trigger/ingestion/{run_id}", + ) + + +@router.get("/ingestion/{run_id}", response_model=IngestionRunStatus) +async def get_ingestion_status(run_id: str): + """ + Get the status of an ingestion run. + + Returns current phase, stats, and completion info. 
+ """ + db = _get_db() + doc_ref = db.collection("ingestion_runs").document(run_id) + doc = doc_ref.get() + + if not doc.exists: + raise HTTPException(status_code=404, detail=f"Ingestion run '{run_id}' not found") + + data = doc.to_dict() + + # Format timestamps + started_at = None + completed_at = None + if data.get("startedAt"): + ts = data["startedAt"] + if hasattr(ts, "isoformat"): + started_at = ts.isoformat() + if data.get("completedAt"): + ts = data["completedAt"] + if hasattr(ts, "isoformat"): + completed_at = ts.isoformat() + + # Evaluate success for terminal states + is_successful = None + if data.get("status") in ("completed", "completed_with_errors", "failed"): + is_successful = evaluate_run_success(data) + + return IngestionRunStatus( run_id=run_id, - status=status, - sources_processed=sources_processed, - articles_fetched=total_fetched, - articles_stored=articles_stored, - authors_upserted=authors_upserted, - errors=errors[:50], # Cap errors to avoid huge responses - duration_seconds=round(duration, 2), + status=data.get("status", "unknown"), + phase=data.get("phase"), + started_at=started_at, + completed_at=completed_at, + duration_seconds=data.get("duration"), + stats=data.get("stats"), + errors=data.get("errors"), + is_successful=is_successful, ) diff --git a/tests/api/test_trigger_integration.py b/tests/api/test_trigger_integration.py new file mode 100644 index 0000000..a12bdf9 --- /dev/null +++ b/tests/api/test_trigger_integration.py @@ -0,0 +1,207 @@ +""" +Trigger Ingestion Integration Tests +==================================== + +Integration tests for the trigger ingestion endpoints. +Uses mocked Firestore to test HTTP-level behavior. 
+""" + +import pytest +from fastapi.testclient import TestClient +from unittest.mock import patch, MagicMock, PropertyMock +import json +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "perception_app" / "mcp_service")) + + +def _mock_firestore(): + """Create a mock Firestore client with common setup.""" + mock_db = MagicMock() + + # Mock collection → document → set/get/update + mock_doc_ref = MagicMock() + mock_doc_ref.set = MagicMock() + mock_doc_ref.update = MagicMock() + + mock_collection = MagicMock() + mock_collection.document.return_value = mock_doc_ref + + # Mock query for check_active_run (no active runs by default) + mock_query = MagicMock() + mock_query.order_by.return_value = mock_query + mock_query.limit.return_value = mock_query + mock_query.stream.return_value = iter([]) + mock_collection.where.return_value = mock_query + + mock_db.collection.return_value = mock_collection + + return mock_db, mock_doc_ref, mock_collection + + +@pytest.fixture +def client(): + """Create a test client with mocked Firestore.""" + mock_db, _, _ = _mock_firestore() + + with patch("routers.trigger._get_db", return_value=mock_db): + from main import app + + with TestClient(app) as client: + yield client + + +@pytest.fixture +def client_with_active_run(): + """Create a test client where an active run exists.""" + mock_db, _, mock_collection = _mock_firestore() + + # Mock an active run + import time + + mock_active_doc = MagicMock() + mock_active_doc.id = "run-existing123" + mock_active_doc.reference = MagicMock() + started_at_mock = MagicMock() + started_at_mock.timestamp.return_value = time.time() - 30 # 30 seconds ago + mock_active_doc.to_dict.return_value = {"startedAt": started_at_mock} + + mock_query = MagicMock() + mock_query.order_by.return_value = mock_query + mock_query.limit.return_value = mock_query + mock_query.stream.return_value = iter([mock_active_doc]) + mock_collection.where.return_value = mock_query + + 
with patch("routers.trigger._get_db", return_value=mock_db): + from main import app + + with TestClient(app) as client: + yield client + + +class TestTriggerIngestionEndpoint: + """Tests for POST /trigger/ingestion.""" + + def test_post_returns_202(self, client): + """POST should return 202 Accepted with run_id.""" + response = client.post( + "/trigger/ingestion", + json={"trigger": "manual", "time_window_hours": 24, "max_items_per_source": 50}, + ) + assert response.status_code == 202 + data = response.json() + assert "run_id" in data + assert data["status"] == "accepted" + assert "poll_url" in data + assert data["run_id"] in data["poll_url"] + + def test_post_returns_202_fast(self, client): + """POST should return in well under timeout (not blocking on ingestion).""" + import time + + start = time.time() + response = client.post( + "/trigger/ingestion", + json={"trigger": "manual"}, + ) + elapsed = time.time() - start + assert response.status_code == 202 + # Should return in < 5s (we're not actually running ingestion) + assert elapsed < 5.0 + + def test_post_returns_409_when_active(self, client_with_active_run): + """POST should return 409 when another run is active.""" + response = client_with_active_run.post( + "/trigger/ingestion", + json={"trigger": "manual"}, + ) + assert response.status_code == 409 + data = response.json() + assert "active_run_id" in data["detail"] + + def test_post_validation_error(self, client): + """POST with invalid parameters should return 422.""" + response = client.post( + "/trigger/ingestion", + json={"time_window_hours": 0}, + ) + assert response.status_code == 422 + + +class TestGetIngestionStatus: + """Tests for GET /trigger/ingestion/{run_id}.""" + + def test_get_returns_status(self): + """GET should return run status from Firestore.""" + mock_db, mock_doc_ref, _ = _mock_firestore() + + mock_doc = MagicMock() + mock_doc.exists = True + mock_doc.to_dict.return_value = { + "status": "running", + "phase": "fetching_feeds", + 
"startedAt": MagicMock(isoformat=lambda: "2025-01-01T00:00:00Z"), + "stats": {"sourcesChecked": 128, "articlesFetched": 50}, + "errors": [], + } + mock_doc_ref.get.return_value = mock_doc + + with patch("routers.trigger._get_db", return_value=mock_db): + from main import app + + with TestClient(app) as client: + response = client.get("/trigger/ingestion/run-test123") + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "running" + assert data["phase"] == "fetching_feeds" + assert data["stats"]["sourcesChecked"] == 128 + + def test_get_returns_404_not_found(self): + """GET should return 404 for nonexistent run.""" + mock_db, mock_doc_ref, _ = _mock_firestore() + + mock_doc = MagicMock() + mock_doc.exists = False + mock_doc_ref.get.return_value = mock_doc + + with patch("routers.trigger._get_db", return_value=mock_db): + from main import app + + with TestClient(app) as client: + response = client.get("/trigger/ingestion/run-doesnotexist") + + assert response.status_code == 404 + + def test_get_completed_includes_success_eval(self): + """GET for completed run should include is_successful.""" + mock_db, mock_doc_ref, _ = _mock_firestore() + + mock_doc = MagicMock() + mock_doc.exists = True + mock_doc.to_dict.return_value = { + "status": "completed", + "phase": "done", + "startedAt": MagicMock(isoformat=lambda: "2025-01-01T00:00:00Z"), + "completedAt": MagicMock(isoformat=lambda: "2025-01-01T00:02:00Z"), + "duration": 120, + "stats": { + "sourcesChecked": 100, + "sourcesFailed": 5, + "articlesStored": 250, + }, + "errors": [], + } + mock_doc_ref.get.return_value = mock_doc + + with patch("routers.trigger._get_db", return_value=mock_db): + from main import app + + with TestClient(app) as client: + response = client.get("/trigger/ingestion/run-completed") + + assert response.status_code == 200 + data = response.json() + assert data["is_successful"] is True diff --git a/tests/mcp/test_trigger_router.py 
b/tests/mcp/test_trigger_router.py new file mode 100644 index 0000000..470abb3 --- /dev/null +++ b/tests/mcp/test_trigger_router.py @@ -0,0 +1,222 @@ +""" +Trigger Router Unit Tests +========================= + +Tests for trigger router models, helpers, and source loading. +""" + +import pytest +import sys +from pathlib import Path +from unittest.mock import patch, MagicMock + +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "perception_app" / "mcp_service")) + + +class TestTriggerModels: + """Tests for trigger router Pydantic models.""" + + def test_trigger_request_defaults(self): + from routers.trigger import TriggerIngestionRequest + + req = TriggerIngestionRequest() + assert req.trigger == "manual" + assert req.time_window_hours == 24 + assert req.max_items_per_source == 50 + + def test_trigger_request_custom(self): + from routers.trigger import TriggerIngestionRequest + + req = TriggerIngestionRequest( + trigger="scheduled", + time_window_hours=48, + max_items_per_source=100, + ) + assert req.trigger == "scheduled" + assert req.time_window_hours == 48 + assert req.max_items_per_source == 100 + + def test_trigger_request_validation_min(self): + from routers.trigger import TriggerIngestionRequest + + with pytest.raises(Exception): + TriggerIngestionRequest(time_window_hours=0) + + def test_trigger_request_validation_max(self): + from routers.trigger import TriggerIngestionRequest + + with pytest.raises(Exception): + TriggerIngestionRequest(max_items_per_source=501) + + def test_accepted_response_model(self): + from routers.trigger import TriggerAcceptedResponse + + resp = TriggerAcceptedResponse( + run_id="run-abc123", + message="Started", + poll_url="/trigger/ingestion/run-abc123", + ) + assert resp.run_id == "run-abc123" + assert resp.status == "accepted" + assert resp.poll_url == "/trigger/ingestion/run-abc123" + + def test_run_status_model(self): + from routers.trigger import IngestionRunStatus + + status = IngestionRunStatus( + 
run_id="run-abc123", + status="running", + phase="fetching_feeds", + ) + assert status.run_id == "run-abc123" + assert status.is_successful is None + + def test_run_status_model_with_all_fields(self): + from routers.trigger import IngestionRunStatus + + status = IngestionRunStatus( + run_id="run-abc123", + status="completed", + phase="done", + started_at="2025-01-01T00:00:00Z", + completed_at="2025-01-01T00:02:30Z", + duration_seconds=150.0, + stats={"articlesStored": 42}, + errors=[], + is_successful=True, + ) + assert status.duration_seconds == 150.0 + assert status.is_successful is True + + +class TestEvaluateRunSuccess: + """Tests for evaluate_run_success helper.""" + + def test_good_run(self): + from routers.trigger import evaluate_run_success + + data = { + "stats": { + "sourcesChecked": 100, + "sourcesFailed": 5, + "articlesStored": 250, + }, + "duration": 120, + } + assert evaluate_run_success(data) is True + + def test_no_articles_stored(self): + from routers.trigger import evaluate_run_success + + data = { + "stats": { + "sourcesChecked": 100, + "sourcesFailed": 5, + "articlesStored": 0, + }, + "duration": 60, + } + assert evaluate_run_success(data) is False + + def test_high_error_rate(self): + from routers.trigger import evaluate_run_success + + data = { + "stats": { + "sourcesChecked": 100, + "sourcesFailed": 50, + "articlesStored": 10, + }, + "duration": 60, + } + assert evaluate_run_success(data) is False + + def test_too_slow(self): + from routers.trigger import evaluate_run_success + + data = { + "stats": { + "sourcesChecked": 100, + "sourcesFailed": 5, + "articlesStored": 250, + }, + "duration": 350, + } + assert evaluate_run_success(data) is False + + def test_empty_stats(self): + from routers.trigger import evaluate_run_success + + data = {"stats": {}, "duration": 10} + assert evaluate_run_success(data) is False + + def test_no_sources_checked(self): + """Edge case: 0 sources checked should still pass if articles stored.""" + from 
routers.trigger import evaluate_run_success + + data = { + "stats": { + "sourcesChecked": 0, + "sourcesFailed": 0, + "articlesStored": 10, + }, + "duration": 30, + } + assert evaluate_run_success(data) is True + + def test_exactly_50_percent_error_rate(self): + """50% error rate should fail (>= 0.5).""" + from routers.trigger import evaluate_run_success + + data = { + "stats": { + "sourcesChecked": 10, + "sourcesFailed": 5, + "articlesStored": 10, + }, + "duration": 30, + } + assert evaluate_run_success(data) is False + + def test_just_under_50_percent_error_rate(self): + """49% error rate should pass.""" + from routers.trigger import evaluate_run_success + + data = { + "stats": { + "sourcesChecked": 100, + "sourcesFailed": 49, + "articlesStored": 10, + }, + "duration": 30, + } + assert evaluate_run_success(data) is True + + +class TestLoadSources: + """Tests for load_sources helper.""" + + def test_load_sources_returns_list(self): + from routers.trigger import load_sources + + sources = load_sources() + assert isinstance(sources, list) + + def test_load_sources_has_required_fields(self): + from routers.trigger import load_sources + + sources = load_sources() + if sources: + source = sources[0] + assert "source_id" in source + assert "name" in source + assert "url" in source + assert "category" in source + + def test_load_sources_filters_inactive(self): + from routers.trigger import load_sources + + sources = load_sources() + # All returned sources should have been active + for source in sources: + assert source.get("source_id") # Must have an ID
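
The `TestEvaluateRunSuccess` cases in this patch pin the success heuristic down fairly tightly. As a reading aid, here is a minimal sketch of what `evaluate_run_success` plausibly looks like — reconstructed from the tests, not copied from `routers/trigger.py`, so the exact thresholds (50% source-failure rate, 300 s duration budget matching the Cloud Run `--timeout 300`) are assumptions:

```python
from typing import Any, Dict


def evaluate_run_success(data: Dict[str, Any]) -> bool:
    """Sketch of the success heuristic implied by the test suite.

    A terminal run counts as successful when:
      - at least one article was stored,
      - fewer than 50% of checked sources failed (0 checked is tolerated),
      - the run finished within the assumed 300 s budget.
    """
    stats = data.get("stats") or {}
    stored = stats.get("articlesStored", 0)
    checked = stats.get("sourcesChecked", 0)
    failed = stats.get("sourcesFailed", 0)
    duration = data.get("duration", 0)

    if stored <= 0:
        return False  # nothing stored => failed run (test_no_articles_stored)
    if checked > 0 and failed / checked >= 0.5:
        return False  # exactly 50% fails, 49% passes (boundary tests)
    if duration > 300:
        return False  # assumed cap; test_too_slow uses 350s, good run 120s
    return True
```

If the real implementation diverges (say, a different duration budget), the boundary tests `test_exactly_50_percent_error_rate` and `test_too_slow` are the ones that would catch it.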