From 237c69fd439e2824e094a91a24cb6e6f94e9dcc0 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Thu, 18 Sep 2025 17:34:28 +0200 Subject: [PATCH] docs: add instructions for fixing SQL tests and updating functions Provides guidance on fixing invalid tests, updating SQL functions, and rerunning tests without creating migrations or using nx, to streamline test maintenance and debugging. --- .changeset/hungry-cloths-hunt.md | 12 + .claude/commands/fix-sql-tests.md | 17 + .claude/commands/test-first-sql.md | 40 + PLAN.md | 72 +- PLAN_orphaned_messages.md | 184 ----- PLAN_step_output.md | 15 +- pkgs/core/PLAN_race_condition_testing.md | 176 +++++ pkgs/core/README.md | 8 +- pkgs/core/schemas/0060_tables_runtime.sql | 2 +- .../schemas/0100_function_complete_task.sql | 104 ++- pkgs/core/schemas/0100_function_fail_task.sql | 65 +- .../0100_function_start_ready_steps.sql | 11 +- ...02_pgflow_temp_orphaned_messages_index.sql | 688 ++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../no_cascade_on_failed_run.test.sql | 44 ++ .../no_mutations_on_failed_run.test.sql | 85 +++ .../archive_sibling_map_tasks.test.sql | 113 +++ ...ssages_on_type_constraint_failure.test.sql | 124 ++++ .../non_array_to_map_should_fail.test.sql | 50 +- .../null_output_to_map_should_fail.test.sql | 40 +- .../archive_all_messages.test.sql | 179 +++++ 21 files changed, 1760 insertions(+), 272 deletions(-) create mode 100644 .changeset/hungry-cloths-hunt.md create mode 100644 .claude/commands/fix-sql-tests.md create mode 100644 .claude/commands/test-first-sql.md delete mode 100644 PLAN_orphaned_messages.md create mode 100644 pkgs/core/PLAN_race_condition_testing.md create mode 100644 pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql create mode 100644 pkgs/core/supabase/tests/cascade_complete_taskless_steps/no_cascade_on_failed_run.test.sql create mode 100644 pkgs/core/supabase/tests/complete_task/no_mutations_on_failed_run.test.sql create mode 100644 
pkgs/core/supabase/tests/fail_task/archive_sibling_map_tasks.test.sql create mode 100644 pkgs/core/supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql create mode 100644 pkgs/core/supabase/tests/type_violations/archive_all_messages.test.sql diff --git a/.changeset/hungry-cloths-hunt.md b/.changeset/hungry-cloths-hunt.md new file mode 100644 index 000000000..38cc06f65 --- /dev/null +++ b/.changeset/hungry-cloths-hunt.md @@ -0,0 +1,12 @@ +--- +'@pgflow/core': patch +--- + +Improve failure handling and prevent orphaned messages in queue + +- Archive all queued messages when a run fails to prevent resource waste +- Handle type constraint violations gracefully without exceptions +- Store output on failed tasks (including type violations) for debugging +- Add performance index for efficient message archiving +- Prevent retries on already-failed runs +- Update table constraint to allow output storage on failed tasks diff --git a/.claude/commands/fix-sql-tests.md b/.claude/commands/fix-sql-tests.md new file mode 100644 index 000000000..b207116ad --- /dev/null +++ b/.claude/commands/fix-sql-tests.md @@ -0,0 +1,17 @@ +Your job is to fix SQL tests, either by fixing the tests if those are invalid, +or updating the SQL functions in pkgs/core/schemas/ and trying again. + +If updating functions, load them with psql. + +!`pnpm nx supabase:status core --output env | grep DB_URL` +PWD: !`pwd` + +To rerun the test(s), run this command from `pkgs/core` directory: + +`scripts/run-test-with-colors supabase/tests/` + +Do not create any migratons or try to run tests with nx. + + +!`pnpm nx test:pgtap core` + diff --git a/.claude/commands/test-first-sql.md b/.claude/commands/test-first-sql.md new file mode 100644 index 000000000..470a9a627 --- /dev/null +++ b/.claude/commands/test-first-sql.md @@ -0,0 +1,40 @@ +Your job is to implement the feature below in a test-first manner. +First, you must idenfity what things you want to test for. 
+Then you must write one test at a time, from the simplest, more generic, +to more precise (if applicable, sometimes you only need to write one test per +thing, without multiple per thing). + +To run the test(s), run this command from `pkgs/core` directory: + +`scripts/run-test-with-colors supabase/tests/` + +The newly written test must fail for the correct reasons. + +In order to make the test pass, you need to update function +code in pkgs/core/schemas/. + +After updating you should use `psql` to execute function file +and update function in database. + +!`pnpm nx supabase:status core --output env | grep DB_URL` +PWD: !`pwd` + +Repeat until all the added tests are passing. + +When they do, run all the tests like this: + +`scripts/run-test-with-colors supabase/tests/` + +Do not create any migratons or try to run tests with nx. + +Never use any INSERTs or UPDATEs to prepare or mutate state for the test. +Instead, use regular pgflow.\* SQL functions or functions that are +available in pkgs/core/supabase/tests/seed.sql: + +!`grep 'function.*pgflow_tests' pkgs/core/supabase/seed.sql -A7` + +Check how they are used in other tests. 
+ + +$ARGUMENTS + diff --git a/PLAN.md b/PLAN.md index a4c74d8f9..3043d701f 100644 --- a/PLAN.md +++ b/PLAN.md @@ -10,7 +10,7 @@ - ✅ **DONE**: Array element extraction - tasks receive individual array elements - ✅ **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents - ✅ **DONE**: DSL support for `.map()` for defining map steps with compile-time duplicate detection -- ⏳ **TODO**: Fix orphaned messages on run failure +- ✅ **DONE**: Fix orphaned messages on run failure - ⏳ **TODO**: Performance optimization with step_states.output column ### Chores @@ -106,18 +106,28 @@ - Updated DSL README with .map() documentation - Created detailed changeset +- [x] **PR #219: Fix Orphaned Messages on Run Failure** - `09-18-fix-orphaned-messages-on-fail` ✅ COMPLETED + + - Archives all queued messages when run fails (prevents orphaned messages) + - Handles type constraint violations gracefully without exceptions + - Added guards to prevent any mutations on failed runs: + - complete_task returns unchanged + - start_ready_steps exits early + - cascade_complete_taskless_steps returns 0 + - Added performance index for efficient message archiving + - Tests unstashed and passing (archive_sibling_map_tasks, archive_messages_on_type_constraint_failure) + - Updated core README with failure handling mentions + - **Critical fix: prevents queue performance degradation in production** + #### ❌ Remaining Work (Priority Order) -- [ ] **Priority 1: Fix Orphaned Messages on Run Failure** 🚨 CRITICAL +- [ ] **Integration Tests** - - Archive all pending messages when run fails - - Handle map sibling tasks specially - - Fix type constraint violations to fail immediately without retries - - See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md) - - **Critical for production: prevents queue performance degradation** - - Tests already written (stashed) that document the problem + - End-to-end workflows with real array data + - Basic happy path 
coverage + - This should be minimal and added to the Edge Worker integration test suite for now -- [ ] **Priority 2: Performance Optimization - step_states.output Column** +- [ ] **Performance Optimization - step_states.output Column** - Migrate from inline aggregation to storing outputs in step_states - See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md) @@ -132,43 +142,33 @@ - Update all aggregation tests (~17 files) - **Note**: This is an optimization that should be done after core functionality is stable -- [ ] **Priority 3: Integration Tests** - - - End-to-end workflows with real array data - - Basic happy path coverage - - This should be minimal and added to the Edge Worker integration test suite for now - -- [ ] **Priority 4: Update core README** - - - `pkgs/core/README.md` - - - Add new section describing the step types - - Describe single step briefly, focus on describing map step type and how it differs - - Make sure to mention that maps are constrained to have exactly one dependency - - Show multiple cases of inputs -> task creation - - Explain edge cases (empty array propagation, invalid array input) - - Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input - - Explain cascade completion of taskless steps and its limitations +- [ ] **Update `pkgs/core/README.md`** -- [ ] **Priority 5: Add docs page** + - Add new section describing the step types + - Describe single step briefly, focus on describing map step type and how it differs + - Make sure to mention that maps are constrained to have exactly one dependency + - Show multiple cases of inputs -> task creation + - Explain edge cases (empty array propagation, invalid array input) + - Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input + - Explain cascade completion of taskless steps and its limitations - - **Add basic docs page** +- [ ] **Add docs page** - - put it into 
`pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx` - - describe the DSL and how the map works and why we need it - - show example usage of root map - - show example usage of dependent map - - focus mostly on how to use it, instead of how it works under the hood - - link to the README's for more details + - put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx` + - describe the DSL and how the map works and why we need it + - show example usage of root map + - show example usage of dependent map + - focus mostly on how to use it, instead of how it works under the hood + - link to the README's for more details -- [ ] **Priority 6: Migration Consolidation** (Do this last before merge!) +- [ ] **Migration Consolidation** - Remove all temporary/incremental migrations from feature branches - Generate a single consolidated migration for the entire map infrastructure - Ensure clean migration path from current production schema - If NULL improvement is done, include it in the consolidated migration -- [ ] **Priority 7: Graphite Stack Merge** +- [ ] **Graphite Stack Merge** - Configure Graphite merge queue for the complete PR stack - Ensure all PRs in sequence can be merged together diff --git a/PLAN_orphaned_messages.md b/PLAN_orphaned_messages.md deleted file mode 100644 index 9b8e074b0..000000000 --- a/PLAN_orphaned_messages.md +++ /dev/null @@ -1,184 +0,0 @@ -# Plan: Fix Orphaned Messages on Run Failure - -## Problem Statement - -When a run fails, messages for pending tasks remain in the queue indefinitely, causing: -1. **Resource waste**: Workers continuously poll orphaned messages -2. **Performance degradation**: Queue operations slow down over time -3. **Map step issues**: Failing one map task leaves N-1 sibling messages orphaned -4. 
**Type violations**: Deterministic errors retry unnecessarily - -## Current Behavior - -### When fail_task is called -```sql --- Only archives the single failing task's message -SELECT pgmq.archive('pgflow_tasks_queue', fail_task.msg_id); --- Leaves all other queued messages orphaned -``` - -### When type constraint violation occurs -```sql --- Raises exception, causes retries -RAISE EXCEPTION 'Map step % expects array input...'; --- Transaction rolls back, but retries will hit same error -``` - -## Implementation Plan - -### 1. Update fail_task Function -**File**: `pkgs/core/schemas/0100_function_fail_task.sql` - -Add after marking run as failed (around line 47): -```sql --- Archive all pending messages for this run -WITH tasks_to_archive AS ( - SELECT t.msg_id - FROM pgflow.step_tasks t - WHERE t.run_id = fail_task.run_id - AND t.status = 'pending' - AND t.msg_id IS NOT NULL -) -SELECT pgmq.archive('pgflow_tasks_queue', msg_id) -FROM tasks_to_archive; -``` - -### 2. Update complete_task for Type Violations -**File**: `pkgs/core/schemas/0100_function_complete_task.sql` - -Replace the current RAISE EXCEPTION block (lines 115-120) with: -```sql -IF v_dependent_map_slug IS NOT NULL THEN - -- Mark run as failed immediately (no retries for type violations) - UPDATE pgflow.runs - SET status = 'failed', - failed_at = now(), - error = format('Type contract violation: Map step %s expects array input but dependency %s produced %s (output: %s)', - v_dependent_map_slug, - complete_task.step_slug, - CASE WHEN complete_task.output IS NULL THEN 'null' - ELSE jsonb_typeof(complete_task.output) END, - complete_task.output) - WHERE run_id = complete_task.run_id; - - -- Archive ALL pending messages for this run - PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id) - FROM pgflow.step_tasks t - WHERE t.run_id = complete_task.run_id - AND t.status = 'pending' - AND t.msg_id IS NOT NULL; - - -- Mark the current task as failed (not completed) - UPDATE pgflow.step_tasks - SET status = 
'failed', - failed_at = now(), - error_message = format('Type contract violation: produced %s instead of array', - CASE WHEN complete_task.output IS NULL THEN 'null' - ELSE jsonb_typeof(complete_task.output) END) - WHERE run_id = complete_task.run_id - AND step_slug = complete_task.step_slug - AND task_index = complete_task.task_index; - - -- Return empty result set (task not completed) - RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; - RETURN; -END IF; -``` - -### 3. Add Supporting Index -**File**: New migration or add to existing - -```sql --- Speed up the archiving query -CREATE INDEX IF NOT EXISTS idx_step_tasks_pending_with_msg -ON pgflow.step_tasks(run_id, status) -WHERE status = 'pending' AND msg_id IS NOT NULL; -``` - -## Testing - -### Tests Already Written (Stashed) - -1. **`supabase/tests/fail_task/archive_sibling_map_tasks.test.sql`** - - Verifies all map task messages are archived when one fails - - Tests: 8 assertions about message archiving and status - -2. **`supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql`** - - Verifies type violations archive all pending messages - - Tests: 8 assertions about queue cleanup and run status - -### How to Run Tests -```bash -# After unstashing and implementing the fixes: -pnpm nx test:pgtap core -- supabase/tests/fail_task/archive_sibling_map_tasks.test.sql -pnpm nx test:pgtap core -- supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql -``` - -## Migration Considerations - -### Backward Compatibility -- New behavior only affects failed runs (safe) -- Archiving preserves messages (can be recovered if needed) -- No schema changes to existing tables - -### Performance Impact -- One-time cost during failure (acceptable) -- Prevents ongoing performance degradation (improvement) -- Index ensures archiving query is efficient - -### Rollback Plan -If issues arise: -1. Remove the archiving logic -2. Messages remain in queue (old behavior) -3. 
No data loss since we archive, not delete - -## Edge Cases to Consider - -### 1. Concurrent Task Completion -If multiple tasks complete/fail simultaneously: -- PostgreSQL row locks ensure consistency -- Each failure archives all pending messages -- Idempotent: archiving already-archived messages is safe - -### 2. Very Large Map Steps -For maps with 1000+ tasks: -- Archiving might take several seconds -- Consider batching if performance issues arise -- Current approach should handle up to ~10k tasks reasonably - -### 3. Mixed Step Types -When run has both map and single steps: -- Archive logic handles all pending tasks regardless of type -- Correctly archives both map siblings and unrelated pending tasks - -## Future Enhancements (Not for this PR) - -1. **Selective Archiving**: Only archive tasks that can't proceed -2. **Batch Operations**: Archive in chunks for very large runs -3. **Recovery Mechanism**: Function to unarchive and retry -4. **Monitoring**: Track archived message counts for alerting - -## Success Criteria - -- [ ] All tests pass (both new test files) -- [ ] No orphaned messages after run failure -- [ ] Type violations don't retry -- [ ] Performance acceptable for maps with 100+ tasks -- [ ] No impact on successful run performance - -## Implementation Checklist - -- [ ] Update `fail_task` function -- [ ] Update `complete_task` function -- [ ] Add database index -- [ ] Unstash and run tests -- [ ] Test with large map steps (100+ tasks) -- [ ] Update migration file -- [ ] Document behavior change in function comments - -## Notes - -- This fix is **critical for production** - without it, queue performance will degrade over time -- Type violations are **deterministic** - retrying them is always wasteful -- Archiving (vs deleting) preserves debugging capability -- The fix is relatively simple (~30 lines of SQL) but high impact \ No newline at end of file diff --git a/PLAN_step_output.md b/PLAN_step_output.md index b88848a9b..cebaed580 100644 --- 
a/PLAN_step_output.md +++ b/PLAN_step_output.md @@ -59,6 +59,9 @@ ADD CONSTRAINT output_only_when_completed CHECK ( 2. **Map steps**: Store aggregated array ordered by task_index 3. **Taskless steps**: Store empty array `[]` for map steps, NULL for single steps 4. **Storage location**: Step output belongs at step level, not task level +5. **Failed steps**: Do NOT store output at step level (only completed steps have step output) + - Individual failed tasks still store their output for debugging + - This includes type violation failures where task output is preserved ## Implementation Changes @@ -188,7 +191,8 @@ SET status = 'completed', - Add: Verify output remains NULL until all tasks complete 10. **`tests/map_output_aggregation/failed_task_handling.test.sql`** - - Add: Verify output remains NULL when step fails + - Add: Verify step output remains NULL when step fails + - Add: Verify individual task outputs are still preserved for debugging 11. **`tests/map_output_aggregation/map_initial_tasks_timing.test.sql`** - No changes needed (focuses on timing) @@ -230,6 +234,8 @@ SET status = 'completed', ```sql -- Verify output is NULL for non-completed steps -- Check constraint prevents setting output on non-completed steps + -- Verify failed steps have NULL output at step level + -- Verify failed tasks still preserve their output for debugging ``` 4. **`tests/step_output/taskless_step_outputs.test.sql`** @@ -244,6 +250,13 @@ SET status = 'completed', -- Map with [null, {data}, null] -> step_states.output has all three ``` +6. **`tests/step_output/failed_task_output_preservation.test.sql`** + ```sql + -- Verify failed tasks store their output (including type violations) + -- Verify step output remains NULL when step fails + -- Test both regular failures and type constraint violations + ``` + #### Tests to Remove/Update 1. 
**`tests/map_output_aggregation/broadcast_output_verification.test.sql`** diff --git a/pkgs/core/PLAN_race_condition_testing.md b/pkgs/core/PLAN_race_condition_testing.md new file mode 100644 index 000000000..6667e1e30 --- /dev/null +++ b/pkgs/core/PLAN_race_condition_testing.md @@ -0,0 +1,176 @@ +# PLAN: Race Condition Testing for Type Violations + +## Background + +When a type violation occurs (e.g., single step produces non-array for dependent map), the system must archive ALL active messages to prevent orphaned messages that cycle through workers indefinitely. + +## Current Issue + +The fix archives both `'queued'` AND `'started'` tasks, but existing tests don't properly validate the race condition scenarios. + +## Test Scenarios Needed + +### 1. Basic Type Violation (✅ Already Covered) +**Scenario**: Single task causes type violation +``` +step1 (single) → step2 (single) → map_step +``` +- Worker completes step2 with non-array +- Verify run fails and current task's message is archived +- **Coverage**: `non_array_to_map_should_fail.test.sql` + +### 2. Concurrent Started Tasks (❌ Not Covered) +**Scenario**: Multiple workers have tasks in 'started' state when violation occurs +``` +producer (single) → map_consumer (map, expects array) +producer (single) → parallel_task1 (single) +producer (single) → parallel_task2 (single) +``` + +**Test Flow**: +1. Complete producer with `[1, 2, 3]` (spawns 3 map tasks + 2 parallel tasks) +2. Worker A starts `map_consumer[0]` +3. Worker B starts `map_consumer[1]` +4. Worker C starts `parallel_task1` +5. Worker D starts `parallel_task2` +6. Worker C completes `parallel_task1` with non-array (violates some other map dependency) +7. **Verify**: ALL started tasks (map_consumer[0], map_consumer[1], parallel_task2) get archived + +### 3. Mixed Queue States (❌ Not Covered) +**Scenario**: Mix of queued and started tasks across different steps +``` +step1 → step2 → step3 → map_step + ↘ step4 → step5 +``` + +**Test Flow**: +1. 
Complete step1 +2. Worker A starts step2 +3. Worker B starts step4 +4. Step3 and step5 remain queued +5. Worker A completes step2 with type violation +6. **Verify**: Both started (step4) AND queued (step3, step5) messages archived + +### 4. Map Task Partial Processing (❌ Not Covered) +**Scenario**: Some map tasks started, others queued when violation occurs +``` +producer → large_map (100 elements) +``` + +**Test Flow**: +1. Producer outputs array of 100 elements +2. Workers start processing first 10 tasks +3. 90 tasks remain queued +4. One of the started tasks detects downstream type violation +5. **Verify**: All 100 messages (10 started + 90 queued) get archived + +### 5. Visibility Timeout Verification (❌ Not Covered) +**Scenario**: Ensure orphaned messages don't reappear after timeout +``` +step1 → step2 → map_step +``` + +**Test Flow**: +1. Worker starts step2 (30s visibility timeout) +2. Type violation occurs but message NOT archived (simulate bug) +3. Wait 31 seconds +4. **Verify**: Message reappears in queue (demonstrates the bug) +5. Apply fix and verify message doesn't reappear + +### 6. Nested Map Chains (❌ Not Covered) +**Scenario**: Type violation in middle of map chain +``` +map1 (produces arrays) → map2 (expects arrays) → map3 +``` + +**Test Flow**: +1. map1 task completes with non-array (violates map2 expectation) +2. Other map1 tasks are in various states (started/queued) +3. **Verify**: All map1 tasks archived, map2 never starts + +### 7. Race During Archival (❌ Not Covered) +**Scenario**: Worker tries to complete task while archival is happening +``` +step1 → step2 → map_step +``` + +**Test Flow**: +1. Worker A detects type violation, begins archiving +2. Worker B tries to complete its task during archival +3. **Verify**: Worker B's completion is rejected (guard clause) +4. **Verify**: No duplicate archival attempts + +## Implementation Strategy + +### Test Utilities Needed + +1. 
**Multi-worker simulator**: +```sql +CREATE FUNCTION pgflow_tests.simulate_worker( + worker_id uuid, + flow_slug text +) RETURNS TABLE(...); +``` + +2. **Queue state inspector**: +```sql +CREATE FUNCTION pgflow_tests.inspect_queue_state( + flow_slug text +) RETURNS TABLE( + message_id bigint, + task_status text, + visibility_timeout timestamptz +); +``` + +3. **Time manipulation** (for visibility timeout tests): +```sql +-- May need to mock pgmq visibility behavior +``` + +### Test File Organization + +``` +supabase/tests/type_violations/ +├── basic_violation.test.sql # Existing coverage +├── concurrent_started_tasks.test.sql # NEW: Scenario 2 +├── mixed_queue_states.test.sql # NEW: Scenario 3 +├── map_partial_processing.test.sql # NEW: Scenario 4 +├── visibility_timeout_recovery.test.sql # NEW: Scenario 5 +├── nested_map_chains.test.sql # NEW: Scenario 6 +└── race_during_archival.test.sql # NEW: Scenario 7 +``` + +## Success Criteria + +1. **No orphaned messages**: Queue must be empty after type violation +2. **No message resurrection**: Archived messages don't reappear after timeout +3. **Complete cleanup**: ALL tasks (queued + started) for the run are handled +4. **Atomic operation**: Archival happens in single transaction +5. **Guard effectiveness**: No operations on failed runs + +## Performance Considerations + +- Test with large numbers of tasks (1000+) to verify batch archival performance +- Ensure archival doesn't lock tables for extended periods +- Verify index usage on `(run_id, status, message_id)` + +## Current Gap Analysis + +**What we have**: +- Basic type violation detection ✅ +- Single task archival ✅ +- Run failure on violation ✅ + +**What we need**: +- True concurrent worker simulation ❌ +- Multi-task race condition validation ❌ +- Visibility timeout verification ❌ +- Performance under load testing ❌ + +## Priority + +1. **HIGH**: Concurrent started tasks (Scenario 2) - Most common real-world case +2. 
**HIGH**: Map partial processing (Scenario 4) - Critical for large arrays +3. **MEDIUM**: Mixed queue states (Scenario 3) - Complex flows +4. **LOW**: Other scenarios - Edge cases but important for robustness \ No newline at end of file diff --git a/pkgs/core/README.md b/pkgs/core/README.md index 8cc4ad7d8..85c89a8fa 100644 --- a/pkgs/core/README.md +++ b/pkgs/core/README.md @@ -231,10 +231,16 @@ The system handles failures by: - Preventing processing until the visibility timeout expires 3. When retries are exhausted: - Marking the task as 'failed' + - Storing the task output (even for failed tasks) - Marking the step as 'failed' - Marking the run as 'failed' - Archiving the message in PGMQ - - Notifying workers to abort pending tasks (future feature) + - **Archiving all queued messages for the failed run** (preventing orphaned messages) +4. Additional failure handling: + - **No retries on already-failed runs** - tasks are immediately marked as failed + - **Graceful type constraint violations** - handled without exceptions when single steps feed map steps + - **Stores invalid output on type violations** - captures the output that caused the violation for debugging + - **Performance-optimized message archiving** using indexed queries #### Retries and Timeouts diff --git a/pkgs/core/schemas/0060_tables_runtime.sql b/pkgs/core/schemas/0060_tables_runtime.sql index 0ce5ff620..7ad33921c 100644 --- a/pkgs/core/schemas/0060_tables_runtime.sql +++ b/pkgs/core/schemas/0060_tables_runtime.sql @@ -82,7 +82,7 @@ create table pgflow.step_tasks ( status in ('queued', 'started', 'completed', 'failed') ), constraint output_valid_only_for_completed check ( - output is null or status = 'completed' + output is null or status in ('completed', 'failed') ), constraint attempts_count_nonnegative check (attempts_count >= 0), constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)), diff --git a/pkgs/core/schemas/0100_function_complete_task.sql 
b/pkgs/core/schemas/0100_function_complete_task.sql index fc41fdd18..34f0fdc02 100644 --- a/pkgs/core/schemas/0100_function_complete_task.sql +++ b/pkgs/core/schemas/0100_function_complete_task.sql @@ -12,13 +12,35 @@ as $$ declare v_step_state pgflow.step_states%ROWTYPE; v_dependent_map_slug text; + v_run_record pgflow.runs%ROWTYPE; + v_step_record pgflow.step_states%ROWTYPE; begin -- ========================================== --- VALIDATION: Array output for dependent maps +-- GUARD: No mutations on failed runs -- ========================================== --- Must happen BEFORE acquiring locks to fail fast without holding resources --- Only validate for single steps - map steps produce scalars that get aggregated +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; +END IF; + +-- ========================================== +-- LOCK ACQUISITION AND TYPE VALIDATION +-- ========================================== +-- Acquire locks first to prevent race conditions +SELECT * INTO v_run_record FROM pgflow.runs +WHERE pgflow.runs.run_id = complete_task.run_id +FOR UPDATE; + +SELECT * INTO v_step_record FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug +FOR UPDATE; + +-- Check for type violations AFTER acquiring locks SELECT child_step.step_slug INTO v_dependent_map_slug FROM pgflow.deps dependency JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug @@ -28,7 +50,7 @@ JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug AND child_state.step_slug = 
child_step.step_slug WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step - AND dependency.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND dependency.flow_slug = v_run_record.flow_slug AND parent_step.step_type = 'single' -- Only validate single steps AND child_step.step_type = 'map' AND child_state.run_id = complete_task.run_id @@ -36,31 +58,69 @@ WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') LIMIT 1; +-- Handle type violation if detected IF v_dependent_map_slug IS NOT NULL THEN - RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)', - v_dependent_map_slug, - complete_task.step_slug, - CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END, - complete_task.output; + -- Mark run as failed immediately + UPDATE pgflow.runs + SET status = 'failed', + failed_at = now() + WHERE pgflow.runs.run_id = complete_task.run_id; + + -- Archive all active messages (both queued and started) to prevent orphaned messages + PERFORM pgmq.archive( + v_run_record.flow_slug, + array_agg(st.message_id) + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + HAVING count(*) > 0; -- Only call archive if there are messages to archive + + -- Mark current task as failed and store the output + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + output = complete_task.output, -- Store the output that caused the violation + error_message = '[TYPE_VIOLATION] Produced ' || + CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END || + ' instead of array' + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND 
pgflow.step_tasks.task_index = complete_task.task_index; + + -- Mark step state as failed + UPDATE pgflow.step_states + SET status = 'failed', + failed_at = now(), + error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug; + + -- Archive the current task's message (it was started, now failed) + PERFORM pgmq.archive( + v_run_record.flow_slug, + st.message_id -- Single message, use scalar form + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.message_id IS NOT NULL; + + -- Return empty result + RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; + RETURN; END IF; -- ========================================== -- MAIN CTE CHAIN: Update task and propagate changes -- ========================================== WITH --- ---------- Lock acquisition ---------- --- Acquire locks in consistent order (run -> step) to prevent deadlocks -run_lock AS ( - SELECT * FROM pgflow.runs - WHERE pgflow.runs.run_id = complete_task.run_id - FOR UPDATE -), -step_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = complete_task.step_slug - FOR UPDATE -), -- ---------- Task completion ---------- -- Update the task record with completion status and output task AS ( diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index a3f328ab7..cab0cd846 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -14,6 +14,35 @@ DECLARE v_step_failed boolean; begin +-- If 
run is already failed, no retries allowed +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + error_message = fail_task.error_message + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index + AND pgflow.step_tasks.status = 'started'; + + -- Archive the task's message + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; +END IF; + WITH run_lock AS ( SELECT * FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id @@ -140,6 +169,18 @@ IF v_run_failed THEN END; END IF; +-- Archive all active messages (both queued and started) when run fails +IF v_run_failed THEN + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; +END IF; + -- For queued tasks: delay the message for retry with exponential backoff PERFORM ( WITH retry_config AS ( @@ -169,20 +210,16 @@ PERFORM ( ); -- For failed tasks: archive the message -PERFORM ( - WITH failed_tasks AS ( - SELECT r.flow_slug, st.message_id - FROM pgflow.step_tasks st - JOIN pgflow.runs r ON st.run_id = r.run_id - WHERE 
st.run_id = fail_task.run_id - AND st.step_slug = fail_task.step_slug - AND st.task_index = fail_task.task_index - AND st.status = 'failed' - ) - SELECT pgmq.archive(ft.flow_slug, ft.message_id) - FROM failed_tasks ft - WHERE EXISTS (SELECT 1 FROM failed_tasks) -); +PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) +FROM pgflow.step_tasks st +JOIN pgflow.runs r ON st.run_id = r.run_id +WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'failed' + AND st.message_id IS NOT NULL +GROUP BY r.flow_slug +HAVING COUNT(st.message_id) > 0; return query select * from pgflow.step_tasks st diff --git a/pkgs/core/schemas/0100_function_start_ready_steps.sql b/pkgs/core/schemas/0100_function_start_ready_steps.sql index 0952671c2..3fd29ca1d 100644 --- a/pkgs/core/schemas/0100_function_start_ready_steps.sql +++ b/pkgs/core/schemas/0100_function_start_ready_steps.sql @@ -1,8 +1,16 @@ create or replace function pgflow.start_ready_steps(run_id uuid) returns void -language sql +language plpgsql set search_path to '' as $$ +begin +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN + RETURN; +END IF; + -- ========================================== -- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0) -- ========================================== @@ -165,4 +173,5 @@ SELECT sent_messages.msg_id FROM sent_messages; +end; $$; diff --git a/pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql b/pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql new file mode 100644 index 000000000..5675f4df1 --- /dev/null +++ b/pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql @@ -0,0 +1,688 @@ +-- Modify 
"step_tasks" table +ALTER TABLE "pgflow"."step_tasks" DROP CONSTRAINT "output_valid_only_for_completed", ADD CONSTRAINT "output_valid_only_for_completed" CHECK ((output IS NULL) OR (status = ANY (ARRAY['completed'::text, 'failed'::text]))); +-- Modify "start_ready_steps" function +CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +begin +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN + RETURN; +END IF; + +-- ========================================== +-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0) +-- ========================================== +-- These complete immediately without spawning tasks +WITH empty_map_steps AS ( + SELECT step_state.* + FROM pgflow.step_states AS step_state + JOIN pgflow.steps AS step + ON step.flow_slug = step_state.flow_slug + AND step.step_slug = step_state.step_slug + WHERE step_state.run_id = start_ready_steps.run_id + AND step_state.status = 'created' + AND step_state.remaining_deps = 0 + AND step.step_type = 'map' + AND step_state.initial_tasks = 0 + ORDER BY step_state.step_slug + FOR UPDATE OF step_state +), +-- ---------- Complete empty map steps ---------- +completed_empty_steps AS ( + UPDATE pgflow.step_states + SET status = 'completed', + started_at = now(), + completed_at = now(), + remaining_tasks = 0 + FROM empty_map_steps + WHERE pgflow.step_states.run_id = start_ready_steps.run_id + AND pgflow.step_states.step_slug = empty_map_steps.step_slug + RETURNING pgflow.step_states.* +), +-- ---------- Broadcast completion events ---------- +broadcast_empty_completed AS ( + SELECT + realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', completed_step.run_id, + 'step_slug', completed_step.step_slug, + 'status', 
'completed', + 'started_at', completed_step.started_at, + 'completed_at', completed_step.completed_at, + 'remaining_tasks', 0, + 'remaining_deps', 0, + 'output', '[]'::jsonb + ), + concat('step:', completed_step.step_slug, ':completed'), + concat('pgflow:run:', completed_step.run_id), + false + ) + FROM completed_empty_steps AS completed_step +), + +-- ========================================== +-- HANDLE NORMAL STEPS (initial_tasks > 0) +-- ========================================== +-- ---------- Find ready steps ---------- +-- Steps with no remaining deps and known task count +ready_steps AS ( + SELECT * + FROM pgflow.step_states AS step_state + WHERE step_state.run_id = start_ready_steps.run_id + AND step_state.status = 'created' + AND step_state.remaining_deps = 0 + AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count + AND step_state.initial_tasks > 0 -- Don't start taskless steps + -- Exclude empty map steps already handled + AND NOT EXISTS ( + SELECT 1 FROM empty_map_steps + WHERE empty_map_steps.run_id = step_state.run_id + AND empty_map_steps.step_slug = step_state.step_slug + ) + ORDER BY step_state.step_slug + FOR UPDATE +), +-- ---------- Mark steps as started ---------- +started_step_states AS ( + UPDATE pgflow.step_states + SET status = 'started', + started_at = now(), + remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting + FROM ready_steps + WHERE pgflow.step_states.run_id = start_ready_steps.run_id + AND pgflow.step_states.step_slug = ready_steps.step_slug + RETURNING pgflow.step_states.* +), + +-- ========================================== +-- TASK GENERATION AND QUEUE MESSAGES +-- ========================================== +-- ---------- Generate tasks and batch messages ---------- +-- Single steps: 1 task (index 0) +-- Map steps: N tasks (indices 0..N-1) +message_batches AS ( + SELECT + started_step.flow_slug, + started_step.run_id, + started_step.step_slug, + 
COALESCE(step.opt_start_delay, 0) as delay, + array_agg( + jsonb_build_object( + 'flow_slug', started_step.flow_slug, + 'run_id', started_step.run_id, + 'step_slug', started_step.step_slug, + 'task_index', task_idx.task_index + ) ORDER BY task_idx.task_index + ) AS messages, + array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices + FROM started_step_states AS started_step + JOIN pgflow.steps AS step + ON step.flow_slug = started_step.flow_slug + AND step.step_slug = started_step.step_slug + -- Generate task indices from 0 to initial_tasks-1 + CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index) + GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay +), +-- ---------- Send messages to queue ---------- +-- Uses batch sending for performance with large arrays +sent_messages AS ( + SELECT + mb.flow_slug, + mb.run_id, + mb.step_slug, + task_indices.task_index, + msg_ids.msg_id + FROM message_batches mb + CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord) + CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord) + WHERE task_indices.idx_ord = msg_ids.msg_ord +), + +-- ---------- Broadcast step:started events ---------- +broadcast_events AS ( + SELECT + realtime.send( + jsonb_build_object( + 'event_type', 'step:started', + 'run_id', started_step.run_id, + 'step_slug', started_step.step_slug, + 'status', 'started', + 'started_at', started_step.started_at, + 'remaining_tasks', started_step.remaining_tasks, + 'remaining_deps', started_step.remaining_deps + ), + concat('step:', started_step.step_slug, ':started'), + concat('pgflow:run:', started_step.run_id), + false + ) + FROM started_step_states AS started_step +) + +-- ========================================== +-- RECORD TASKS IN DATABASE +-- ========================================== +INSERT INTO 
pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id) +SELECT + sent_messages.flow_slug, + sent_messages.run_id, + sent_messages.step_slug, + sent_messages.task_index, + sent_messages.msg_id +FROM sent_messages; + +end; +$$; +-- Modify "complete_task" function +CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_step_state pgflow.step_states%ROWTYPE; + v_dependent_map_slug text; + v_run_record pgflow.runs%ROWTYPE; + v_step_record pgflow.step_states%ROWTYPE; +begin + +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; +END IF; + +-- ========================================== +-- LOCK ACQUISITION AND TYPE VALIDATION +-- ========================================== +-- Acquire locks first to prevent race conditions +SELECT * INTO v_run_record FROM pgflow.runs +WHERE pgflow.runs.run_id = complete_task.run_id +FOR UPDATE; + +SELECT * INTO v_step_record FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug +FOR UPDATE; + +-- Check for type violations AFTER acquiring locks +SELECT child_step.step_slug INTO v_dependent_map_slug +FROM pgflow.deps dependency +JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug + AND child_step.step_slug = dependency.step_slug +JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug + AND 
parent_step.step_slug = dependency.dep_slug +JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug + AND child_state.step_slug = child_step.step_slug +WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step + AND dependency.flow_slug = v_run_record.flow_slug + AND parent_step.step_type = 'single' -- Only validate single steps + AND child_step.step_type = 'map' + AND child_state.run_id = complete_task.run_id + AND child_state.initial_tasks IS NULL + AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') +LIMIT 1; + +-- Handle type violation if detected +IF v_dependent_map_slug IS NOT NULL THEN + -- Mark run as failed immediately + UPDATE pgflow.runs + SET status = 'failed', + failed_at = now() + WHERE pgflow.runs.run_id = complete_task.run_id; + + -- Archive all active messages (both queued and started) to prevent orphaned messages + PERFORM pgmq.archive( + v_run_record.flow_slug, + array_agg(st.message_id) + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + HAVING count(*) > 0; -- Only call archive if there are messages to archive + + -- Mark current task as failed and store the output + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + output = complete_task.output, -- Store the output that caused the violation + error_message = '[TYPE_VIOLATION] Produced ' || + CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END || + ' instead of array' + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + + -- Mark step state as failed + UPDATE pgflow.step_states + SET status = 'failed', + failed_at = now(), + error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input 
but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug; + + -- Archive the current task's message (it was started, now failed) + PERFORM pgmq.archive( + v_run_record.flow_slug, + st.message_id -- Single message, use scalar form + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.message_id IS NOT NULL; + + -- Return empty result + RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; + RETURN; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Update task and propagate changes +-- ========================================== +WITH +-- ---------- Task completion ---------- +-- Update the task record with completion status and output +task AS ( + UPDATE pgflow.step_tasks + SET + status = 'completed', + completed_at = now(), + output = complete_task.output + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index + AND pgflow.step_tasks.status = 'started' + RETURNING * +), +-- ---------- Step state update ---------- +-- Decrement remaining_tasks and potentially mark step as completed +step_state AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement + ELSE 'started' + END, + completed_at = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement + ELSE NULL + END, + remaining_tasks = pgflow.step_states.remaining_tasks - 1 + FROM task + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + 
RETURNING pgflow.step_states.* +), +-- ---------- Dependency resolution ---------- +-- Find all child steps that depend on the completed parent step (only if parent completed) +child_steps AS ( + SELECT deps.step_slug AS child_step_slug + FROM pgflow.deps deps + JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug + WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child + ORDER BY deps.step_slug -- Ensure consistent ordering +), +-- ---------- Lock child steps ---------- +-- Acquire locks on all child steps before updating them +child_steps_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps) + FOR UPDATE +), +-- ---------- Update child steps ---------- +-- Decrement remaining_deps and resolve NULL initial_tasks for map steps +child_steps_update AS ( + UPDATE pgflow.step_states child_state + SET remaining_deps = child_state.remaining_deps - 1, + -- Resolve NULL initial_tasks for child map steps + -- This is where child maps learn their array size from the parent + -- This CTE only runs when the parent step is complete (see child_steps JOIN) + initial_tasks = CASE + WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN + CASE + WHEN parent_step.step_type = 'map' THEN + -- Map->map: Count all completed tasks from parent map + -- We add 1 because the current task is being completed in this transaction + -- but isn't yet visible as 'completed' in the step_tasks table + -- TODO: Refactor to use future column step_states.total_tasks + -- Would eliminate the COUNT query and just use parent_state.total_tasks + (SELECT COUNT(*)::int + 1 + FROM pgflow.step_tasks parent_tasks + WHERE parent_tasks.run_id = complete_task.run_id + AND parent_tasks.step_slug = complete_task.step_slug + AND parent_tasks.status = 'completed' + AND 
parent_tasks.task_index != complete_task.task_index) + ELSE + -- Single->map: Use output array length (single steps complete immediately) + CASE + WHEN complete_task.output IS NOT NULL + AND jsonb_typeof(complete_task.output) = 'array' THEN + jsonb_array_length(complete_task.output) + ELSE NULL -- Keep NULL if not an array + END + END + ELSE child_state.initial_tasks -- Keep existing value (including NULL) + END + FROM child_steps children + JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND child_step.step_slug = children.child_step_slug + JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND parent_step.step_slug = complete_task.step_slug + WHERE child_state.run_id = complete_task.run_id + AND child_state.step_slug = children.child_step_slug +) +-- ---------- Update run remaining_steps ---------- +-- Decrement the run's remaining_steps counter if step completed +UPDATE pgflow.runs +SET remaining_steps = pgflow.runs.remaining_steps - 1 +FROM step_state +WHERE pgflow.runs.run_id = complete_task.run_id + AND step_state.status = 'completed'; + +-- ========================================== +-- POST-COMPLETION ACTIONS +-- ========================================== + +-- ---------- Get updated state for broadcasting ---------- +SELECT * INTO v_step_state FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; + +-- ---------- Handle step completion ---------- +IF v_step_state.status = 'completed' THEN + -- Cascade complete any taskless steps that are now ready + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); + + -- Broadcast step:completed event + -- For map steps, aggregate all task outputs; for single steps, use the task output + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 
'step:completed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'completed', + 'output', CASE + WHEN (SELECT s.step_type FROM pgflow.steps s + WHERE s.flow_slug = v_step_state.flow_slug + AND s.step_slug = complete_task.step_slug) = 'map' THEN + -- Aggregate all task outputs for map steps + (SELECT COALESCE(jsonb_agg(st.output ORDER BY st.task_index), '[]'::jsonb) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.status = 'completed') + ELSE + -- Single step: use the individual task output + complete_task.output + END, + 'completed_at', v_step_state.completed_at + ), + concat('step:', complete_task.step_slug, ':completed'), + concat('pgflow:run:', complete_task.run_id), + false + ); +END IF; + +-- ---------- Archive completed task message ---------- +-- Move message from active queue to archive table +PERFORM ( + WITH completed_tasks AS ( + SELECT r.flow_slug, st.message_id + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.status = 'completed' + ) + SELECT pgmq.archive(ct.flow_slug, ct.message_id) + FROM completed_tasks ct + WHERE EXISTS (SELECT 1 FROM completed_tasks) +); + +-- ---------- Trigger next steps ---------- +-- Start any steps that are now ready (deps satisfied) +PERFORM pgflow.start_ready_steps(complete_task.run_id); + +-- Check if the entire run is complete +PERFORM pgflow.maybe_complete_run(complete_task.run_id); + +-- ---------- Return completed task ---------- +RETURN QUERY SELECT * +FROM pgflow.step_tasks AS step_task +WHERE step_task.run_id = complete_task.run_id + AND step_task.step_slug = complete_task.step_slug + AND step_task.task_index = complete_task.task_index; + +end; +$$; +-- Modify "fail_task" function +CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" 
uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_run_failed boolean; + v_step_failed boolean; +begin + +-- If run is already failed, no retries allowed +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + error_message = fail_task.error_message + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index + AND pgflow.step_tasks.status = 'started'; + + -- Archive the task's message + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; +END IF; + +WITH run_lock AS ( + SELECT * FROM pgflow.runs + WHERE pgflow.runs.run_id = fail_task.run_id + FOR UPDATE +), +step_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug + FOR UPDATE +), +flow_info AS ( + SELECT r.flow_slug + FROM pgflow.runs r + WHERE r.run_id = fail_task.run_id +), +config AS ( + SELECT + COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts, + COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay + FROM pgflow.steps s + JOIN pgflow.flows f ON f.flow_slug = s.flow_slug + JOIN flow_info fi ON fi.flow_slug = s.flow_slug 
+ WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug +), +fail_or_retry_task as ( + UPDATE pgflow.step_tasks as task + SET + status = CASE + WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued' + ELSE 'failed' + END, + failed_at = CASE + WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now() + ELSE NULL + END, + started_at = CASE + WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL + ELSE task.started_at + END, + error_message = fail_task.error_message + WHERE task.run_id = fail_task.run_id + AND task.step_slug = fail_task.step_slug + AND task.task_index = fail_task.task_index + AND task.status = 'started' + RETURNING * +), +maybe_fail_step AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed' + ELSE pgflow.step_states.status + END, + failed_at = CASE + WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now() + ELSE NULL + END, + error_message = CASE + WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message + ELSE NULL + END + FROM fail_or_retry_task + WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug + RETURNING pgflow.step_states.* +) +-- Update run status +UPDATE pgflow.runs +SET status = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' + ELSE status + END, + failed_at = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN now() + ELSE NULL + END +WHERE pgflow.runs.run_id = fail_task.run_id +RETURNING (status = 'failed') INTO v_run_failed; + +-- Check if step failed by querying the step_states table +SELECT (status = 'failed') INTO v_step_failed +FROM pgflow.step_states +WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug; + +-- Send broadcast event for step 
failure if the step was failed +IF v_step_failed THEN + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', fail_task.run_id, + 'step_slug', fail_task.step_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + concat('step:', fail_task.step_slug, ':failed'), + concat('pgflow:run:', fail_task.run_id), + false + ); +END IF; + +-- Send broadcast event for run failure if the run was failed +IF v_run_failed THEN + DECLARE + v_flow_slug text; + BEGIN + SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id; + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', fail_task.run_id, + 'flow_slug', v_flow_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', fail_task.run_id), + false + ); + END; +END IF; + +-- Archive all active messages (both queued and started) when run fails +IF v_run_failed THEN + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; +END IF; + +-- For queued tasks: delay the message for retry with exponential backoff +PERFORM ( + WITH retry_config AS ( + SELECT + COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay + FROM pgflow.steps s + JOIN pgflow.flows f ON f.flow_slug = s.flow_slug + JOIN pgflow.runs r ON r.flow_slug = f.flow_slug + WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug + ), + queued_tasks AS ( + SELECT + r.flow_slug, + st.message_id, + pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE 
st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'queued' + ) + SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay) + FROM queued_tasks qt + WHERE EXISTS (SELECT 1 FROM queued_tasks) +); + +-- For failed tasks: archive the message +PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) +FROM pgflow.step_tasks st +JOIN pgflow.runs r ON st.run_id = r.run_id +WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'failed' + AND st.message_id IS NOT NULL +GROUP BY r.flow_slug +HAVING COUNT(st.message_id) > 0; + +return query select * +from pgflow.step_tasks st +where st.run_id = fail_task.run_id + and st.step_slug = fail_task.step_slug + and st.task_index = fail_task.task_index; + +end; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 7f21a5295..229f7ae81 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:L0B5nrLTUufGJ0mMcVcnPjHjuvhPMDMHvOv4rgEnycI= +h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg= 20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc= @@ -15,3 +15,4 @@ h1:L0B5nrLTUufGJ0mMcVcnPjHjuvhPMDMHvOv4rgEnycI= 20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y= 20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4= 20250918042753_pgflow_temp_handle_map_output_aggregation.sql h1:9aC4lyr6AEvpLTrv9Fza2Ur0QO87S0cdJDI+BPLAl60= +20250919101802_pgflow_temp_orphaned_messages_index.sql 
h1:GyfPfQz4AqB1/sTAC7B/m6j8FJrpkocinnzerNfM0f8= diff --git a/pkgs/core/supabase/tests/cascade_complete_taskless_steps/no_cascade_on_failed_run.test.sql b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/no_cascade_on_failed_run.test.sql new file mode 100644 index 000000000..683f0f0d6 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_complete_taskless_steps/no_cascade_on_failed_run.test.sql @@ -0,0 +1,44 @@ +-- Test that cascade_complete_taskless_steps returns 0 and does not cascade when run is failed +begin; + +select plan(2); + +-- Create test flow with cascade of taskless steps +select pgflow.create_flow('test_flow', max_attempts => 1); +select pgflow.add_step('test_flow', 'step1'); +select pgflow.add_step('test_flow', 'step2', deps_slugs => ARRAY['step1']); + +-- Start a flow run +select pgflow.start_flow('test_flow', '{"test": "data"}'::jsonb); + +-- Get the run_id +select run_id from pgflow.runs where flow_slug = 'test_flow' limit 1 \gset + +-- Get message ID for step1 +select message_id as msg1 from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step1' limit 1 \gset + +-- Ensure worker exists +select pgflow_tests.ensure_worker('test_flow'); + +-- Start and fail step1 which will fail the entire run +select pgflow.start_tasks('test_flow', ARRAY[:msg1]::bigint[], '11111111-1111-1111-1111-111111111111'::uuid); +select pgflow.fail_task(:'run_id', 'step1', 0, 'Simulated failure'); + +-- Call cascade_complete_taskless_steps directly on the failed run +-- It should return 0 without doing any cascading +select is( + pgflow.cascade_complete_taskless_steps(:'run_id'), + 0, + 'cascade_complete_taskless_steps should return 0 when run is failed' +); + +-- Verify run remains failed +select is( + status, + 'failed', + 'Run should remain in failed status' +) from pgflow.runs where run_id = :'run_id'; + +select finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/complete_task/no_mutations_on_failed_run.test.sql 
b/pkgs/core/supabase/tests/complete_task/no_mutations_on_failed_run.test.sql new file mode 100644 index 000000000..f380602d8 --- /dev/null +++ b/pkgs/core/supabase/tests/complete_task/no_mutations_on_failed_run.test.sql @@ -0,0 +1,85 @@ +-- Test that complete_task does not mutate state when run is already failed +-- This simulates a race condition where a task tries to complete after the run has failed +begin; + +select plan(6); + +-- Create test flow with two parallel steps (one will fail, one will try to complete) +select pgflow.create_flow('test_flow', max_attempts => 1); +select pgflow.add_step('test_flow', 'step1'); +select pgflow.add_step('test_flow', 'step2'); + +-- Start a flow run +select pgflow.start_flow('test_flow', '{"test": "data"}'::jsonb); + +-- Get the run_id +select run_id from pgflow.runs where flow_slug = 'test_flow' limit 1 \gset + +-- Get message IDs for both steps +select message_id as msg1 from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step1' limit 1 \gset + +select message_id as msg2 from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step2' limit 1 \gset + +-- Ensure worker exists +select pgflow_tests.ensure_worker('test_flow'); + +-- Start both tasks (simulating workers picking them up) +select pgflow.start_tasks('test_flow', ARRAY[:msg1, :msg2]::bigint[], '11111111-1111-1111-1111-111111111111'::uuid); + +-- Fail step2 which will fail the entire run (max_attempts=1) +select pgflow.fail_task(:'run_id', 'step2', 0, 'Simulated failure'); + +-- Now try to complete step1 (race condition - worker doesn't know run failed) +select pgflow.complete_task(:'run_id', 'step1', 0, '{"result": "test"}'::jsonb); + +-- Verify task was NOT marked as completed +select is( + status, + 'started', + 'Task should remain in started status when run is failed' +) from pgflow.step_tasks +where run_id = :'run_id' and step_slug = 'step1' and task_index = 0; + +-- Verify output was NOT saved +select is( + output, + null, + 'Task output 
-- Test: fail_task should archive all sibling task messages for map steps,
-- so no orphaned messages keep cycling through workers after a run fails.
begin;
select plan(8);
select pgflow_tests.reset_db();

-- Create flow with a map step; max_attempts => 1 so one failure
-- permanently fails the task (and therefore the run).
select pgflow.create_flow('test_map_fail');
select pgflow.add_step(
  flow_slug => 'test_map_fail',
  step_slug => 'map_step',
  step_type => 'map',
  max_attempts => 1
);

-- Start flow with 5 array elements (one map task is spawned per element)
select run_id as test_run_id from pgflow.start_flow('test_map_fail', '["a", "b", "c", "d", "e"]'::jsonb) \gset

-- Verify all 5 messages are in queue
select is(
  (select count(*) from pgmq.q_test_map_fail),
  5::bigint,
  'Should have 5 messages in queue for 5 map tasks'
);

-- Verify all 5 tasks are created
select is(
  (select count(*)::integer from pgflow.step_tasks
   where run_id = :'test_run_id'::uuid
     and step_slug = 'map_step'
     and status = 'queued'),
  5,
  'Should have 5 queued tasks'
);

-- Ensure worker exists for polling
select pgflow_tests.ensure_worker('test_map_fail');

-- Start one task (simulating Edge Worker behavior)
-- Note: read_and_start will start one of the tasks (we'll use it for testing)
WITH task AS (
  SELECT * FROM pgflow_tests.read_and_start('test_map_fail', 1, 1)
  LIMIT 1
)
SELECT step_slug FROM task;

-- Capture the actual task_index of the started task for later reference
-- (read_and_start does not guarantee which index gets picked)
select task_index as started_task_index from pgflow.step_tasks
where run_id = :'test_run_id'::uuid
  and step_slug = 'map_step'
  and status = 'started' \gset

-- Fail the started task
select pgflow.fail_task(
  :'test_run_id'::uuid,
  'map_step',
  :'started_task_index'::integer,
  'Task failed!'
);

-- Test: Run should be marked as failed
select is(
  (select status from pgflow.runs where run_id = :'test_run_id'::uuid),
  'failed',
  'Run should be marked as failed after task failure'
);

-- Test: Failed task should have status 'failed'
select is(
  (select status from pgflow.step_tasks
   where run_id = :'test_run_id'::uuid
     and step_slug = 'map_step'
     and task_index = :'started_task_index'::integer),
  'failed',
  'Started task should be marked as failed'
);

-- CRITICAL TEST: All sibling task messages should be archived (removed from queue)
select is(
  (select count(*) from pgmq.q_test_map_fail),
  0::bigint,
  'All 5 messages should be archived (removed from queue) when one map task fails'
);

-- Test: Verify messages were actually archived (moved to pgmq archive), not deleted
select is(
  (select count(*) from pgmq.a_test_map_fail),
  5::bigint,
  'All 5 messages should be in archive table'
);

-- Test: Sibling tasks keep their 'queued' status even though their
-- messages were archived (only the failed task changes status)
select is(
  (select count(*)::integer from pgflow.step_tasks
   where run_id = :'test_run_id'::uuid
     and step_slug = 'map_step'
     and task_index != :'started_task_index'::integer
     and status = 'queued'),
  4,
  'Sibling tasks should remain in queued status'
);

-- Test: Step state should be marked as failed
select is(
  (select status from pgflow.step_states
   where run_id = :'test_run_id'::uuid
     and step_slug = 'map_step'),
  'failed',
  'Map step should be marked as failed'
);

select * from finish();
rollback;
-- Test: Type constraint violation should archive all pending messages for the run.
-- complete_task handles the violation gracefully (no exception), fails the run,
-- and archives every remaining queue message so workers don't keep polling them.
begin;
select plan(8);
select pgflow_tests.reset_db();

-- Create flow with single -> map dependency where map has multiple downstream steps
select pgflow.create_flow('type_constraint_test');

select pgflow.add_step(
  flow_slug => 'type_constraint_test',
  step_slug => 'producer',
  step_type => 'single'
);

select pgflow.add_step(
  flow_slug => 'type_constraint_test',
  step_slug => 'map_consumer',
  deps_slugs => ARRAY['producer'],
  step_type => 'map'
);

-- Add a parallel single step to verify all pending messages are archived
select pgflow.add_step(
  flow_slug => 'type_constraint_test',
  step_slug => 'parallel_single',
  deps_slugs => ARRAY['producer'],
  step_type => 'single'
);

select run_id as test_run_id from pgflow.start_flow(
  'type_constraint_test',
  '{}'::jsonb
) \gset

-- Verify initial state: 1 message in queue (only root producer starts immediately)
select is(
  (select count(*) from pgmq.q_type_constraint_test),
  1::bigint,
  'Should have 1 message in queue initially (only producer as root step)'
);

-- Test: Map starts with NULL initial_tasks (count unknown until producer completes)
select is(
  (select initial_tasks from pgflow.step_states
   where run_id = :'test_run_id'::uuid
     and step_slug = 'map_consumer'),
  NULL::integer,
  'Dependent map should start with NULL initial_tasks'
);

-- Ensure worker exists for polling
select pgflow_tests.ensure_worker('type_constraint_test');

-- Start the producer task (simulating Edge Worker behavior)
-- Since producer is the only root step, it will be the one started
WITH task AS (
  SELECT * FROM pgflow_tests.read_and_start('type_constraint_test', 1, 1)
  LIMIT 1
)
SELECT step_slug FROM task;

-- Capture the producer task message_id
-- NOTE(review): producer_msg_id is captured but not asserted on below;
-- kept for debugging convenience
select message_id as producer_msg_id from pgflow.step_tasks
where run_id = :'test_run_id'::uuid
  and step_slug = 'producer' \gset

-- Test: complete_task should handle type violation gracefully (no exception)
-- (producer emits an object, but map_consumer requires an array)
select lives_ok(
  format($$
    SELECT pgflow.complete_task(
      '%s'::uuid,
      'producer',
      0,
      '{"not": "an array"}'::jsonb
    )
  $$, :'test_run_id'),
  'complete_task should handle type violation gracefully without throwing exception'
);

-- CRITICAL TEST: Queue should be empty after the type constraint violation
-- (all pending messages for the run must be archived, not left to cycle)
select is(
  (select count(*) from pgmq.q_type_constraint_test),
  0::bigint,
  'Queue should be empty after type constraint violation (all pending messages archived)'
);

-- Test: Verify messages were moved to the pgmq archive table, not deleted
select ok(
  (select count(*) from pgmq.a_type_constraint_test) > 0,
  'Some messages should be in archive table after type constraint violation'
);

-- Test: The graceful handling still marks the run as failed
select is(
  (select status from pgflow.runs where run_id = :'test_run_id'::uuid),
  'failed',
  'Run should be marked as failed after type constraint violation'
);

-- Test: Map initial_tasks stays NULL (the map never received valid array input)
select is(
  (select initial_tasks from pgflow.step_states
   where run_id = :'test_run_id'::uuid
     and step_slug = 'map_consumer'),
  NULL,
  'Map initial_tasks should remain NULL after type constraint violation'
);

-- Test: parallel_single tasks are never created — the run failed before
-- the producer's completion could cascade to its dependents
select is(
  (select count(*)::integer from pgflow.step_tasks
   where run_id = :'test_run_id'::uuid
     and step_slug = 'parallel_single'),
  0,
  'Parallel single task should not exist after type constraint violation (transaction rolled back)'
);

select * from finish();
rollback;
constraint violation' +); + +-- Test: Map initial_tasks should remain NULL after failed transaction +select is( + (select initial_tasks from pgflow.step_states + where run_id = :'test_run_id'::uuid + and step_slug = 'map_consumer'), + NULL, + 'Map initial_tasks should remain NULL after type constraint violation' +); + +-- Test: Check if parallel_single task exists +-- After the transaction rollback, parallel_single won't be created +select is( + (select count(*)::integer from pgflow.step_tasks + where run_id = :'test_run_id'::uuid + and step_slug = 'parallel_single'), + 0, + 'Parallel single task should not exist after type constraint violation (transaction rolled back)' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql b/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql index 954a94d74..ac18b076e 100644 --- a/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql +++ b/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql @@ -1,5 +1,5 @@ begin; -select plan(3); +select plan(8); select pgflow_tests.reset_db(); -- Test: Non-array output to dependent map should fail the run @@ -42,8 +42,8 @@ WITH task AS ( ) SELECT step_slug FROM task; --- Test: complete_task should RAISE EXCEPTION for non-array output to map -select throws_ok( +-- Test: complete_task should handle type violation gracefully (no exception) +select lives_ok( $$ WITH task_info AS ( SELECT run_id, step_slug, task_index @@ -59,9 +59,47 @@ select throws_ok( '{"not": "an array"}'::jsonb ) FROM task_info $$, - 'P0001', -- RAISE EXCEPTION error code - 'Map step map_consumer expects array input but dependency producer produced object (output: {"not": "an array"})', - 'complete_task should fail when non-array is passed to dependent map' + 'complete_task should handle type violation gracefully without throwing 
exception' +); + +-- Test: Producer task should be marked as failed with error message +select is( + (select status from pgflow.step_tasks + where flow_slug = 'non_array_test' and step_slug = 'producer'), + 'failed'::text, + 'Producer task should be marked as failed' +); + +-- Test: Producer task should have appropriate error message +select ok( + (select error_message ILIKE '%TYPE_VIOLATION%' from pgflow.step_tasks + where flow_slug = 'non_array_test' and step_slug = 'producer'), + 'Producer task should have type constraint error message' +); + +-- Test: Producer task should store the invalid output despite failing +select is( + (select output from pgflow.step_tasks + where flow_slug = 'non_array_test' and step_slug = 'producer'), + '{"not": "an array"}'::jsonb, + 'Producer task should store the output that caused the type violation' +); + +-- Test: Run should be marked as failed +select is( + (select status from pgflow.runs + where flow_slug = 'non_array_test' limit 1), + 'failed'::text, + 'Run should be marked as failed after type constraint violation' +); + +-- Test: Messages should be archived +select cmp_ok( + (select count(*) from pgmq.a_non_array_test + where message->>'step_slug' = 'producer'), + '>', + 0::bigint, + 'Failed task messages should be archived' ); -- Test: Map initial_tasks should remain NULL after failed transaction diff --git a/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql b/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql index d1c045265..ddc24993c 100644 --- a/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql +++ b/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql @@ -1,5 +1,5 @@ begin; -select plan(3); +select plan(7); select pgflow_tests.reset_db(); -- Test: NULL output to dependent map should fail @@ -42,8 +42,8 @@ WITH task AS ( ) SELECT step_slug FROM task; --- Test: complete_task should RAISE 
EXCEPTION for NULL output to map -select throws_ilike( +-- Test: complete_task should handle NULL output gracefully (no exception) +select lives_ok( $$ WITH task_info AS ( SELECT run_id, step_slug, task_index @@ -59,8 +59,38 @@ select throws_ilike( NULL::jsonb -- Passing literal NULL! ) FROM task_info $$, - '%Map step map_consumer expects array input but dependency producer produced null%', - 'complete_task should fail when NULL is passed to dependent map' + 'complete_task should handle NULL output gracefully without throwing exception' +); + +-- Test: Producer task should be marked as failed +select is( + (select status from pgflow.step_tasks + where flow_slug = 'null_output_test' and step_slug = 'producer'), + 'failed'::text, + 'Producer task should be marked as failed' +); + +-- Test: Producer task should have appropriate error message +select ok( + (select error_message ILIKE '%TYPE_VIOLATION%' from pgflow.step_tasks + where flow_slug = 'null_output_test' and step_slug = 'producer'), + 'Producer task should have type constraint error message' +); + +-- Test: Producer task should store NULL output +select is( + (select output from pgflow.step_tasks + where flow_slug = 'null_output_test' and step_slug = 'producer'), + NULL::jsonb, + 'Producer task should store the NULL output that caused the type violation' +); + +-- Test: Run should be marked as failed +select is( + (select status from pgflow.runs + where flow_slug = 'null_output_test' limit 1), + 'failed'::text, + 'Run should be marked as failed after type constraint violation' ); -- Test: Map initial_tasks should remain NULL after failed transaction diff --git a/pkgs/core/supabase/tests/type_violations/archive_all_messages.test.sql b/pkgs/core/supabase/tests/type_violations/archive_all_messages.test.sql new file mode 100644 index 000000000..28959b046 --- /dev/null +++ b/pkgs/core/supabase/tests/type_violations/archive_all_messages.test.sql @@ -0,0 +1,179 @@ +begin; +select plan(10); +select 
-- Test: both queued AND started messages are archived when a run fails,
-- whether the failure comes from a type violation (complete_task) or from
-- fail_task exhausting max_attempts. This prevents orphaned messages from
-- cycling through workers after the run is already dead.
begin;
select plan(10);
select pgflow_tests.reset_db();

-- FIRST SCENARIO: type violation in complete_task archives all run messages.
-- Flow shape: producer fans out to three parallel branches; consumer_map
-- (a map step) expects an array from branch1, so completing branch1 with an
-- object triggers the type violation.
select pgflow.create_flow('archive_test');
select pgflow.add_step(
  flow_slug => 'archive_test',
  step_slug => 'producer',
  step_type => 'single'
);
select pgflow.add_step(
  flow_slug => 'archive_test',
  step_slug => 'branch1',
  deps_slugs => ARRAY['producer'],
  step_type => 'single'
);
select pgflow.add_step(
  flow_slug => 'archive_test',
  step_slug => 'branch2',
  deps_slugs => ARRAY['producer'],
  step_type => 'single'
);
select pgflow.add_step(
  flow_slug => 'archive_test',
  step_slug => 'branch3',
  deps_slugs => ARRAY['producer'],
  step_type => 'single'
);
-- This map step expects arrays from branch1
select pgflow.add_step(
  flow_slug => 'archive_test',
  step_slug => 'consumer_map',
  deps_slugs => ARRAY['branch1'],
  step_type => 'map'
);

-- Start flow (explicit jsonb cast, consistent with sibling tests)
select run_id as test_run_id from pgflow.start_flow('archive_test', '{}'::jsonb) \gset

-- Start producer task
select pgflow_tests.ensure_worker('archive_test', '11111111-1111-1111-1111-111111111111'::uuid);
SELECT * FROM pgflow_tests.read_and_start('archive_test', 1, 1) LIMIT 1;

-- Complete producer to spawn branches
SELECT pgflow.complete_task(:'test_run_id'::uuid, 'producer', 0, '{"data": "test"}'::jsonb);

-- Start some branch tasks so we have a mix of queued and started tasks
select pgflow_tests.ensure_worker('archive_test', 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'::uuid, 'worker_a');
SELECT * FROM pgflow_tests.read_and_start('archive_test', 1, 1,
  'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'::uuid, 'worker_a') LIMIT 1;

select pgflow_tests.ensure_worker('archive_test', 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'::uuid, 'worker_b');
SELECT * FROM pgflow_tests.read_and_start('archive_test', 1, 1,
  'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'::uuid, 'worker_b') LIMIT 1;

-- Snapshot archive size before the violation.
-- (The queue size is asserted to be exactly 0 below, so only the archive
-- count needs capturing; an unused queue snapshot was removed.)
select count(*) as archive_before from pgmq.a_archive_test \gset

-- Check we have both queued and started tasks with messages
select is(
  (select count(*)::int > 0 from pgflow.step_tasks
   where run_id = :'test_run_id'::uuid
     and status = 'queued' and message_id is not null),
  true,
  'Should have queued tasks with messages before violation'
);

select is(
  (select count(*)::int > 0 from pgflow.step_tasks
   where run_id = :'test_run_id'::uuid
     and status = 'started' and message_id is not null),
  true,
  'Should have started tasks with messages before violation'
);

-- Trigger type violation by completing branch1 with non-array (consumer_map expects array)
select lives_ok(
  format($$
    SELECT pgflow.complete_task('%s'::uuid, 'branch1', 0, '{"not": "an array"}'::jsonb)
  $$, :'test_run_id'),
  'complete_task should handle type violation'
);

-- CRITICAL: All messages should be archived (both from queued AND started tasks)
select is(
  (select count(*) from pgmq.q_archive_test),
  0::bigint,
  'Queue should be empty - ALL messages archived (not just queued ones)'
);

-- Verify messages were moved to archive, not lost
select ok(
  (select count(*) from pgmq.a_archive_test) > :archive_before::bigint,
  'Messages should be in archive table'
);

-- SECOND SCENARIO: fail_task also archives all messages.
-- Reset for second test
select pgflow_tests.reset_db();

-- Create flow with max_attempts=1 to fail immediately; three independent
-- root steps give a mix of queued and started tasks
select pgflow.create_flow('fail_test', max_attempts => 1);
select pgflow.add_step(
  flow_slug => 'fail_test',
  step_slug => 'task1',
  step_type => 'single'
);
select pgflow.add_step(
  flow_slug => 'fail_test',
  step_slug => 'task2',
  step_type => 'single'
);
select pgflow.add_step(
  flow_slug => 'fail_test',
  step_slug => 'task3',
  step_type => 'single'
);

-- Start flow
select run_id as test_run_id2 from pgflow.start_flow('fail_test', '{}'::jsonb) \gset

-- Start some tasks to have mix of queued and started
select pgflow_tests.ensure_worker('fail_test', 'cccccccc-cccc-cccc-cccc-cccccccccccc'::uuid, 'worker_c');
SELECT * FROM pgflow_tests.read_and_start('fail_test', 1, 1,
  'cccccccc-cccc-cccc-cccc-cccccccccccc'::uuid, 'worker_c') LIMIT 1;

select pgflow_tests.ensure_worker('fail_test', 'dddddddd-dddd-dddd-dddd-dddddddddddd'::uuid, 'worker_d');
SELECT * FROM pgflow_tests.read_and_start('fail_test', 1, 1,
  'dddddddd-dddd-dddd-dddd-dddddddddddd'::uuid, 'worker_d') LIMIT 1;

-- Check we have both queued and started
select is(
  (select count(*)::int > 0 from pgflow.step_tasks
   where run_id = :'test_run_id2'::uuid
     and status = 'queued' and message_id is not null),
  true,
  'fail_test should have queued tasks with messages'
);

select is(
  (select count(*)::int > 0 from pgflow.step_tasks
   where run_id = :'test_run_id2'::uuid
     and status = 'started' and message_id is not null),
  true,
  'fail_test should have started tasks with messages'
);

-- Fail one task (with max_attempts=1, it will fail the run)
select lives_ok(
  format($$
    SELECT pgflow.fail_task('%s'::uuid, 'task1', 0, 'Test error')
  $$, :'test_run_id2'),
  'fail_task should handle run failure'
);

-- CRITICAL: All messages should be archived
select is(
  (select count(*) from pgmq.q_fail_test),
  0::bigint,
  'fail_task: Queue should be empty - ALL messages archived'
);

-- Verify run is failed
select is(
  (select status from pgflow.runs where run_id = :'test_run_id2'::uuid),
  'failed',
  'fail_task: Run should be marked as failed'
);

select * from finish();
rollback;