 | 1 | +# Plan: Fix Orphaned Messages on Run Failure  | 
 | 2 | + | 
 | 3 | +## Problem Statement  | 
 | 4 | + | 
 | 5 | +When a run fails, messages for pending tasks remain in the queue indefinitely, causing:  | 
 | 6 | +1. **Resource waste**: Workers continuously poll orphaned messages  | 
 | 7 | +2. **Performance degradation**: Queue operations slow down over time  | 
 | 8 | +3. **Map step issues**: Failing one map task leaves N-1 sibling messages orphaned  | 
 | 9 | +4. **Type violations**: Deterministic type errors are retried even though every retry hits the same error  | 
 | 10 | + | 
 | 11 | +## Current Behavior  | 
 | 12 | + | 
 | 13 | +### When fail_task is called  | 
 | 14 | +```sql  | 
 | 15 | +-- Only archives the single failing task's message  | 
 | 16 | +SELECT pgmq.archive('pgflow_tasks_queue', fail_task.msg_id);  | 
 | 17 | +-- Leaves all other queued messages orphaned  | 
 | 18 | +```  | 
 | 19 | + | 
 | 20 | +### When type constraint violation occurs  | 
 | 21 | +```sql  | 
 | 22 | +-- Raises exception, causes retries  | 
 | 23 | +RAISE EXCEPTION 'Map step % expects array input...';  | 
 | 24 | +-- Transaction rolls back, but retries will hit same error  | 
 | 25 | +```  | 
 | 26 | + | 
 | 27 | +## Implementation Plan  | 
 | 28 | + | 
 | 29 | +### 1. Update fail_task Function  | 
 | 30 | +**File**: `pkgs/core/schemas/0100_function_fail_task.sql`  | 
 | 31 | + | 
 | 32 | +Add after marking run as failed (around line 47):  | 
 | 33 | +```sql  | 
 | 34 | +-- Archive all pending messages for this run.  | 
 | 35 | +-- PERFORM is used instead of a bare SELECT: inside a PL/pgSQL body,  | 
 | 36 | +-- a SELECT without an INTO target fails with  | 
 | 37 | +-- "query has no destination for result data".  | 
 | 38 | +-- (Assumes fail_task is a PL/pgSQL function.)  | 
 | 39 | +PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)  | 
 | 40 | +FROM pgflow.step_tasks t  | 
 | 41 | +WHERE t.run_id = fail_task.run_id  | 
 | 42 | +  AND t.status = 'pending'  | 
 | 43 | +  AND t.msg_id IS NOT NULL;  | 
 | 44 | +```  | 
 | 45 | + | 
 | 46 | +### 2. Update complete_task for Type Violations  | 
 | 47 | +**File**: `pkgs/core/schemas/0100_function_complete_task.sql`  | 
 | 48 | + | 
 | 49 | +Replace the current RAISE EXCEPTION block (lines 115-120) with:  | 
 | 50 | +```sql  | 
 | 51 | +IF v_dependent_map_slug IS NOT NULL THEN  | 
 | 52 | +  -- Mark run as failed immediately (no retries for type violations)  | 
 | 53 | +  UPDATE pgflow.runs  | 
 | 54 | +  SET status = 'failed',  | 
 | 55 | +      failed_at = now(),  | 
 | 56 | +      error = format('Type contract violation: Map step %s expects array input but dependency %s produced %s (output: %s)',  | 
 | 57 | +                     v_dependent_map_slug,  | 
 | 58 | +                     complete_task.step_slug,  | 
 | 59 | +                     CASE WHEN complete_task.output IS NULL THEN 'null'  | 
 | 60 | +                          ELSE jsonb_typeof(complete_task.output) END,  | 
 | 61 | +                     complete_task.output)  | 
 | 62 | +  WHERE run_id = complete_task.run_id;  | 
 | 63 | + | 
 | 64 | +  -- Archive ALL pending messages for this run  | 
 | 65 | +  PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)  | 
 | 66 | +  FROM pgflow.step_tasks t  | 
 | 67 | +  WHERE t.run_id = complete_task.run_id  | 
 | 68 | +    AND t.status = 'pending'  | 
 | 69 | +    AND t.msg_id IS NOT NULL;  | 
 | 70 | + | 
 | 71 | +  -- Mark the current task as failed (not completed)  | 
 | 72 | +  UPDATE pgflow.step_tasks  | 
 | 73 | +  SET status = 'failed',  | 
 | 74 | +      failed_at = now(),  | 
 | 75 | +      error_message = format('Type contract violation: produced %s instead of array',  | 
 | 76 | +                            CASE WHEN complete_task.output IS NULL THEN 'null'  | 
 | 77 | +                                 ELSE jsonb_typeof(complete_task.output) END)  | 
 | 78 | +  WHERE run_id = complete_task.run_id  | 
 | 79 | +    AND step_slug = complete_task.step_slug  | 
 | 80 | +    AND task_index = complete_task.task_index;  | 
 | 81 | + | 
 | 82 | +  -- Return empty result set (task not completed)  | 
 | 83 | +  RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;  | 
 | 84 | +  RETURN;  | 
 | 85 | +END IF;  | 
 | 86 | +```  | 
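Sections 1 and 2 repeat the same archiving query, so it could be factored into one helper. The following is a minimal sketch, not part of pgflow: the function name `pgflow.archive_pending_messages` is hypothetical, and the table, column, and queue names are copied from the snippets in this plan.

```sql
-- Hypothetical helper (not an existing pgflow function).
-- Archives every pending message of one run and returns how many were moved.
-- Assumes pgflow.step_tasks(run_id, status, msg_id) and the queue name
-- 'pgflow_tasks_queue' exactly as used elsewhere in this plan.
CREATE OR REPLACE FUNCTION pgflow.archive_pending_messages(p_run_id uuid)
RETURNS bigint
LANGUAGE sql
AS $$
  SELECT count(*) FILTER (WHERE archived)
  FROM (
    -- pgmq.archive(queue_name, msg_id) returns true when a message was moved
    SELECT pgmq.archive('pgflow_tasks_queue', t.msg_id) AS archived
    FROM pgflow.step_tasks t
    WHERE t.run_id = p_run_id
      AND t.status = 'pending'
      AND t.msg_id IS NOT NULL
  ) AS results;
$$;
```

Inside `fail_task` and `complete_task`, the call would then be `PERFORM pgflow.archive_pending_messages(fail_task.run_id);` (or `complete_task.run_id`), which keeps the two code paths from drifting apart.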
 | 87 | + | 
 | 88 | +### 3. Add Supporting Index  | 
 | 89 | +**File**: A new migration, or an addition to an existing one  | 
 | 90 | + | 
 | 91 | +```sql  | 
 | 92 | +-- Speed up the archiving query; the predicate already fixes status, so only run_id is indexed  | 
 | 93 | +CREATE INDEX IF NOT EXISTS idx_step_tasks_pending_with_msg  | 
 | 94 | +ON pgflow.step_tasks(run_id)  | 
 | 95 | +WHERE status = 'pending' AND msg_id IS NOT NULL;  | 
 | 96 | +```  | 
 | 97 | + | 
 | 98 | +## Testing  | 
 | 99 | + | 
 | 100 | +### Tests Already Written (Stashed)  | 
 | 101 | + | 
 | 102 | +1. **`supabase/tests/fail_task/archive_sibling_map_tasks.test.sql`**  | 
 | 103 | +   - Verifies all map task messages are archived when one fails  | 
 | 104 | +   - Tests: 8 assertions about message archiving and status  | 
 | 105 | + | 
 | 106 | +2. **`supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql`**  | 
 | 107 | +   - Verifies type violations archive all pending messages  | 
 | 108 | +   - Tests: 8 assertions about queue cleanup and run status  | 
 | 109 | + | 
 | 110 | +### How to Run Tests  | 
 | 111 | +```bash  | 
 | 112 | +# After unstashing and implementing the fixes:  | 
 | 113 | +pnpm nx test:pgtap core -- supabase/tests/fail_task/archive_sibling_map_tasks.test.sql  | 
 | 114 | +pnpm nx test:pgtap core -- supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql  | 
 | 115 | +```  | 
 | 116 | + | 
 | 117 | +## Migration Considerations  | 
 | 118 | + | 
 | 119 | +### Backward Compatibility  | 
 | 120 | +- New behavior only affects failed runs (safe)  | 
 | 121 | +- Archiving preserves messages (can be recovered if needed)  | 
 | 122 | +- No schema changes to existing tables  | 
 | 123 | + | 
 | 124 | +### Performance Impact  | 
 | 125 | +- One-time cost during failure (acceptable)  | 
 | 126 | +- Prevents ongoing performance degradation (improvement)  | 
 | 127 | +- Index ensures archiving query is efficient  | 
 | 128 | + | 
 | 129 | +### Rollback Plan  | 
 | 130 | +If issues arise:  | 
 | 131 | +1. Remove the archiving logic  | 
 | 132 | +2. Messages remain in queue (old behavior)  | 
 | 133 | +3. No data loss since we archive, not delete  | 
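Because pgmq moves archived messages into a per-queue archive table rather than deleting them, they stay queryable after the fix runs. A sketch of how the archived messages of one run could be inspected, assuming the queue name used in this plan (pgmq names the archive table `pgmq.a_<queue_name>`) and a placeholder run ID:

```sql
-- Inspect messages archived for a single run.
-- The uuid below is a placeholder; substitute a real run_id.
SELECT t.step_slug,
       t.task_index,
       a.msg_id,
       a.archived_at,
       a.message
FROM pgmq.a_pgflow_tasks_queue a
JOIN pgflow.step_tasks t ON t.msg_id = a.msg_id
WHERE t.run_id = '00000000-0000-0000-0000-000000000000'::uuid
ORDER BY a.archived_at;
```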
 | 134 | + | 
 | 135 | +## Edge Cases to Consider  | 
 | 136 | + | 
 | 137 | +### 1. Concurrent Task Completion  | 
 | 138 | +If multiple tasks complete/fail simultaneously:  | 
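 | 139 | +- The `UPDATE` on the run's row in `pgflow.runs` takes a row lock, so concurrent failures of the same run are serialized  | 
 | 140 | +- Each failure archives all pending messages  | 
 | 141 | +- Idempotent: `pgmq.archive` returns false for a message that is no longer in the queue, so archiving twice is safe  | 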
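The idempotency claim can be checked in isolation against pgmq. The block below is a small sketch that uses a throwaway queue (the name `demo_archive_twice` is made up for this example) and archives the same message twice; the second call finds nothing left to move.

```sql
DO $$
DECLARE
  v_msg_id bigint;
  v_first  boolean;
  v_second boolean;
BEGIN
  -- Throwaway queue, used only for this demonstration
  PERFORM pgmq.create('demo_archive_twice');

  -- pgmq.send returns the id of the new message
  SELECT * INTO v_msg_id
  FROM pgmq.send('demo_archive_twice', '{"task": 1}'::jsonb);

  -- Archive the same message twice
  v_first  := pgmq.archive('demo_archive_twice', v_msg_id);
  v_second := pgmq.archive('demo_archive_twice', v_msg_id);

  RAISE NOTICE 'first archive: %, second archive: %', v_first, v_second;

  -- Clean up the demo queue and its archive table
  PERFORM pgmq.drop_queue('demo_archive_twice');
END;
$$;
```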
 | 142 | + | 
 | 143 | +### 2. Very Large Map Steps  | 
 | 144 | +For maps with 1000+ tasks:  | 
 | 145 | +- Archiving might take several seconds  | 
 | 146 | +- Consider batching if performance issues arise  | 
 | 147 | +- Current approach is expected to handle up to ~10k tasks (a rough estimate, to be confirmed by the large-map test in the checklist)  | 
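If per-message calls turn out to be too slow for large maps, pgmq also provides an array overload, `pgmq.archive(queue_name, msg_ids bigint[])`, which moves many messages in one statement. A sketch under the same schema assumptions as the rest of this plan, with a placeholder run ID:

```sql
-- One pgmq call for the whole run instead of one call per message.
-- ARRAY(subquery) yields an empty array, not NULL, when nothing is pending.
SELECT archived_msg_id
FROM pgmq.archive(
  'pgflow_tasks_queue',
  ARRAY(
    SELECT t.msg_id
    FROM pgflow.step_tasks t
    WHERE t.run_id = '00000000-0000-0000-0000-000000000000'::uuid  -- placeholder
      AND t.status = 'pending'
      AND t.msg_id IS NOT NULL
  )
) AS archived_msg_id;
```

The array overload returns the IDs it actually archived, so the result set doubles as a record of what was cleaned up.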
 | 148 | + | 
 | 149 | +### 3. Mixed Step Types  | 
 | 150 | +When run has both map and single steps:  | 
 | 151 | +- Archive logic handles all pending tasks regardless of type  | 
 | 152 | +- Correctly archives both map siblings and unrelated pending tasks  | 
 | 153 | + | 
 | 154 | +## Future Enhancements (Not for this PR)  | 
 | 155 | + | 
 | 156 | +1. **Selective Archiving**: Only archive tasks that can't proceed  | 
 | 157 | +2. **Batch Operations**: Archive in chunks for very large runs  | 
 | 158 | +3. **Recovery Mechanism**: Function to unarchive and retry  | 
 | 159 | +4. **Monitoring**: Track archived message counts for alerting  | 
 | 160 | + | 
 | 161 | +## Success Criteria  | 
 | 162 | + | 
 | 163 | +- [ ] All tests pass (both new test files)  | 
 | 164 | +- [ ] No orphaned messages after run failure  | 
 | 165 | +- [ ] Type violations don't retry  | 
 | 166 | +- [ ] Performance acceptable for maps with 100+ tasks  | 
 | 167 | +- [ ] No impact on successful run performance  | 
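The "no orphaned messages" criterion can be verified with a query against the live queue table. This is a sketch that assumes the queue name used in this plan (pgmq stores live messages in `pgmq.q_<queue_name>`) and the `pgflow.runs` and `pgflow.step_tasks` columns shown earlier; an empty result means no failed run still has messages queued.

```sql
-- Messages still sitting in the queue that belong to failed runs.
SELECT r.run_id,
       count(*) AS orphaned_messages
FROM pgflow.runs r
JOIN pgflow.step_tasks t ON t.run_id = r.run_id
JOIN pgmq.q_pgflow_tasks_queue q ON q.msg_id = t.msg_id
WHERE r.status = 'failed'
GROUP BY r.run_id
ORDER BY orphaned_messages DESC;
```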
 | 168 | + | 
 | 169 | +## Implementation Checklist  | 
 | 170 | + | 
 | 171 | +- [ ] Update `fail_task` function  | 
 | 172 | +- [ ] Update `complete_task` function  | 
 | 173 | +- [ ] Add database index  | 
 | 174 | +- [ ] Unstash and run tests  | 
 | 175 | +- [ ] Test with large map steps (100+ tasks)  | 
 | 176 | +- [ ] Update migration file  | 
 | 177 | +- [ ] Document behavior change in function comments  | 
 | 178 | + | 
 | 179 | +## Notes  | 
 | 180 | + | 
 | 181 | +- This fix is **critical for production** - without it, queue performance will degrade over time  | 
 | 182 | +- Type violations are **deterministic** - retrying them is always wasteful  | 
 | 183 | +- Archiving (vs deleting) preserves debugging capability  | 
 | 184 | +- The fix is relatively simple (~30 lines of SQL) but high impact  | 