diff --git a/.changeset/hungry-cloths-hunt.md b/.changeset/hungry-cloths-hunt.md new file mode 100644 index 000000000..38cc06f65 --- /dev/null +++ b/.changeset/hungry-cloths-hunt.md @@ -0,0 +1,12 @@ +--- +'@pgflow/core': patch +--- + +Improve failure handling and prevent orphaned messages in queue + +- Archive all queued messages when a run fails to prevent resource waste +- Handle type constraint violations gracefully without exceptions +- Store output on failed tasks (including type violations) for debugging +- Add performance index for efficient message archiving +- Prevent retries on already-failed runs +- Update table constraint to allow output storage on failed tasks diff --git a/.claude/commands/fix-sql-tests.md b/.claude/commands/fix-sql-tests.md new file mode 100644 index 000000000..b207116ad --- /dev/null +++ b/.claude/commands/fix-sql-tests.md @@ -0,0 +1,17 @@ +Your job is to fix SQL tests, either by fixing the tests if they are invalid, +or by updating the SQL functions in pkgs/core/schemas/ and trying again. + +If updating functions, load them with psql. + +!`pnpm nx supabase:status core --output env | grep DB_URL` +PWD: !`pwd` + +To rerun the test(s), run this command from the `pkgs/core` directory: + +`scripts/run-test-with-colors supabase/tests/` + +Do not create any migrations or try to run tests with nx. + + +!`pnpm nx test:pgtap core` + diff --git a/.claude/commands/test-first-sql.md b/.claude/commands/test-first-sql.md new file mode 100644 index 000000000..470a9a627 --- /dev/null +++ b/.claude/commands/test-first-sql.md @@ -0,0 +1,40 @@ +Your job is to implement the feature below in a test-first manner. +First, you must identify what things you want to test for. +Then you must write one test at a time, from the simplest and most generic +to the more precise (if applicable; sometimes a single test per +thing is enough). + +To run the test(s), run this command from the `pkgs/core` directory: + +`scripts/run-test-with-colors supabase/tests/` + +The newly written test must fail for the correct reasons. + +In order to make the test pass, you need to update the function +code in pkgs/core/schemas/. + +After updating, use `psql` to execute the function file +and update the function in the database. + +!`pnpm nx supabase:status core --output env | grep DB_URL` +PWD: !`pwd` + +Repeat until all the added tests are passing. + +When they do, run all the tests like this: + +`scripts/run-test-with-colors supabase/tests/` + +Do not create any migrations or try to run tests with nx. + +Never use any INSERTs or UPDATEs to prepare or mutate state for the test. +Instead, use the regular pgflow.\* SQL functions or the helper functions +available in pkgs/core/supabase/tests/seed.sql: + +!`grep 'function.*pgflow_tests' pkgs/core/supabase/seed.sql -A7` + +Check how they are used in other tests. 
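For example, a minimal sketch of a test written in this style might look like the following (the file name, flow, step, and assertion are illustrative; the helpers mirror the usage visible in the existing test files):

```sql
-- supabase/tests/<area>/my_feature.test.sql (hypothetical file name)
begin;
select plan(1);
select pgflow_tests.reset_db();

-- Arrange state through pgflow.* functions, never raw INSERTs/UPDATEs
select pgflow.create_flow('test_flow', max_attempts => 1);
select pgflow.add_step('test_flow', 'step1');
select pgflow.start_flow('test_flow', '{"test": "data"}'::jsonb);

-- Assert on the resulting state
select is(
  (select count(*)::int from pgflow.step_tasks where flow_slug = 'test_flow'),
  1,
  'starting the flow should create exactly one task for step1'
);

select finish();
rollback;
```
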
+ + +$ARGUMENTS + diff --git a/PLAN.md b/PLAN.md index a4c74d8f9..3043d701f 100644 --- a/PLAN.md +++ b/PLAN.md @@ -10,7 +10,7 @@ - ✅ **DONE**: Array element extraction - tasks receive individual array elements - ✅ **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents - ✅ **DONE**: DSL support for `.map()` for defining map steps with compile-time duplicate detection -- ⏳ **TODO**: Fix orphaned messages on run failure +- ✅ **DONE**: Fix orphaned messages on run failure - ⏳ **TODO**: Performance optimization with step_states.output column ### Chores @@ -106,18 +106,28 @@ - Updated DSL README with .map() documentation - Created detailed changeset +- [x] **PR #219: Fix Orphaned Messages on Run Failure** - `09-18-fix-orphaned-messages-on-fail` ✅ COMPLETED + + - Archives all queued messages when run fails (prevents orphaned messages) + - Handles type constraint violations gracefully without exceptions + - Added guards to prevent any mutations on failed runs: + - complete_task returns unchanged + - start_ready_steps exits early + - cascade_complete_taskless_steps returns 0 + - Added performance index for efficient message archiving + - Tests unstashed and passing (archive_sibling_map_tasks, archive_messages_on_type_constraint_failure) + - Updated core README with failure handling mentions + - **Critical fix: prevents queue performance degradation in production** + #### ❌ Remaining Work (Priority Order) -- [ ] **Priority 1: Fix Orphaned Messages on Run Failure** 🚨 CRITICAL +- [ ] **Integration Tests** - - Archive all pending messages when run fails - - Handle map sibling tasks specially - - Fix type constraint violations to fail immediately without retries - - See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md) - - **Critical for production: prevents queue performance degradation** - - Tests already written (stashed) that document the problem + - End-to-end workflows with real array data + - Basic happy path coverage + - This should be minimal and added to the Edge Worker integration test suite for now -- [ ] **Priority 2: Performance Optimization - step_states.output Column** +- [ ] **Performance Optimization - step_states.output Column** - Migrate from inline aggregation to storing outputs in step_states - See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md) @@ -132,43 +142,33 @@ - Update all aggregation tests (~17 files) - **Note**: This is an optimization that should be done after core functionality is stable -- [ ] **Priority 3: Integration Tests** - - - End-to-end workflows with real array data - - Basic happy path coverage - - This should be minimal and added to the Edge Worker integration test suite for now - -- [ ] **Priority 4: Update core README** - - - `pkgs/core/README.md` - - - Add new section describing the step types - - Describe single step briefly, focus on describing map step type and how it differs - - Make sure to mention that maps are constrained to have exactly one dependency - - Show multiple cases of inputs -> task creation - - Explain edge cases (empty array propagation, invalid array input) - - Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input - - Explain cascade completion of taskless steps and its limitations +- [ ] **Update `pkgs/core/README.md`** -- [ ] **Priority 5: Add docs page** + - Add new section describing the step types + - Describe single step briefly, focus on describing map step type and how it differs + - Make sure to 
mention that maps are constrained to have exactly one dependency + - Show multiple cases of inputs -> task creation + - Explain edge cases (empty array propagation, invalid array input) + - Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input + - Explain cascade completion of taskless steps and its limitations - - **Add basic docs page** +- [ ] **Add docs page** - - put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx` - - describe the DSL and how the map works and why we need it - - show example usage of root map - - show example usage of dependent map - - focus mostly on how to use it, instead of how it works under the hood - - link to the READMEs for more details + - put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx` + - describe the DSL and how the map works and why we need it + - show example usage of root map + - show example usage of dependent map + - focus mostly on how to use it, instead of how it works under the hood + - link to the READMEs for more details -- [ ] **Priority 6: Migration Consolidation** (Do this last before merge!) +- [ ] **Migration Consolidation** - Remove all temporary/incremental migrations from feature branches - Generate a single consolidated migration for the entire map infrastructure - Ensure clean migration path from current production schema - If NULL improvement is done, include it in the consolidated migration -- [ ] **Priority 7: Graphite Stack Merge** +- [ ] **Graphite Stack Merge** - Configure Graphite merge queue for the complete PR stack - Ensure all PRs in sequence can be merged together diff --git a/PLAN_orphaned_messages.md b/PLAN_orphaned_messages.md deleted file mode 100644 index 9b8e074b0..000000000 --- a/PLAN_orphaned_messages.md +++ /dev/null @@ -1,184 +0,0 @@ -# Plan: Fix Orphaned Messages on Run Failure - -## Problem Statement - -When a run fails, messages for pending tasks remain in the queue indefinitely, causing: -1. **Resource waste**: Workers continuously poll orphaned messages -2. **Performance degradation**: Queue operations slow down over time -3. **Map step issues**: Failing one map task leaves N-1 sibling messages orphaned -4. **Type violations**: Deterministic errors retry unnecessarily - -## Current Behavior - -### When fail_task is called -```sql --- Only archives the single failing task's message -SELECT pgmq.archive('pgflow_tasks_queue', fail_task.msg_id); --- Leaves all other queued messages orphaned -``` - -### When type constraint violation occurs -```sql --- Raises exception, causes retries -RAISE EXCEPTION 'Map step % expects array input...'; --- Transaction rolls back, but retries will hit same error -``` - -## Implementation Plan - -### 1. Update fail_task Function -**File**: `pkgs/core/schemas/0100_function_fail_task.sql` - -Add after marking run as failed (around line 47): -```sql --- Archive all pending messages for this run -WITH tasks_to_archive AS ( - SELECT t.msg_id - FROM pgflow.step_tasks t - WHERE t.run_id = fail_task.run_id - AND t.status = 'pending' - AND t.msg_id IS NOT NULL -) -SELECT pgmq.archive('pgflow_tasks_queue', msg_id) -FROM tasks_to_archive; -``` - -### 2. 
Update complete_task for Type Violations -**File**: `pkgs/core/schemas/0100_function_complete_task.sql` - -Replace the current RAISE EXCEPTION block (lines 115-120) with: -```sql -IF v_dependent_map_slug IS NOT NULL THEN - -- Mark run as failed immediately (no retries for type violations) - UPDATE pgflow.runs - SET status = 'failed', - failed_at = now(), - error = format('Type contract violation: Map step %s expects array input but dependency %s produced %s (output: %s)', - v_dependent_map_slug, - complete_task.step_slug, - CASE WHEN complete_task.output IS NULL THEN 'null' - ELSE jsonb_typeof(complete_task.output) END, - complete_task.output) - WHERE run_id = complete_task.run_id; - - -- Archive ALL pending messages for this run - PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id) - FROM pgflow.step_tasks t - WHERE t.run_id = complete_task.run_id - AND t.status = 'pending' - AND t.msg_id IS NOT NULL; - - -- Mark the current task as failed (not completed) - UPDATE pgflow.step_tasks - SET status = 'failed', - failed_at = now(), - error_message = format('Type contract violation: produced %s instead of array', - CASE WHEN complete_task.output IS NULL THEN 'null' - ELSE jsonb_typeof(complete_task.output) END) - WHERE run_id = complete_task.run_id - AND step_slug = complete_task.step_slug - AND task_index = complete_task.task_index; - - -- Return empty result set (task not completed) - RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; - RETURN; -END IF; -``` - -### 3. Add Supporting Index -**File**: New migration or add to existing - -```sql --- Speed up the archiving query -CREATE INDEX IF NOT EXISTS idx_step_tasks_pending_with_msg -ON pgflow.step_tasks(run_id, status) -WHERE status = 'pending' AND msg_id IS NOT NULL; -``` - -## Testing - -### Tests Already Written (Stashed) - -1. **`supabase/tests/fail_task/archive_sibling_map_tasks.test.sql`** - - Verifies all map task messages are archived when one fails - - Tests: 8 assertions about message archiving and status - -2. **`supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql`** - - Verifies type violations archive all pending messages - - Tests: 8 assertions about queue cleanup and run status - -### How to Run Tests -```bash -# After unstashing and implementing the fixes: -pnpm nx test:pgtap core -- supabase/tests/fail_task/archive_sibling_map_tasks.test.sql -pnpm nx test:pgtap core -- supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql -``` - -## Migration Considerations - -### Backward Compatibility -- New behavior only affects failed runs (safe) -- Archiving preserves messages (can be recovered if needed) -- No schema changes to existing tables - -### Performance Impact -- One-time cost during failure (acceptable) -- Prevents ongoing performance degradation (improvement) -- Index ensures archiving query is efficient - -### Rollback Plan -If issues arise: -1. Remove the archiving logic -2. Messages remain in queue (old behavior) -3. No data loss since we archive, not delete - -## Edge Cases to Consider - -### 1. Concurrent Task Completion -If multiple tasks complete/fail simultaneously: -- PostgreSQL row locks ensure consistency -- Each failure archives all pending messages -- Idempotent: archiving already-archived messages is safe - -### 2. Very Large Map Steps -For maps with 1000+ tasks: -- Archiving might take several seconds -- Consider batching if performance issues arise -- Current approach should handle up to ~10k tasks reasonably - -### 3. 
Mixed Step Types -When run has both map and single steps: -- Archive logic handles all pending tasks regardless of type -- Correctly archives both map siblings and unrelated pending tasks - -## Future Enhancements (Not for this PR) - -1. **Selective Archiving**: Only archive tasks that can't proceed -2. **Batch Operations**: Archive in chunks for very large runs -3. **Recovery Mechanism**: Function to unarchive and retry -4. **Monitoring**: Track archived message counts for alerting - -## Success Criteria - -- [ ] All tests pass (both new test files) -- [ ] No orphaned messages after run failure -- [ ] Type violations don't retry -- [ ] Performance acceptable for maps with 100+ tasks -- [ ] No impact on successful run performance - -## Implementation Checklist - -- [ ] Update `fail_task` function -- [ ] Update `complete_task` function -- [ ] Add database index -- [ ] Unstash and run tests -- [ ] Test with large map steps (100+ tasks) -- [ ] Update migration file -- [ ] Document behavior change in function comments - -## Notes - -- This fix is **critical for production** - without it, queue performance will degrade over time -- Type violations are **deterministic** - retrying them is always wasteful -- Archiving (vs deleting) preserves debugging capability -- The fix is relatively simple (~30 lines of SQL) but high impact \ No newline at end of file diff --git a/PLAN_step_output.md b/PLAN_step_output.md index b88848a9b..cebaed580 100644 --- a/PLAN_step_output.md +++ b/PLAN_step_output.md @@ -59,6 +59,9 @@ ADD CONSTRAINT output_only_when_completed CHECK ( 2. **Map steps**: Store aggregated array ordered by task_index 3. **Taskless steps**: Store empty array `[]` for map steps, NULL for single steps 4. **Storage location**: Step output belongs at step level, not task level +5. **Failed steps**: Do NOT store output at step level (only completed steps have step output) + - Individual failed tasks still store their output for debugging + - This includes type violation failures where task output is preserved ## Implementation Changes @@ -188,7 +191,8 @@ SET status = 'completed', - Add: Verify output remains NULL until all tasks complete 10. **`tests/map_output_aggregation/failed_task_handling.test.sql`** - - Add: Verify output remains NULL when step fails + - Add: Verify step output remains NULL when step fails + - Add: Verify individual task outputs are still preserved for debugging 11. **`tests/map_output_aggregation/map_initial_tasks_timing.test.sql`** - No changes needed (focuses on timing) @@ -230,6 +234,8 @@ SET status = 'completed', ```sql -- Verify output is NULL for non-completed steps -- Check constraint prevents setting output on non-completed steps + -- Verify failed steps have NULL output at step level + -- Verify failed tasks still preserve their output for debugging ``` 4. **`tests/step_output/taskless_step_outputs.test.sql`** @@ -244,6 +250,13 @@ SET status = 'completed', -- Map with [null, {data}, null] -> step_states.output has all three ``` +6. **`tests/step_output/failed_task_output_preservation.test.sql`** + ```sql + -- Verify failed tasks store their output (including type violations) + -- Verify step output remains NULL when step fails + -- Test both regular failures and type constraint violations + ``` + #### Tests to Remove/Update 1. 
**`tests/map_output_aggregation/broadcast_output_verification.test.sql`** diff --git a/pkgs/core/PLAN_race_condition_testing.md b/pkgs/core/PLAN_race_condition_testing.md new file mode 100644 index 000000000..6667e1e30 --- /dev/null +++ b/pkgs/core/PLAN_race_condition_testing.md @@ -0,0 +1,176 @@ +# PLAN: Race Condition Testing for Type Violations + +## Background + +When a type violation occurs (e.g., single step produces non-array for dependent map), the system must archive ALL active messages to prevent orphaned messages that cycle through workers indefinitely. + +## Current Issue + +The fix archives both `'queued'` AND `'started'` tasks, but existing tests don't properly validate the race condition scenarios. + +## Test Scenarios Needed + +### 1. Basic Type Violation (✅ Already Covered) +**Scenario**: Single task causes type violation +``` +step1 (single) → step2 (single) → map_step +``` +- Worker completes step2 with non-array +- Verify run fails and current task's message is archived +- **Coverage**: `non_array_to_map_should_fail.test.sql` + +### 2. Concurrent Started Tasks (❌ Not Covered) +**Scenario**: Multiple workers have tasks in 'started' state when violation occurs +``` +producer (single) → map_consumer (map, expects array) +producer (single) → parallel_task1 (single) +producer (single) → parallel_task2 (single) +``` + +**Test Flow**: +1. Complete producer with `[1, 2, 3]` (spawns 3 map tasks + 2 parallel tasks) +2. Worker A starts `map_consumer[0]` +3. Worker B starts `map_consumer[1]` +4. Worker C starts `parallel_task1` +5. Worker D starts `parallel_task2` +6. Worker C completes `parallel_task1` with non-array (violates some other map dependency) +7. **Verify**: ALL started tasks (map_consumer[0], map_consumer[1], parallel_task2) get archived + +### 3. Mixed Queue States (❌ Not Covered) +**Scenario**: Mix of queued and started tasks across different steps +``` +step1 → step2 → step3 → map_step + ↘ step4 → step5 +``` + +**Test Flow**: +1. Complete step1 +2. Worker A starts step2 +3. Worker B starts step4 +4. Step3 and step5 remain queued +5. Worker A completes step2 with type violation +6. **Verify**: Both started (step4) AND queued (step3, step5) messages archived + +### 4. Map Task Partial Processing (❌ Not Covered) +**Scenario**: Some map tasks started, others queued when violation occurs +``` +producer → large_map (100 elements) +``` + +**Test Flow**: +1. Producer outputs array of 100 elements +2. Workers start processing first 10 tasks +3. 90 tasks remain queued +4. One of the started tasks detects downstream type violation +5. **Verify**: All 100 messages (10 started + 90 queued) get archived + +### 5. Visibility Timeout Verification (❌ Not Covered) +**Scenario**: Ensure orphaned messages don't reappear after timeout +``` +step1 → step2 → map_step +``` + +**Test Flow**: +1. Worker starts step2 (30s visibility timeout) +2. Type violation occurs but message NOT archived (simulate bug) +3. Wait 31 seconds +4. **Verify**: Message reappears in queue (demonstrates the bug) +5. Apply fix and verify message doesn't reappear + +### 6. Nested Map Chains (❌ Not Covered) +**Scenario**: Type violation in middle of map chain +``` +map1 (produces arrays) → map2 (expects arrays) → map3 +``` + +**Test Flow**: +1. map1 task completes with non-array (violates map2 expectation) +2. Other map1 tasks are in various states (started/queued) +3. **Verify**: All map1 tasks archived, map2 never starts + +### 7. 
Race During Archival (❌ Not Covered) +**Scenario**: Worker tries to complete task while archival is happening +``` +step1 → step2 → map_step +``` + +**Test Flow**: +1. Worker A detects type violation, begins archiving +2. Worker B tries to complete its task during archival +3. **Verify**: Worker B's completion is rejected (guard clause) +4. **Verify**: No duplicate archival attempts + +## Implementation Strategy + +### Test Utilities Needed + +1. **Multi-worker simulator**: +```sql +CREATE FUNCTION pgflow_tests.simulate_worker( + worker_id uuid, + flow_slug text +) RETURNS TABLE(...); +``` + +2. **Queue state inspector**: +```sql +CREATE FUNCTION pgflow_tests.inspect_queue_state( + flow_slug text +) RETURNS TABLE( + message_id bigint, + task_status text, + visibility_timeout timestamptz +); +``` + +3. **Time manipulation** (for visibility timeout tests): +```sql +-- May need to mock pgmq visibility behavior +``` + +### Test File Organization + +``` +supabase/tests/type_violations/ +├── basic_violation.test.sql # Existing coverage +├── concurrent_started_tasks.test.sql # NEW: Scenario 2 +├── mixed_queue_states.test.sql # NEW: Scenario 3 +├── map_partial_processing.test.sql # NEW: Scenario 4 +├── visibility_timeout_recovery.test.sql # NEW: Scenario 5 +├── nested_map_chains.test.sql # NEW: Scenario 6 +└── race_during_archival.test.sql # NEW: Scenario 7 +``` + +## Success Criteria + +1. **No orphaned messages**: Queue must be empty after type violation +2. **No message resurrection**: Archived messages don't reappear after timeout +3. **Complete cleanup**: ALL tasks (queued + started) for the run are handled +4. **Atomic operation**: Archival happens in single transaction +5. **Guard effectiveness**: No operations on failed runs + +## Performance Considerations + +- Test with large numbers of tasks (1000+) to verify batch archival performance +- Ensure archival doesn't lock tables for extended periods +- Verify index usage on `(run_id, status, message_id)` + +## Current Gap Analysis + +**What we have**: +- Basic type violation detection ✅ +- Single task archival ✅ +- Run failure on violation ✅ + +**What we need**: +- True concurrent worker simulation ❌ +- Multi-task race condition validation ❌ +- Visibility timeout verification ❌ +- Performance under load testing ❌ + +## Priority + +1. **HIGH**: Concurrent started tasks (Scenario 2) - Most common real-world case +2. **HIGH**: Map partial processing (Scenario 4) - Critical for large arrays +3. **MEDIUM**: Mixed queue states (Scenario 3) - Complex flows +4. **LOW**: Other scenarios - Edge cases but important for robustness \ No newline at end of file diff --git a/pkgs/core/README.md b/pkgs/core/README.md index 8cc4ad7d8..85c89a8fa 100644 --- a/pkgs/core/README.md +++ b/pkgs/core/README.md @@ -231,10 +231,16 @@ The system handles failures by: - Preventing processing until the visibility timeout expires 3. When retries are exhausted: - Marking the task as 'failed' + - Storing the task output (even for failed tasks) - Marking the step as 'failed' - Marking the run as 'failed' - Archiving the message in PGMQ - - Notifying workers to abort pending tasks (future feature) + - **Archiving all queued messages for the failed run** (preventing orphaned messages) +4. 
Additional failure handling: + - **No retries on already-failed runs** - tasks are immediately marked as failed + - **Graceful type constraint violations** - handled without exceptions when single steps feed map steps + - **Stores invalid output on type violations** - captures the output that caused the violation for debugging + - **Performance-optimized message archiving** using indexed queries #### Retries and Timeouts diff --git a/pkgs/core/schemas/0060_tables_runtime.sql b/pkgs/core/schemas/0060_tables_runtime.sql index 0ce5ff620..7ad33921c 100644 --- a/pkgs/core/schemas/0060_tables_runtime.sql +++ b/pkgs/core/schemas/0060_tables_runtime.sql @@ -82,7 +82,7 @@ create table pgflow.step_tasks ( status in ('queued', 'started', 'completed', 'failed') ), constraint output_valid_only_for_completed check ( - output is null or status = 'completed' + output is null or status in ('completed', 'failed') ), constraint attempts_count_nonnegative check (attempts_count >= 0), constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)), diff --git a/pkgs/core/schemas/0100_function_complete_task.sql b/pkgs/core/schemas/0100_function_complete_task.sql index fc41fdd18..34f0fdc02 100644 --- a/pkgs/core/schemas/0100_function_complete_task.sql +++ b/pkgs/core/schemas/0100_function_complete_task.sql @@ -12,13 +12,35 @@ as $$ declare v_step_state pgflow.step_states%ROWTYPE; v_dependent_map_slug text; + v_run_record pgflow.runs%ROWTYPE; + v_step_record pgflow.step_states%ROWTYPE; begin -- ========================================== --- VALIDATION: Array output for dependent maps +-- GUARD: No mutations on failed runs -- ========================================== --- Must happen BEFORE acquiring locks to fail fast without holding resources --- Only validate for single steps - map steps produce scalars that get aggregated +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; +END IF; + +-- ========================================== +-- LOCK ACQUISITION AND TYPE VALIDATION +-- ========================================== +-- Acquire locks first to prevent race conditions +SELECT * INTO v_run_record FROM pgflow.runs +WHERE pgflow.runs.run_id = complete_task.run_id +FOR UPDATE; + +SELECT * INTO v_step_record FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug +FOR UPDATE; + +-- Check for type violations AFTER acquiring locks SELECT child_step.step_slug INTO v_dependent_map_slug FROM pgflow.deps dependency JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug @@ -28,7 +50,7 @@ JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug AND child_state.step_slug = child_step.step_slug WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step - AND dependency.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND dependency.flow_slug = v_run_record.flow_slug AND parent_step.step_type = 'single' -- Only validate single steps AND child_step.step_type = 'map' AND child_state.run_id = complete_task.run_id 
@@ -36,31 +58,69 @@ WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') LIMIT 1; +-- Handle type violation if detected IF v_dependent_map_slug IS NOT NULL THEN - RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)', - v_dependent_map_slug, - complete_task.step_slug, - CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END, - complete_task.output; + -- Mark run as failed immediately + UPDATE pgflow.runs + SET status = 'failed', + failed_at = now() + WHERE pgflow.runs.run_id = complete_task.run_id; + + -- Archive all active messages (both queued and started) to prevent orphaned messages + PERFORM pgmq.archive( + v_run_record.flow_slug, + array_agg(st.message_id) + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + HAVING count(*) > 0; -- Only call archive if there are messages to archive + + -- Mark current task as failed and store the output + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + output = complete_task.output, -- Store the output that caused the violation + error_message = '[TYPE_VIOLATION] Produced ' || + CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END || + ' instead of array' + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + + -- Mark step state as failed + UPDATE pgflow.step_states + SET status = 'failed', + failed_at = now(), + error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug; + + -- Archive the current task's message (it was started, now failed) + PERFORM pgmq.archive( + v_run_record.flow_slug, + st.message_id -- Single message, use scalar form + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.message_id IS NOT NULL; + + -- Return empty result + RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; + RETURN; END IF; -- ========================================== -- MAIN CTE CHAIN: Update task and propagate changes -- ========================================== WITH --- ---------- Lock acquisition ---------- --- Acquire locks in consistent order (run -> step) to prevent deadlocks -run_lock AS ( - SELECT * FROM pgflow.runs - WHERE pgflow.runs.run_id = complete_task.run_id - FOR UPDATE -), -step_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = complete_task.step_slug - FOR UPDATE -), -- ---------- Task completion ---------- -- Update the task record with completion status and output task AS ( diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index a3f328ab7..cab0cd846 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -14,6 +14,35 @@ DECLARE 
v_step_failed boolean; begin +-- If run is already failed, no retries allowed +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + error_message = fail_task.error_message + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index + AND pgflow.step_tasks.status = 'started'; + + -- Archive the task's message + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; +END IF; + WITH run_lock AS ( SELECT * FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id @@ -140,6 +169,18 @@ IF v_run_failed THEN END; END IF; +-- Archive all active messages (both queued and started) when run fails +IF v_run_failed THEN + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; +END IF; + -- For queued tasks: delay the message for retry with exponential backoff PERFORM ( WITH retry_config AS ( @@ -169,20 +210,16 @@ PERFORM ( ); -- For failed tasks: archive the message -PERFORM ( - WITH failed_tasks AS ( - SELECT r.flow_slug, st.message_id - FROM pgflow.step_tasks st - JOIN pgflow.runs r ON st.run_id = r.run_id - WHERE st.run_id = fail_task.run_id - AND st.step_slug = fail_task.step_slug - AND st.task_index = fail_task.task_index - AND st.status = 'failed' - ) - SELECT pgmq.archive(ft.flow_slug, ft.message_id) - FROM failed_tasks ft - WHERE EXISTS (SELECT 1 FROM failed_tasks) -); +PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) +FROM pgflow.step_tasks st +JOIN pgflow.runs r ON st.run_id = r.run_id +WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'failed' + AND st.message_id IS NOT NULL +GROUP BY r.flow_slug +HAVING COUNT(st.message_id) > 0; return query select * from pgflow.step_tasks st diff --git a/pkgs/core/schemas/0100_function_start_ready_steps.sql b/pkgs/core/schemas/0100_function_start_ready_steps.sql index 0952671c2..3fd29ca1d 100644 --- a/pkgs/core/schemas/0100_function_start_ready_steps.sql +++ b/pkgs/core/schemas/0100_function_start_ready_steps.sql @@ -1,8 +1,16 @@ create or replace function pgflow.start_ready_steps(run_id uuid) returns void -language sql +language plpgsql set search_path to '' as $$ +begin +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN + RETURN; +END IF; + -- ========================================== -- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0) -- 
========================================== @@ -165,4 +173,5 @@ SELECT sent_messages.msg_id FROM sent_messages; +end; $$; diff --git a/pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql b/pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql new file mode 100644 index 000000000..5675f4df1 --- /dev/null +++ b/pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql @@ -0,0 +1,688 @@ +-- Modify "step_tasks" table +ALTER TABLE "pgflow"."step_tasks" DROP CONSTRAINT "output_valid_only_for_completed", ADD CONSTRAINT "output_valid_only_for_completed" CHECK ((output IS NULL) OR (status = ANY (ARRAY['completed'::text, 'failed'::text]))); +-- Modify "start_ready_steps" function +CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +begin +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN + RETURN; +END IF; + +-- ========================================== +-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0) +-- ========================================== +-- These complete immediately without spawning tasks +WITH empty_map_steps AS ( + SELECT step_state.* + FROM pgflow.step_states AS step_state + JOIN pgflow.steps AS step + ON step.flow_slug = step_state.flow_slug + AND step.step_slug = step_state.step_slug + WHERE step_state.run_id = start_ready_steps.run_id + AND step_state.status = 'created' + AND step_state.remaining_deps = 0 + AND step.step_type = 'map' + AND step_state.initial_tasks = 0 + ORDER BY step_state.step_slug + FOR UPDATE OF step_state +), +-- ---------- Complete empty map steps ---------- +completed_empty_steps AS ( + UPDATE pgflow.step_states + SET status = 'completed', + started_at = now(), + completed_at = now(), + remaining_tasks = 0 + FROM empty_map_steps + WHERE pgflow.step_states.run_id = start_ready_steps.run_id + AND pgflow.step_states.step_slug = empty_map_steps.step_slug + RETURNING pgflow.step_states.* +), +-- ---------- Broadcast completion events ---------- +broadcast_empty_completed AS ( + SELECT + realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', completed_step.run_id, + 'step_slug', completed_step.step_slug, + 'status', 'completed', + 'started_at', completed_step.started_at, + 'completed_at', completed_step.completed_at, + 'remaining_tasks', 0, + 'remaining_deps', 0, + 'output', '[]'::jsonb + ), + concat('step:', completed_step.step_slug, ':completed'), + concat('pgflow:run:', completed_step.run_id), + false + ) + FROM completed_empty_steps AS completed_step +), + +-- ========================================== +-- HANDLE NORMAL STEPS (initial_tasks > 0) +-- ========================================== +-- ---------- Find ready steps ---------- +-- Steps with no remaining deps and known task count +ready_steps AS ( + SELECT * + FROM pgflow.step_states AS step_state + WHERE step_state.run_id = start_ready_steps.run_id + AND step_state.status = 'created' + AND step_state.remaining_deps = 0 + AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count + AND step_state.initial_tasks > 0 -- Don't start taskless steps + -- Exclude empty map steps already handled + AND NOT EXISTS ( + SELECT 1 FROM empty_map_steps + WHERE empty_map_steps.run_id = 
step_state.run_id + AND empty_map_steps.step_slug = step_state.step_slug + ) + ORDER BY step_state.step_slug + FOR UPDATE +), +-- ---------- Mark steps as started ---------- +started_step_states AS ( + UPDATE pgflow.step_states + SET status = 'started', + started_at = now(), + remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting + FROM ready_steps + WHERE pgflow.step_states.run_id = start_ready_steps.run_id + AND pgflow.step_states.step_slug = ready_steps.step_slug + RETURNING pgflow.step_states.* +), + +-- ========================================== +-- TASK GENERATION AND QUEUE MESSAGES +-- ========================================== +-- ---------- Generate tasks and batch messages ---------- +-- Single steps: 1 task (index 0) +-- Map steps: N tasks (indices 0..N-1) +message_batches AS ( + SELECT + started_step.flow_slug, + started_step.run_id, + started_step.step_slug, + COALESCE(step.opt_start_delay, 0) as delay, + array_agg( + jsonb_build_object( + 'flow_slug', started_step.flow_slug, + 'run_id', started_step.run_id, + 'step_slug', started_step.step_slug, + 'task_index', task_idx.task_index + ) ORDER BY task_idx.task_index + ) AS messages, + array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices + FROM started_step_states AS started_step + JOIN pgflow.steps AS step + ON step.flow_slug = started_step.flow_slug + AND step.step_slug = started_step.step_slug + -- Generate task indices from 0 to initial_tasks-1 + CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index) + GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay +), +-- ---------- Send messages to queue ---------- +-- Uses batch sending for performance with large arrays +sent_messages AS ( + SELECT + mb.flow_slug, + mb.run_id, + mb.step_slug, + task_indices.task_index, + msg_ids.msg_id + FROM message_batches mb + CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord) + CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord) + WHERE task_indices.idx_ord = msg_ids.msg_ord +), + +-- ---------- Broadcast step:started events ---------- +broadcast_events AS ( + SELECT + realtime.send( + jsonb_build_object( + 'event_type', 'step:started', + 'run_id', started_step.run_id, + 'step_slug', started_step.step_slug, + 'status', 'started', + 'started_at', started_step.started_at, + 'remaining_tasks', started_step.remaining_tasks, + 'remaining_deps', started_step.remaining_deps + ), + concat('step:', started_step.step_slug, ':started'), + concat('pgflow:run:', started_step.run_id), + false + ) + FROM started_step_states AS started_step +) + +-- ========================================== +-- RECORD TASKS IN DATABASE +-- ========================================== +INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id) +SELECT + sent_messages.flow_slug, + sent_messages.run_id, + sent_messages.step_slug, + sent_messages.task_index, + sent_messages.msg_id +FROM sent_messages; + +end; +$$; +-- Modify "complete_task" function +CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_step_state pgflow.step_states%ROWTYPE; + v_dependent_map_slug text; + v_run_record pgflow.runs%ROWTYPE; + v_step_record 
pgflow.step_states%ROWTYPE; +begin + +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; +END IF; + +-- ========================================== +-- LOCK ACQUISITION AND TYPE VALIDATION +-- ========================================== +-- Acquire locks first to prevent race conditions +SELECT * INTO v_run_record FROM pgflow.runs +WHERE pgflow.runs.run_id = complete_task.run_id +FOR UPDATE; + +SELECT * INTO v_step_record FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug +FOR UPDATE; + +-- Check for type violations AFTER acquiring locks +SELECT child_step.step_slug INTO v_dependent_map_slug +FROM pgflow.deps dependency +JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug + AND child_step.step_slug = dependency.step_slug +JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug + AND parent_step.step_slug = dependency.dep_slug +JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug + AND child_state.step_slug = child_step.step_slug +WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step + AND dependency.flow_slug = v_run_record.flow_slug + AND parent_step.step_type = 'single' -- Only validate single steps + AND child_step.step_type = 'map' + AND child_state.run_id = complete_task.run_id + AND child_state.initial_tasks IS NULL + AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') +LIMIT 1; + +-- Handle type violation if detected +IF v_dependent_map_slug IS NOT NULL THEN + -- Mark run as failed immediately + UPDATE pgflow.runs + SET status = 'failed', + failed_at = now() + WHERE pgflow.runs.run_id = complete_task.run_id; + + -- Archive all active messages (both queued and started) to prevent orphaned messages + PERFORM pgmq.archive( + v_run_record.flow_slug, + array_agg(st.message_id) + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + HAVING count(*) > 0; -- Only call archive if there are messages to archive + + -- Mark current task as failed and store the output + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + output = complete_task.output, -- Store the output that caused the violation + error_message = '[TYPE_VIOLATION] Produced ' || + CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END || + ' instead of array' + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + + -- Mark step state as failed + UPDATE pgflow.step_states + SET status = 'failed', + failed_at = now(), + error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END + WHERE 
pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug; + + -- Archive the current task's message (it was started, now failed) + PERFORM pgmq.archive( + v_run_record.flow_slug, + st.message_id -- Single message, use scalar form + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.message_id IS NOT NULL; + + -- Return empty result + RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; + RETURN; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Update task and propagate changes +-- ========================================== +WITH +-- ---------- Task completion ---------- +-- Update the task record with completion status and output +task AS ( + UPDATE pgflow.step_tasks + SET + status = 'completed', + completed_at = now(), + output = complete_task.output + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index + AND pgflow.step_tasks.status = 'started' + RETURNING * +), +-- ---------- Step state update ---------- +-- Decrement remaining_tasks and potentially mark step as completed +step_state AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement + ELSE 'started' + END, + completed_at = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement + ELSE NULL + END, + remaining_tasks = pgflow.step_states.remaining_tasks - 1 + FROM task + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + RETURNING pgflow.step_states.* +), +-- ---------- Dependency resolution ---------- +-- Find all child steps that depend on the completed parent step (only if parent completed) +child_steps AS ( + SELECT deps.step_slug AS child_step_slug + FROM pgflow.deps deps + JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug + WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child + ORDER BY deps.step_slug -- Ensure consistent ordering +), +-- ---------- Lock child steps ---------- +-- Acquire locks on all child steps before updating them +child_steps_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps) + FOR UPDATE +), +-- ---------- Update child steps ---------- +-- Decrement remaining_deps and resolve NULL initial_tasks for map steps +child_steps_update AS ( + UPDATE pgflow.step_states child_state + SET remaining_deps = child_state.remaining_deps - 1, + -- Resolve NULL initial_tasks for child map steps + -- This is where child maps learn their array size from the parent + -- This CTE only runs when the parent step is complete (see child_steps JOIN) + initial_tasks = CASE + WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN + CASE + WHEN parent_step.step_type = 'map' THEN + -- Map->map: Count all completed tasks from parent map + -- We add 1 because the current task is being completed in this transaction + -- but isn't yet visible as 'completed' in the step_tasks table + -- TODO: Refactor to use future column step_states.total_tasks + -- Would eliminate the COUNT 
query and just use parent_state.total_tasks + (SELECT COUNT(*)::int + 1 + FROM pgflow.step_tasks parent_tasks + WHERE parent_tasks.run_id = complete_task.run_id + AND parent_tasks.step_slug = complete_task.step_slug + AND parent_tasks.status = 'completed' + AND parent_tasks.task_index != complete_task.task_index) + ELSE + -- Single->map: Use output array length (single steps complete immediately) + CASE + WHEN complete_task.output IS NOT NULL + AND jsonb_typeof(complete_task.output) = 'array' THEN + jsonb_array_length(complete_task.output) + ELSE NULL -- Keep NULL if not an array + END + END + ELSE child_state.initial_tasks -- Keep existing value (including NULL) + END + FROM child_steps children + JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND child_step.step_slug = children.child_step_slug + JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND parent_step.step_slug = complete_task.step_slug + WHERE child_state.run_id = complete_task.run_id + AND child_state.step_slug = children.child_step_slug +) +-- ---------- Update run remaining_steps ---------- +-- Decrement the run's remaining_steps counter if step completed +UPDATE pgflow.runs +SET remaining_steps = pgflow.runs.remaining_steps - 1 +FROM step_state +WHERE pgflow.runs.run_id = complete_task.run_id + AND step_state.status = 'completed'; + +-- ========================================== +-- POST-COMPLETION ACTIONS +-- ========================================== + +-- ---------- Get updated state for broadcasting ---------- +SELECT * INTO v_step_state FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; + +-- ---------- Handle step completion ---------- +IF v_step_state.status = 'completed' THEN + -- Cascade complete any taskless steps that are now ready + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); + + -- Broadcast step:completed event + -- For map steps, aggregate all task outputs; for single steps, use the task output + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'completed', + 'output', CASE + WHEN (SELECT s.step_type FROM pgflow.steps s + WHERE s.flow_slug = v_step_state.flow_slug + AND s.step_slug = complete_task.step_slug) = 'map' THEN + -- Aggregate all task outputs for map steps + (SELECT COALESCE(jsonb_agg(st.output ORDER BY st.task_index), '[]'::jsonb) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.status = 'completed') + ELSE + -- Single step: use the individual task output + complete_task.output + END, + 'completed_at', v_step_state.completed_at + ), + concat('step:', complete_task.step_slug, ':completed'), + concat('pgflow:run:', complete_task.run_id), + false + ); +END IF; + +-- ---------- Archive completed task message ---------- +-- Move message from active queue to archive table +PERFORM ( + WITH completed_tasks AS ( + SELECT r.flow_slug, st.message_id + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.status = 'completed' + ) + SELECT pgmq.archive(ct.flow_slug, ct.message_id) + 
FROM completed_tasks ct + WHERE EXISTS (SELECT 1 FROM completed_tasks) +); + +-- ---------- Trigger next steps ---------- +-- Start any steps that are now ready (deps satisfied) +PERFORM pgflow.start_ready_steps(complete_task.run_id); + +-- Check if the entire run is complete +PERFORM pgflow.maybe_complete_run(complete_task.run_id); + +-- ---------- Return completed task ---------- +RETURN QUERY SELECT * +FROM pgflow.step_tasks AS step_task +WHERE step_task.run_id = complete_task.run_id + AND step_task.step_slug = complete_task.step_slug + AND step_task.task_index = complete_task.task_index; + +end; +$$; +-- Modify "fail_task" function +CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_run_failed boolean; + v_step_failed boolean; +begin + +-- If run is already failed, no retries allowed +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + error_message = fail_task.error_message + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index + AND pgflow.step_tasks.status = 'started'; + + -- Archive the task's message + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; +END IF; + +WITH run_lock AS ( + SELECT * FROM pgflow.runs + WHERE pgflow.runs.run_id = fail_task.run_id + FOR UPDATE +), +step_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug + FOR UPDATE +), +flow_info AS ( + SELECT r.flow_slug + FROM pgflow.runs r + WHERE r.run_id = fail_task.run_id +), +config AS ( + SELECT + COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts, + COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay + FROM pgflow.steps s + JOIN pgflow.flows f ON f.flow_slug = s.flow_slug + JOIN flow_info fi ON fi.flow_slug = s.flow_slug + WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug +), +fail_or_retry_task as ( + UPDATE pgflow.step_tasks as task + SET + status = CASE + WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued' + ELSE 'failed' + END, + failed_at = CASE + WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now() + ELSE NULL + END, + started_at = CASE + WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL + ELSE task.started_at + END, + error_message = fail_task.error_message + WHERE task.run_id = fail_task.run_id + AND task.step_slug = fail_task.step_slug + AND task.task_index = fail_task.task_index + AND task.status = 'started' + RETURNING * +), +maybe_fail_step AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN (select fail_or_retry_task.status from 
fail_or_retry_task) = 'failed' THEN 'failed' + ELSE pgflow.step_states.status + END, + failed_at = CASE + WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now() + ELSE NULL + END, + error_message = CASE + WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message + ELSE NULL + END + FROM fail_or_retry_task + WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug + RETURNING pgflow.step_states.* +) +-- Update run status +UPDATE pgflow.runs +SET status = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' + ELSE status + END, + failed_at = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN now() + ELSE NULL + END +WHERE pgflow.runs.run_id = fail_task.run_id +RETURNING (status = 'failed') INTO v_run_failed; + +-- Check if step failed by querying the step_states table +SELECT (status = 'failed') INTO v_step_failed +FROM pgflow.step_states +WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug; + +-- Send broadcast event for step failure if the step was failed +IF v_step_failed THEN + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', fail_task.run_id, + 'step_slug', fail_task.step_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + concat('step:', fail_task.step_slug, ':failed'), + concat('pgflow:run:', fail_task.run_id), + false + ); +END IF; + +-- Send broadcast event for run failure if the run was failed +IF v_run_failed THEN + DECLARE + v_flow_slug text; + BEGIN + SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id; + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', fail_task.run_id, + 'flow_slug', v_flow_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', fail_task.run_id), + false + ); + END; +END IF; + +-- Archive all active messages (both queued and started) when run fails +IF v_run_failed THEN + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; +END IF; + +-- For queued tasks: delay the message for retry with exponential backoff +PERFORM ( + WITH retry_config AS ( + SELECT + COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay + FROM pgflow.steps s + JOIN pgflow.flows f ON f.flow_slug = s.flow_slug + JOIN pgflow.runs r ON r.flow_slug = f.flow_slug + WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug + ), + queued_tasks AS ( + SELECT + r.flow_slug, + st.message_id, + pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'queued' + ) + SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay) + FROM queued_tasks qt + WHERE EXISTS (SELECT 1 FROM queued_tasks) +); + +-- For failed tasks: archive the message +PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) +FROM 
pgflow.step_tasks st +JOIN pgflow.runs r ON st.run_id = r.run_id +WHERE st.run_id = fail_task.run_id +  AND st.step_slug = fail_task.step_slug +  AND st.task_index = fail_task.task_index +  AND st.status = 'failed' +  AND st.message_id IS NOT NULL +GROUP BY r.flow_slug +HAVING COUNT(st.message_id) > 0; + +return query select * +from pgflow.step_tasks st +where st.run_id = fail_task.run_id +  and st.step_slug = fail_task.step_slug +  and st.task_index = fail_task.task_index; + +end; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 7f21a5295..229f7ae81 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:L0B5nrLTUufGJ0mMcVcnPjHjuvhPMDMHvOv4rgEnycI= +h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg= 20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc= @@ -15,3 +15,4 @@ h1:L0B5nrLTUufGJ0mMcVcnPjHjuvhPMDMHvOv4rgEnycI= 20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y= 20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4= 20250918042753_pgflow_temp_handle_map_output_aggregation.sql h1:9aC4lyr6AEvpLTrv9Fza2Ur0QO87S0cdJDI+BPLAl60= +20250919101802_pgflow_temp_orphaned_messages_index.sql h1:GyfPfQz4AqB1/sTAC7B/m6j8FJrpkocinnzerNfM0f8= diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/no_cascade_on_failed_run.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/no_cascade_on_failed_run.test.sql new file mode 100644 index 000000000..683f0f0d6 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/no_cascade_on_failed_run.test.sql @@ -0,0 +1,44 @@ +-- Test that cascade_complete_taskless_steps returns 0 and does not cascade when run is failed +begin; + +select plan(2); + +-- Create a simple two-step flow (cascade_complete_taskless_steps is called directly below) +select pgflow.create_flow('test_flow', max_attempts => 1); +select pgflow.add_step('test_flow', 'step1'); +select pgflow.add_step('test_flow', 'step2', deps_slugs => ARRAY['step1']); + +-- Start a flow run +select pgflow.start_flow('test_flow', '{"test": "data"}'::jsonb); + +-- Get the run_id +select run_id from pgflow.runs where flow_slug = 'test_flow' limit 1 \gset + +-- Get message ID for step1 +select message_id as msg1 from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step1' limit 1 \gset + +-- Ensure worker exists +select pgflow_tests.ensure_worker('test_flow'); + +-- Start and fail step1, which fails the entire run (max_attempts => 1) +select pgflow.start_tasks('test_flow', ARRAY[:msg1]::bigint[], '11111111-1111-1111-1111-111111111111'::uuid); +select pgflow.fail_task(:'run_id', 'step1', 0, 'Simulated failure'); + +-- Call cascade_complete_taskless_steps directly on the failed run +-- It should return 0 without doing any cascading +select is( +  pgflow.cascade_complete_taskless_steps(:'run_id'), +  0, +  'cascade_complete_taskless_steps should return 0 when run is failed' +); + +-- Verify run remains failed +select is( +  status, +  'failed', +  'Run should remain in failed status' +) from pgflow.runs where run_id = :'run_id'; + +select finish(); +rollback; \ No newline at end of file
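To make the guard concrete: the test above expects pgflow.cascade_complete_taskless_steps to become a no-op once a run has failed. A minimal sketch of such an early return, assuming the parameter is named run_id and the function returns the number of cascaded steps; the shipped body lives in pkgs/core/schemas/ and may differ:

-- Hypothetical guard sketch, not the shipped definition:
IF EXISTS (SELECT 1 FROM pgflow.runs r
           WHERE r.run_id = cascade_complete_taskless_steps.run_id
             AND r.status = 'failed') THEN
  RETURN 0;  -- never cascade-complete steps once the run has failed
END IF;
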
diff --git a/pkgs/core/supabase/tests/complete_task/no_mutations_on_failed_run.test.sql b/pkgs/core/supabase/tests/complete_task/no_mutations_on_failed_run.test.sql new file mode 100644 index 000000000..f380602d8 --- /dev/null +++ b/pkgs/core/supabase/tests/complete_task/no_mutations_on_failed_run.test.sql @@ -0,0 +1,85 @@ +-- Test that complete_task does not mutate state when run is already failed +-- This simulates a race condition where a task tries to complete after the run has failed +begin; + +select plan(6); + +-- Create test flow with two parallel steps (one will fail, one will try to complete) +select pgflow.create_flow('test_flow', max_attempts => 1); +select pgflow.add_step('test_flow', 'step1'); +select pgflow.add_step('test_flow', 'step2'); + +-- Start a flow run +select pgflow.start_flow('test_flow', '{"test": "data"}'::jsonb); + +-- Get the run_id +select run_id from pgflow.runs where flow_slug = 'test_flow' limit 1 \gset + +-- Get message IDs for both steps +select message_id as msg1 from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step1' limit 1 \gset + +select message_id as msg2 from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step2' limit 1 \gset + +-- Ensure worker exists +select pgflow_tests.ensure_worker('test_flow'); + +-- Start both tasks (simulating workers picking them up) +select pgflow.start_tasks('test_flow', ARRAY[:msg1, :msg2]::bigint[], '11111111-1111-1111-1111-111111111111'::uuid); + +-- Fail step2 which will fail the entire run (max_attempts=1) +select pgflow.fail_task(:'run_id', 'step2', 0, 'Simulated failure'); + +-- Now try to complete step1 (race condition - worker doesn't know run failed) +select pgflow.complete_task(:'run_id', 'step1', 0, '{"result": "test"}'::jsonb); + +-- Verify task was NOT marked as completed +select is( +  status, +  'started', +  'Task should remain in started status when run is failed' +) from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step1' and task_index = 0; + +-- Verify output was NOT saved +select is( +  output, +  null, +  'Task output should not be saved when run is failed' +) from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step1' and task_index = 0; + +-- Verify step state was NOT changed to completed +select is( +  status, +  'started', +  'Step should remain in started status when run is failed' +) from pgflow.step_states +where run_id = :'run_id' and step_slug = 'step1'; + +-- Verify step2 is failed +select is( +  status, +  'failed', +  'Step2 should be in failed status' +) from pgflow.step_states +where run_id = :'run_id' and step_slug = 'step2'; + +-- Verify run is failed +select is( +  status, +  'failed', +  'Run should be in failed status' +) from pgflow.runs where run_id = :'run_id'; + +-- Verify the failed completion attempt did not change attempts_count +select is( +  attempts_count, +  1, +  'Step1 attempts_count should be 1 after start_tasks' +) from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step1' and task_index = 0; + +select finish(); +rollback; \ No newline at end of file
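The race exercised above is closed by an early-return guard at the top of complete_task, mirroring the already-failed check visible in fail_task earlier in this migration. A minimal sketch of the pattern, using this schema's names; the actual body in pkgs/core/schemas/ may differ:

-- Hypothetical sketch of the guard, not the shipped definition:
IF EXISTS (SELECT 1 FROM pgflow.runs
           WHERE pgflow.runs.run_id = complete_task.run_id
             AND pgflow.runs.status = 'failed') THEN
  -- Return the task row unchanged: no status change, no output stored,
  -- no start_ready_steps, no queue mutations.
  RETURN QUERY SELECT * FROM pgflow.step_tasks AS st
  WHERE st.run_id = complete_task.run_id
    AND st.step_slug = complete_task.step_slug
    AND st.task_index = complete_task.task_index;
  RETURN;
END IF;
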
diff --git a/pkgs/core/supabase/tests/fail_task/archive_sibling_map_tasks.test.sql b/pkgs/core/supabase/tests/fail_task/archive_sibling_map_tasks.test.sql new file mode 100644 index 000000000..660c0465c --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task/archive_sibling_map_tasks.test.sql @@ -0,0 +1,113 @@ +begin; +select plan(8); +select pgflow_tests.reset_db(); + +-- Test: fail_task should archive all sibling task messages for map steps + +-- Create flow with a map step +select pgflow.create_flow('test_map_fail'); +select pgflow.add_step( +  flow_slug => 'test_map_fail', +  step_slug => 'map_step', +  step_type => 'map', +  max_attempts => 1 +); + +-- Start flow with 5 array elements +select run_id as test_run_id from pgflow.start_flow('test_map_fail', '["a", "b", "c", "d", "e"]'::jsonb) \gset + +-- Verify all 5 messages are in queue +select is( +  (select count(*) from pgmq.q_test_map_fail), +  5::bigint, +  'Should have 5 messages in queue for 5 map tasks' +); + +-- Verify all 5 tasks are created +select is( +  (select count(*)::integer from pgflow.step_tasks +   where run_id = :'test_run_id'::uuid +   and step_slug = 'map_step' +   and status = 'queued'), +  5, +  'Should have 5 queued tasks' +); + +-- Ensure worker exists for polling +select pgflow_tests.ensure_worker('test_map_fail'); + +-- Start one task (simulating Edge Worker behavior) +-- Note: read_and_start picks an arbitrary task, so its task_index is captured below +WITH task AS ( +  SELECT * FROM pgflow_tests.read_and_start('test_map_fail', 1, 1) +  LIMIT 1 +) +SELECT step_slug FROM task; + +-- Get the actual task_index of the started task for later reference +select task_index as started_task_index from pgflow.step_tasks +where run_id = :'test_run_id'::uuid +  and step_slug = 'map_step' +  and status = 'started' \gset + +-- Fail the started task +select pgflow.fail_task( +  :'test_run_id'::uuid, +  'map_step', +  :'started_task_index'::integer, +  'Task failed!' +); + +-- Test: Run should be marked as failed +select is( +  (select status from pgflow.runs where run_id = :'test_run_id'::uuid), +  'failed', +  'Run should be marked as failed after task failure' +); + +-- Test: Failed task should have status 'failed' +select is( +  (select status from pgflow.step_tasks +   where run_id = :'test_run_id'::uuid +   and step_slug = 'map_step' +   and task_index = :'started_task_index'::integer), +  'failed', +  'Started task should be marked as failed' +); + +-- CRITICAL TEST: All sibling task messages should be archived (removed from queue) +select is( +  (select count(*) from pgmq.q_test_map_fail), +  0::bigint, +  'All 5 messages should be archived (removed from queue) when one map task fails' +); + +-- Test: Verify messages were actually archived, not deleted +select is( +  (select count(*) from pgmq.a_test_map_fail), +  5::bigint, +  'All 5 messages should be in archive table' +); + +-- Test: All sibling tasks should remain in 'queued' status +select is( +  (select count(*)::integer from pgflow.step_tasks +   where run_id = :'test_run_id'::uuid +   and step_slug = 'map_step' +   and task_index != :'started_task_index'::integer +   and status = 'queued'), +  4, +  'Sibling tasks should remain in queued status' +); + +-- Test: Step state should be marked as failed +select is( +  (select status from pgflow.step_states +   where run_id = :'test_run_id'::uuid +   and step_slug = 'map_step'), +  'failed', +  'Map step should be marked as failed' +); + +select * from finish(); +rollback; \ No newline at end of file
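These failure tests set max_attempts => 1 precisely to skip the retry path, in which fail_task re-queues the message via pgmq.set_vt delayed by pgflow.calculate_retry_delay (see the migration above). A sketch of such a helper, assuming plain exponential backoff (base_delay * 2^attempts_count) with no jitter; the shipped definition in pkgs/core/schemas/ may differ:

-- Hypothetical sketch only:
CREATE OR REPLACE FUNCTION pgflow.calculate_retry_delay(
  base_delay numeric,
  attempts_count int
) RETURNS int LANGUAGE sql IMMUTABLE AS $$
  SELECT floor(base_delay * power(2, attempts_count))::int;
$$;
-- e.g. base_delay = 5 gives delays of 5s, 10s, 20s for attempts 0, 1, 2
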
diff --git a/pkgs/core/supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql b/pkgs/core/supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql new file mode 100644 index 000000000..c8f45701f --- /dev/null +++ b/pkgs/core/supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql @@ -0,0 +1,124 @@ +begin; +select plan(8); +select pgflow_tests.reset_db(); + +-- Test: Type constraint violation should archive all pending messages for the run + +-- Create flow with single -> map dependency; the producer also feeds a parallel single step +select pgflow.create_flow('type_constraint_test'); + +select pgflow.add_step( +  flow_slug => 'type_constraint_test', +  step_slug => 'producer', +  step_type => 'single' +); + +select pgflow.add_step( +  flow_slug => 'type_constraint_test', +  step_slug => 'map_consumer', +  deps_slugs => ARRAY['producer'], +  step_type => 'map' +); + +-- Add a parallel single step to verify all pending messages are archived +select pgflow.add_step( +  flow_slug => 'type_constraint_test', +  step_slug => 'parallel_single', +  deps_slugs => ARRAY['producer'], +  step_type => 'single' +); + +select run_id as test_run_id from pgflow.start_flow( +  'type_constraint_test', +  '{}'::jsonb +) \gset + +-- Verify initial state: 1 message in queue (only root producer starts immediately) +select is( +  (select count(*) from pgmq.q_type_constraint_test), +  1::bigint, +  'Should have 1 message in queue initially (only producer as root step)' +); + +-- Test: Map starts with NULL initial_tasks +select is( +  (select initial_tasks from pgflow.step_states +   where run_id = :'test_run_id'::uuid +   and step_slug = 'map_consumer'), +  NULL::integer, +  'Dependent map should start with NULL initial_tasks' +); + +-- Ensure worker exists for polling +select pgflow_tests.ensure_worker('type_constraint_test'); + +-- Start the producer task (simulating Edge Worker behavior) +-- Since producer is the only root step, it will be the one started +WITH task AS ( +  SELECT * FROM pgflow_tests.read_and_start('type_constraint_test', 1, 1) +  LIMIT 1 +) +SELECT step_slug FROM task; + +-- Get the producer task message_id for verification +select message_id as producer_msg_id from pgflow.step_tasks +where run_id = :'test_run_id'::uuid +  and step_slug = 'producer' \gset + +-- Test: complete_task should handle type violation gracefully (no exception) +select lives_ok( +  format($$ +    SELECT pgflow.complete_task( +      '%s'::uuid, +      'producer', +      0, +      '{"not": "an array"}'::jsonb +    ) +  $$, :'test_run_id'), +  'complete_task should handle type violation gracefully without throwing exception' +); + +-- CRITICAL TEST: Queue should be empty after type constraint violation +-- (complete_task archives every pending message for the failed run) +select is( +  (select count(*) from pgmq.q_type_constraint_test), +  0::bigint, +  'Queue should be empty after type constraint violation (all pending messages archived)' +); + +-- Test: Verify messages were archived rather than deleted +-- (they must land in the pgmq archive table) +select ok( +  (select count(*) from pgmq.a_type_constraint_test) > 0, +  'Some messages should be in archive table after type constraint violation' +); + +-- Test: Run status after type constraint violation +-- (the violation is handled gracefully and fails the run) +select is( +  (select status from pgflow.runs where run_id = :'test_run_id'::uuid), +  'failed', +  'Run should be marked as failed after type constraint violation' +); + +-- Test: Map initial_tasks should remain NULL after the type violation +select is( +  (select initial_tasks from pgflow.step_states +   where run_id = :'test_run_id'::uuid +   and step_slug = 'map_consumer'), +  NULL, +  'Map initial_tasks should remain NULL after type constraint violation' +); + +-- Test: Check if parallel_single task exists +-- The producer never completed, so start_ready_steps never spawned it +select is( +  (select count(*)::integer from pgflow.step_tasks +   where run_id = :'test_run_id'::uuid +   and step_slug = 'parallel_single'), +  0, +  'Parallel single task should not exist after type constraint violation (run failed before dependents started)' +); + +select * from finish(); +rollback; \ No newline at end of file
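The two updated tests below assert error_message ILIKE '%TYPE_VIOLATION%'. Judging by the message the removed throws_ok expected, the recorded error presumably reads something like the following; the exact wording is an assumption, check pkgs/core/schemas/ for the authoritative format string:

-- Hypothetical reconstruction of the recorded error_message:
SELECT format(
  'TYPE_VIOLATION: Map step %I expects array input but dependency %I produced %s',
  'map_consumer', 'producer',
  COALESCE(jsonb_typeof('{"not": "an array"}'::jsonb), 'null')
);
-- => TYPE_VIOLATION: Map step map_consumer expects array input
--    but dependency producer produced object
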
diff --git a/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql b/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql index 954a94d74..ac18b076e 100644 --- a/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql +++ b/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql @@ -1,5 +1,5 @@ begin; -select plan(3); +select plan(8); select pgflow_tests.reset_db(); -- Test: Non-array output to dependent map should fail the run @@ -42,8 +42,8 @@ WITH task AS ( ) SELECT step_slug FROM task; --- Test: complete_task should RAISE EXCEPTION for non-array output to map -select throws_ok( +-- Test: complete_task should handle type violation gracefully (no exception) +select lives_ok( $$ WITH task_info AS ( SELECT run_id, step_slug, task_index @@ -59,9 +59,47 @@ select throws_ok( '{"not": "an array"}'::jsonb ) FROM task_info $$, - 'P0001', -- RAISE EXCEPTION error code - 'Map step map_consumer expects array input but dependency producer produced object (output: {"not": "an array"})', - 'complete_task should fail when non-array is passed to dependent map' +  'complete_task should handle type violation gracefully without throwing exception' ); + +-- Test: Producer task should be marked as failed with error message +select is( +  (select status from pgflow.step_tasks +   where flow_slug = 'non_array_test' and step_slug = 'producer'), +  'failed'::text, +  'Producer task should be marked as failed' +); + +-- Test: Producer task should have appropriate error message +select ok( +  (select error_message ILIKE '%TYPE_VIOLATION%' from pgflow.step_tasks +   where flow_slug = 'non_array_test' and step_slug = 'producer'), +  'Producer task should have type constraint error message' +); + +-- Test: Producer task should store the invalid output despite failing +select is( +  (select output from pgflow.step_tasks +   where flow_slug = 'non_array_test' and step_slug = 'producer'), +  '{"not": "an array"}'::jsonb, +  'Producer task should store the output that caused the type violation' +); + +-- Test: Run should be marked as failed +select is( +  (select status from pgflow.runs +   where flow_slug = 'non_array_test' limit 1), +  'failed'::text, +  'Run should be marked as failed after type constraint violation' +); + +-- Test: Messages should be archived +select cmp_ok( +  (select count(*) from pgmq.a_non_array_test +   where message->>'step_slug' = 'producer'), +  '>', +  0::bigint, +  'Failed task messages should be archived' ); -- Test: Map initial_tasks should remain NULL after failed transaction diff --git a/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql b/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql index d1c045265..ddc24993c 100644 --- a/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql +++ b/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql @@ -1,5 +1,5 @@ begin; -select plan(3); +select plan(7); select pgflow_tests.reset_db(); -- Test: NULL output to dependent map should fail @@ -42,8 +42,8 @@ WITH task AS ( ) SELECT step_slug FROM task; --- Test: complete_task should RAISE EXCEPTION for NULL output to map -select throws_ilike( +-- Test: complete_task should handle NULL output gracefully (no exception) +select
lives_ok( $$ WITH task_info AS ( SELECT run_id, step_slug, task_index @@ -59,8 +59,38 @@ select throws_ilike( NULL::jsonb -- Passing literal NULL! ) FROM task_info $$, - '%Map step map_consumer expects array input but dependency producer produced null%', - 'complete_task should fail when NULL is passed to dependent map' + 'complete_task should handle NULL output gracefully without throwing exception' +); + +-- Test: Producer task should be marked as failed +select is( + (select status from pgflow.step_tasks + where flow_slug = 'null_output_test' and step_slug = 'producer'), + 'failed'::text, + 'Producer task should be marked as failed' +); + +-- Test: Producer task should have appropriate error message +select ok( + (select error_message ILIKE '%TYPE_VIOLATION%' from pgflow.step_tasks + where flow_slug = 'null_output_test' and step_slug = 'producer'), + 'Producer task should have type constraint error message' +); + +-- Test: Producer task should store NULL output +select is( + (select output from pgflow.step_tasks + where flow_slug = 'null_output_test' and step_slug = 'producer'), + NULL::jsonb, + 'Producer task should store the NULL output that caused the type violation' +); + +-- Test: Run should be marked as failed +select is( + (select status from pgflow.runs + where flow_slug = 'null_output_test' limit 1), + 'failed'::text, + 'Run should be marked as failed after type constraint violation' ); -- Test: Map initial_tasks should remain NULL after failed transaction diff --git a/pkgs/core/supabase/tests/type_violations/archive_all_messages.test.sql b/pkgs/core/supabase/tests/type_violations/archive_all_messages.test.sql new file mode 100644 index 000000000..28959b046 --- /dev/null +++ b/pkgs/core/supabase/tests/type_violations/archive_all_messages.test.sql @@ -0,0 +1,179 @@ +begin; +select plan(10); +select pgflow_tests.reset_db(); + +-- Test: Both queued AND started messages are archived on type violation +-- This prevents orphaned messages cycling through workers + +-- Create flow with parallel branches to have multiple tasks +select pgflow.create_flow('archive_test'); +select pgflow.add_step( + flow_slug => 'archive_test', + step_slug => 'producer', + step_type => 'single' +); +select pgflow.add_step( + flow_slug => 'archive_test', + step_slug => 'branch1', + deps_slugs => ARRAY['producer'], + step_type => 'single' +); +select pgflow.add_step( + flow_slug => 'archive_test', + step_slug => 'branch2', + deps_slugs => ARRAY['producer'], + step_type => 'single' +); +select pgflow.add_step( + flow_slug => 'archive_test', + step_slug => 'branch3', + deps_slugs => ARRAY['producer'], + step_type => 'single' +); +-- This map step expects arrays from branch1 +select pgflow.add_step( + flow_slug => 'archive_test', + step_slug => 'consumer_map', + deps_slugs => ARRAY['branch1'], + step_type => 'map' +); + +-- Start flow +select run_id as test_run_id from pgflow.start_flow('archive_test', '{}') \gset + +-- Start producer task +select pgflow_tests.ensure_worker('archive_test', '11111111-1111-1111-1111-111111111111'::uuid); +SELECT * FROM pgflow_tests.read_and_start('archive_test', 1, 1) LIMIT 1; + +-- Complete producer to spawn branches +SELECT pgflow.complete_task(:'test_run_id'::uuid, 'producer', 0, '{"data": "test"}'::jsonb); + +-- Start some branch tasks to have a mix of queued and started +select pgflow_tests.ensure_worker('archive_test', 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'::uuid, 'worker_a'); +SELECT * FROM pgflow_tests.read_and_start('archive_test', 1, 1, + 
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'::uuid, 'worker_a') LIMIT 1; + +select pgflow_tests.ensure_worker('archive_test', 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'::uuid, 'worker_b'); +SELECT * FROM pgflow_tests.read_and_start('archive_test', 1, 1, + 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'::uuid, 'worker_b') LIMIT 1; + +-- Count messages in queue before violation +select count(*) as queue_before from pgmq.q_archive_test \gset +select count(*) as archive_before from pgmq.a_archive_test \gset + +-- Check we have both queued and started tasks with messages +select is( + (select count(*)::int > 0 from pgflow.step_tasks + where run_id = :'test_run_id'::uuid + and status = 'queued' and message_id is not null), + true, + 'Should have queued tasks with messages before violation' +); + +select is( + (select count(*)::int > 0 from pgflow.step_tasks + where run_id = :'test_run_id'::uuid + and status = 'started' and message_id is not null), + true, + 'Should have started tasks with messages before violation' +); + +-- Trigger type violation by completing branch1 with non-array (consumer_map expects array) +select lives_ok( + format($$ + SELECT pgflow.complete_task('%s'::uuid, 'branch1', 0, '{"not": "an array"}'::jsonb) + $$, :'test_run_id'), + 'complete_task should handle type violation' +); + +-- CRITICAL: All messages should be archived (both from queued AND started tasks) +select is( + (select count(*) from pgmq.q_archive_test), + 0::bigint, + 'Queue should be empty - ALL messages archived (not just queued ones)' +); + +-- Verify messages were moved to archive, not lost +select ok( + (select count(*) from pgmq.a_archive_test) > :archive_before::bigint, + 'Messages should be in archive table' +); + +-- SECOND TEST: fail_task also archives all messages +-- Reset for second test +select pgflow_tests.reset_db(); + +-- Create flow with max_attempts=1 to fail immediately +select pgflow.create_flow('fail_test', max_attempts => 1); +select pgflow.add_step( + flow_slug => 'fail_test', + step_slug => 'task1', + step_type => 'single' +); +select pgflow.add_step( + flow_slug => 'fail_test', + step_slug => 'task2', + step_type => 'single' +); +select pgflow.add_step( + flow_slug => 'fail_test', + step_slug => 'task3', + step_type => 'single' +); + +-- Start flow +select run_id as test_run_id2 from pgflow.start_flow('fail_test', '{}') \gset + +-- Start some tasks to have mix of queued and started +select pgflow_tests.ensure_worker('fail_test', 'cccccccc-cccc-cccc-cccc-cccccccccccc'::uuid, 'worker_c'); +SELECT * FROM pgflow_tests.read_and_start('fail_test', 1, 1, + 'cccccccc-cccc-cccc-cccc-cccccccccccc'::uuid, 'worker_c') LIMIT 1; + +select pgflow_tests.ensure_worker('fail_test', 'dddddddd-dddd-dddd-dddd-dddddddddddd'::uuid, 'worker_d'); +SELECT * FROM pgflow_tests.read_and_start('fail_test', 1, 1, + 'dddddddd-dddd-dddd-dddd-dddddddddddd'::uuid, 'worker_d') LIMIT 1; + +-- Count messages before failure +select count(*) as queue_before2 from pgmq.q_fail_test \gset + +-- Check we have both queued and started +select is( + (select count(*)::int > 0 from pgflow.step_tasks + where run_id = :'test_run_id2'::uuid + and status = 'queued' and message_id is not null), + true, + 'fail_test should have queued tasks with messages' +); + +select is( + (select count(*)::int > 0 from pgflow.step_tasks + where run_id = :'test_run_id2'::uuid + and status = 'started' and message_id is not null), + true, + 'fail_test should have started tasks with messages' +); + +-- Fail one task (with max_attempts=1, it will fail the run) +select 
lives_ok( + format($$ + SELECT pgflow.fail_task('%s'::uuid, 'task1', 0, 'Test error') + $$, :'test_run_id2'), + 'fail_task should handle run failure' +); + +-- CRITICAL: All messages should be archived +select is( + (select count(*) from pgmq.q_fail_test), + 0::bigint, + 'fail_task: Queue should be empty - ALL messages archived' +); + +-- Verify run is failed +select is( + (select status from pgflow.runs where run_id = :'test_run_id2'::uuid), + 'failed', + 'fail_task: Run should be marked as failed' +); + +select * from finish(); +rollback; \ No newline at end of file
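A note on the queue assertions used throughout these tests: pgmq.archive moves messages out of the live queue table (pgmq.q_<queue_name>) into the matching archive table (pgmq.a_<queue_name>) instead of deleting them, which is what keeps failed runs debuggable. A quick illustration, assuming an existing queue named fail_test with message ids 1-3:

-- Archived messages leave the live queue but stay inspectable.
SELECT pgmq.archive('fail_test', ARRAY[1, 2, 3]::bigint[]);
SELECT count(*) FROM pgmq.q_fail_test;         -- now 0
SELECT msg_id, message FROM pgmq.a_fail_test;  -- payloads preserved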