diff --git a/.changeset/tricky-sites-stare.md b/.changeset/add-array-method-to-dsl.md similarity index 100% rename from .changeset/tricky-sites-stare.md rename to .changeset/add-array-method-to-dsl.md diff --git a/.changeset/easy-bats-nail.md b/.changeset/add-map-method-to-dsl.md similarity index 98% rename from .changeset/easy-bats-nail.md rename to .changeset/add-map-method-to-dsl.md index 780ae13a5..f413e236f 100644 --- a/.changeset/easy-bats-nail.md +++ b/.changeset/add-map-method-to-dsl.md @@ -1,5 +1,5 @@ --- -'@pgflow/dsl': patch +'@pgflow/dsl': minor --- Add `.map()` method to Flow DSL for defining map-type steps diff --git a/.changeset/yummy-geckos-marry.md b/.changeset/add-map-step-type-infrastructure.md similarity index 98% rename from .changeset/yummy-geckos-marry.md rename to .changeset/add-map-step-type-infrastructure.md index 8e379bc57..c6551c221 100644 --- a/.changeset/yummy-geckos-marry.md +++ b/.changeset/add-map-step-type-infrastructure.md @@ -1,5 +1,5 @@ --- -'@pgflow/core': patch +'@pgflow/core': minor --- Add map step type infrastructure in SQL core diff --git a/.changeset/fix-compile-config-flag.md b/.changeset/fix-compile-config-flag.md index b91178b15..c8cab1259 100644 --- a/.changeset/fix-compile-config-flag.md +++ b/.changeset/fix-compile-config-flag.md @@ -1,5 +1,5 @@ --- -'@pgflow/cli': patch +'pgflow': patch --- Fix: Use --config instead of --import-map for Deno compilation diff --git a/.changeset/hungry-cloths-hunt.md b/.changeset/improve-failure-handling-prevent-orphaned-messages.md similarity index 100% rename from .changeset/hungry-cloths-hunt.md rename to .changeset/improve-failure-handling-prevent-orphaned-messages.md diff --git a/PLAN.md b/PLAN.md deleted file mode 100644 index 3043d701f..000000000 --- a/PLAN.md +++ /dev/null @@ -1,177 +0,0 @@ -# Map Infrastructure (SQL Core) - -**NOTE: This PLAN.md file should be removed in the final PR once all map infrastructure is complete.** - -### Features - -- ✅ **DONE**: Empty array maps (taskless) cascade and complete correctly -- ✅ **DONE**: Task spawning creates N tasks with correct indices -- ✅ **DONE**: Dependency count propagation for map steps -- ✅ **DONE**: Array element extraction - tasks receive individual array elements -- ✅ **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents -- ✅ **DONE**: DSL support for `.map()` for defining map steps with compile-time duplicate detection -- ✅ **DONE**: Fix orphaned messages on run failure -- ⏳ **TODO**: Performance optimization with step_states.output column - -### Chores - -- ⏳ **TODO**: Integration tests for map steps -- ⏳ **TODO**: Update core README -- ⏳ **TODO**: Add docs page for array and map steps -- ⏳ **TODO**: Migration consolidation -- ⏳ **TODO**: Graphite stack merge - -## Implementation Status - -### Sequential Child PR Plan - -#### ✅ Completed PRs - -- [x] **PR #207: Add .array() to DSL** - `feature-map-and-array` - - - TypeScript DSL enhancement for array creation - - Foundation for map step functionality - -- [x] **PR #208: Foundation - Schema & add_step()** - `09-10-feat_add_map_step_type_in_sql` - - - Schema changes (initial_tasks, remaining_tasks, constraints) - - add_step() function with map step validation - - Basic tests for map step creation - -- [x] **PR #209: Root Map Support** - `09-11-root-map-support` - - - Enhanced start_flow() for root map validation and count setting - - Tests for root map scenarios - -- [x] **PR #210: Task Spawning** - `09-12-task-spawning` - - - Enhanced 
start_ready_steps() for N task generation - - Empty array auto-completion - - Tests for batch task creation - -- [x] **PR #211: Cascade Complete Taskless Steps** - `09-15-complete-cascade` - - - Extracted taskless completion from start_ready_steps() - - Added cascade_complete_taskless_steps() function with iteration safety - - Generic solution for all initial_tasks=0 steps - - Fixed flow_slug matching bug in dep_updates CTE - - All taskless cascade tests passing (7/7 test files) - -- [x] **PR #212: Dependent Map Count Propagation** - - - Enhanced complete_task() sets initial_tasks for dependent maps - - Array validation and count propagation working - - Cascade handles taskless dependent maps - -- [x] **PR #213: NULL for Unknown initial_tasks** - `09-16-make-initial-tasks-nullable` - - - Changed initial_tasks from "1 as placeholder" to NULL for dependent map steps - - Benefits: Semantic correctness (NULL = unknown, not "1 task") - - Implemented: Schema change to allow NULL, updated all SQL functions - - Added validation for non-array and NULL outputs to map steps - - Comprehensive tests for NULL behavior and error cases - -- [x] **PR #216: Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE) - - - Enhanced start_tasks() to distribute array elements to map tasks - - Each map task receives its specific array element based on task_index - - Handles both root maps (from run input) and dependent maps (from step outputs) - - Tests with actual array data processing - -- [x] **PR #217: Output Aggregation** - `09-17-add-map-step-output-aggregation` (THIS PR) - - - Inline aggregation implementation in complete_task, start_tasks, maybe_complete_run - - Full test coverage (17 tests) for all aggregation scenarios - - Handles NULL preservation, empty arrays, order preservation - - Validates non-array outputs to map steps fail correctly - - Fixed broadcast aggregation to send full array not individual task output - -- [x] **PR #218: DSL Support for .map() Step Type** - `09-18-add-map-support-to-dsl` ✅ COMPLETED - - - Added `.map()` method to Flow DSL for defining map steps - - Constraints: - - Locked to exactly one dependency (enforced at compile time) - - Dependency must return an array (type-checked) - - Syntax design: - - Dependent maps: `flow.map({ slug: 'stepName', array: 'arrayReturningStep' }, handler)` - - Root maps: Omit array property - - Return type always inferred as array - - Comprehensive tests: - - Runtime validation of array dependencies - - Type safety for input/output types - - Compile-time enforcement of single dependency rule - - Fixed complex TypeScript type inference issue with overloads - - Added compile-time duplicate slug detection across all DSL methods - - Fixed all linting errors (replaced `{}` with `Record`) - - Updated DSL README with .map() documentation - - Created detailed changeset - -- [x] **PR #219: Fix Orphaned Messages on Run Failure** - `09-18-fix-orphaned-messages-on-fail` ✅ COMPLETED - - - Archives all queued messages when run fails (prevents orphaned messages) - - Handles type constraint violations gracefully without exceptions - - Added guards to prevent any mutations on failed runs: - - complete_task returns unchanged - - start_ready_steps exits early - - cascade_complete_taskless_steps returns 0 - - Added performance index for efficient message archiving - - Tests unstashed and passing (archive_sibling_map_tasks, archive_messages_on_type_constraint_failure) - - Updated core README with failure handling mentions - - **Critical fix: prevents queue 
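For reference, the DSL is a thin layer over SQL registration: a `.map()` step presumably compiles down to `pgflow.add_step` with `step_type => 'map'`. A minimal sketch using the `add_step` signature from the migrations below (flow and step slugs are hypothetical; the exact SQL the compiler emits may differ):

```sql
-- Hypothetical flow: a single step returns an array, a map step fans out over it.
SELECT pgflow.add_step('items_flow', 'fetch_items');  -- single step (returns an array)
SELECT pgflow.add_step(
  'items_flow',
  'process_item',
  deps_slugs => ARRAY['fetch_items'],
  step_type  => 'map'  -- one task per array element, indices 0..N-1
);
-- Passing two deps_slugs here raises:
-- 'Map step "process_item" can have at most one dependency, but 2 were provided: ...'
```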
performance degradation in production** - -#### ❌ Remaining Work (Priority Order) - -- [ ] **Integration Tests** - - - End-to-end workflows with real array data - - Basic happy path coverage - - This should be minimal and added to the Edge Worker integration test suite for now - -- [ ] **Performance Optimization - step_states.output Column** - - - Migrate from inline aggregation to storing outputs in step_states - - See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md) - - Benefits: - - Eliminate redundant aggregation queries - - 30-70% performance improvement for map chains - - Cleaner architecture with single source of truth - - Implementation: - - Add output column to step_states table - - Update complete_task to populate output on completion - - Simplify consumers (start_tasks, maybe_complete_run, broadcasts) - - Update all aggregation tests (~17 files) - - **Note**: This is an optimization that should be done after core functionality is stable - -- [ ] **Update `pkgs/core/README.md`** - - - Add new section describing the step types - - Describe single step briefly, focus on describing map step type and how it differs - - Make sure to mention that maps are constrained to have exactly one dependency - - Show multiple cases of inputs -> task creation - - Explain edge cases (empty array propagation, invalid array input) - - Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input - - Explain cascade completion of taskless steps and its limitations - -- [ ] **Add docs page** - - - put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx` - - describe the DSL and how the map works and why we need it - - show example usage of root map - - show example usage of dependent map - - focus mostly on how to use it, instead of how it works under the hood - - link to the README's for more details - -- [ ] **Migration Consolidation** - - - Remove all temporary/incremental migrations from feature branches - - Generate a single consolidated migration for the entire map infrastructure - - Ensure clean migration path from current production schema - - If NULL improvement is done, include it in the consolidated migration - -- [ ] **Graphite Stack Merge** - - - Configure Graphite merge queue for the complete PR stack - - Ensure all PRs in sequence can be merged together - - Final validation before merge to main - - Merge queue to be set such that it verifies only the top PR - (it is because of CI check for temp migrations) diff --git a/PLAN_partial_completion.md b/PLAN_partial_completion.md deleted file mode 100644 index b75be5a62..000000000 --- a/PLAN_partial_completion.md +++ /dev/null @@ -1,319 +0,0 @@ -# Partial Completion & Recovery Strategy for pgflow - -## Problem Statement - -When workflows fail mid-execution, especially after irreversible operations (API calls, database writes, external system mutations), we need a way to: -1. Preserve completed work -2. Allow recovery/continuation from failure point -3. Clean up orphaned resources (queued messages) -4. 
Provide clear failure semantics - -Currently, pgflow fails entire runs on any task failure, which: -- Wastes successfully completed work -- Leaves orphaned messages in queues -- Provides no recovery path -- Forces users to restart from scratch - -## How Other Systems Handle This - -### Temporal -**Approach**: Continue-As-New & Compensation -- **State Preservation**: Workflow state survives failures, can be resumed -- **Saga Pattern**: Explicit compensation activities to undo work -- **Activity Replay**: Deterministic replay skips completed activities -- **Non-Retryable Errors**: `ApplicationFailure` with `non_retryable=true` -- **Key Insight**: Distinguish between deterministic workflow errors (never retry) and transient activity errors (retry) - -### Inngest -**Approach**: Step-Level Isolation with Try-Catch -- **Step Independence**: Each step can fail/retry independently -- **Failure Handling**: `try/catch` blocks around steps -- **NonRetriableError**: Permanent failures that stop execution -- **Failure Handlers**: `onFailure` callbacks for cleanup/rollback -- **Key Insight**: Partial success is normal - handle errors with standard language primitives - -### Trigger.dev -**Approach**: Flexible Error Callbacks -- **handleError Callbacks**: Decide retry behavior per error type -- **Conditional Retries**: Skip retries based on error details -- **Response-Based Timing**: Retry after time from response headers -- **Key Insight**: Error handling logic lives with workflow definition - -### Apache Airflow -**Approach**: Clear & Resume -- **Task Clearing**: "Clear" failed tasks to retry from that point -- **Selective Retry**: Only retry failed tasks, not successful ones -- **Manual Intervention**: Operators can intervene to fix and resume -- **Backfill Operations**: Re-run specific date ranges or task sets -- **Key Insight**: Manual intervention is acceptable for complex failures - -### AWS Step Functions -**Approach**: Hybrid Retry + Redrive -- **Automatic Retries**: For transient errors -- **Manual Redrive**: Resume from specific states after fixing root cause -- **Selective Resume**: Choose which nodes to resume from -- **Key Insight**: Combine automatic and manual recovery strategies - -## Current pgflow Behavior - -### Type Contract Violations -```sql --- In complete_task: RAISES EXCEPTION, causes retry attempts -IF v_dependent_map_slug IS NOT NULL THEN - RAISE EXCEPTION 'Map step % expects array input...'; -END IF; -``` -**Problems**: -- Retries a deterministic error (won't fix itself) -- Wastes resources on pointless retries -- No cleanup of sibling/parallel tasks - -### Task Failures (fail_task) -```sql --- Archives only the failing task's message -SELECT pgmq.archive('pgflow_tasks_queue', fail_task.msg_id); --- Leaves all other queued messages orphaned! -``` -**Problems**: -- Orphaned messages waste worker resources -- Queue performance degrades over time -- No cleanup of sibling map tasks - -## Proposed Solution (MVP) - -### Phase 1: Immediate Message Cleanup (Current PR) - -#### 1. Enhanced fail_task - Archive All Pending Messages -```sql --- After marking run as failed, archive ALL pending messages -WITH tasks_to_archive AS ( - SELECT t.msg_id - FROM pgflow.step_tasks t - WHERE t.run_id = fail_task.run_id - AND t.status = 'pending' - AND t.msg_id IS NOT NULL -) -SELECT pgmq.archive('pgflow_tasks_queue', msg_id) -FROM tasks_to_archive; -``` - -#### 2. 
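A quick way to check this invariant after a failure is to inspect the live queue directly. A sketch, assuming pgmq's `q_`/`a_` table naming and the single `pgflow_tasks_queue` used in this plan's examples:

```sql
-- After fail_task, the live queue should hold nothing for the failed run.
SELECT count(*) AS orphaned_messages
FROM pgmq.q_pgflow_tasks_queue q
WHERE (q.message ->> 'run_id')::uuid = :failed_run_id;  -- :failed_run_id is a psql variable
-- Expected: 0; the archived copies should appear in pgmq.a_pgflow_tasks_queue.
```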
Type Contract Violation - Fail Fast Without Retries -```sql --- In complete_task validation block -IF v_dependent_map_slug IS NOT NULL THEN - -- Mark run as failed immediately (no retries) - UPDATE pgflow.runs - SET status = 'failed', - failed_at = now(), - error = format('Type contract violation: ...') - WHERE run_id = complete_task.run_id; - - -- Archive ALL pending messages immediately - PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id) - FROM pgflow.step_tasks t - WHERE t.run_id = complete_task.run_id - AND t.status = 'pending' - AND t.msg_id IS NOT NULL; - - -- Still raise for logging - RAISE EXCEPTION 'Type contract violation - run % failed', run_id; -END IF; -``` - -### Phase 2: Partial Success Status (Post-MVP) - -#### New Run Status -```sql -ALTER TYPE pgflow.run_status ADD VALUE 'partially_completed'; -``` - -#### Track Permanent vs Transient Failures -```sql -ALTER TYPE pgflow.task_status ADD VALUE 'failed_permanent'; - --- In complete_task for type violations -UPDATE pgflow.step_tasks -SET status = 'failed_permanent', - error = 'Type contract violation...' -WHERE ...; -``` - -#### Update Run Completion Logic -```sql --- In maybe_complete_run -UPDATE pgflow.runs -SET status = CASE - WHEN EXISTS (SELECT 1 FROM pgflow.step_tasks - WHERE run_id = ... - AND status IN ('failed', 'failed_permanent')) - THEN 'partially_completed' - ELSE 'completed' -END -``` - -### Phase 3: Clear & Resume Capability (Future) - -#### Clear Failed Tasks -```sql -CREATE FUNCTION pgflow.clear_task( - run_id uuid, - step_slug text, - task_index integer DEFAULT NULL -- NULL = clear all tasks for step -) RETURNS void AS $$ -BEGIN - -- Reset task(s) to pending - UPDATE pgflow.step_tasks - SET status = 'pending', - attempts_count = 0, - error_message = NULL, - started_at = NULL, - failed_at = NULL - WHERE pgflow.step_tasks.run_id = clear_task.run_id - AND pgflow.step_tasks.step_slug = clear_task.step_slug - AND (clear_task.task_index IS NULL OR pgflow.step_tasks.task_index = clear_task.task_index); - - -- Re-enqueue to pgmq - -- ... re-queue logic ... -END; -$$; -``` - -#### Resume Failed Run -```sql -CREATE FUNCTION pgflow.resume_run( - run_id uuid, - from_step text DEFAULT NULL -- Resume from specific step or all failed -) RETURNS void AS $$ -BEGIN - -- Reset run status - UPDATE pgflow.runs - SET status = 'started', - failed_at = NULL - WHERE pgflow.runs.run_id = resume_run.run_id; - - -- Clear failed steps - -- ... clear logic ... - - -- Restart flow processing - PERFORM pgflow.start_ready_steps(resume_run.run_id); -END; -$$; -``` - -## Implementation Priority - -### Must Have (Current PR) -1. ✅ Archive all messages when run fails -2. ✅ Handle map sibling tasks specially -3. ✅ Type violations fail immediately without retries - -### Should Have (Next PR) -1. Partial completion status -2. Distinguish permanent vs transient failures -3. Better error messages with recovery hints - -### Nice to Have (Future) -1. Clear/resume individual tasks -2. Compensation/rollback hooks -3. Checkpoint/savepoint system -4. Workflow versioning for hot-patches - -## Trade-offs & Decisions - -### Why Archive vs Delete Messages? -- **Archive**: Preserves audit trail, can analyze failures -- **Delete**: Saves space but loses debugging info -- **Decision**: Archive for now, add cleanup job later - -### Why Fail Entire Run vs Continue Parallel Branches? 
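To make the proposed recovery surface concrete, a hypothetical operator session using the two Phase 3 functions above (both are proposals, not shipped API):

```sql
-- A run failed after the 'send_email' step exhausted its retries.
SELECT pgflow.clear_task(
  run_id    => :run_id,       -- psql variable holding the failed run's id
  step_slug => 'send_email'   -- task_index omitted: clears every task of the step
);
SELECT pgflow.resume_run(:run_id);  -- reset run status and re-trigger start_ready_steps
```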
-- **Fail Run**: Simple, predictable, matches user expectations -- **Continue**: Complex state management, confusing semantics -- **Decision**: Fail run for MVP, consider partial continuation later - -### Why No Automatic Compensation? -- **Compensation**: Requires user-defined rollback logic -- **No Compensation**: Simpler, lets users handle externally -- **Decision**: No built-in compensation for MVP, document patterns - -## Testing Requirements - -### Failing Tests (Already Created) -1. `archive_sibling_map_tasks.test.sql` - Verify fail_task archives all map task messages -2. `archive_messages_on_type_constraint_failure.test.sql` - Verify type violations archive all pending messages - -### Additional Tests Needed -1. Performance impact of archiving many messages -2. Race conditions during failure/archive -3. Recovery after clear/resume -4. Partial completion status transitions - -## Migration Path - -### For Existing Users -1. Failure behavior changes are backward compatible -2. New statuses are additive (won't break existing code) -3. Clear/resume are opt-in features - -### Database Changes -```sql --- Add to migration -CREATE INDEX ON pgflow.step_tasks(run_id, status) -WHERE msg_id IS NOT NULL; -- Speed up message archiving - --- Future: Add partial_completed status --- Future: Add failed_permanent task status -``` - -## Documentation Requirements - -### User-Facing Docs -1. Explain failure modes and recovery options -2. Document clear/resume functions -3. Best practices for idempotent handlers -4. Patterns for compensation/rollback - -### Developer Notes -1. Why we archive vs delete messages -2. Performance implications of batch archiving -3. How to extend for custom failure handling - -## Success Metrics - -### Immediate (Current PR) -- [ ] No orphaned messages after failures -- [ ] Type violations don't waste retries -- [ ] Tests pass for message archiving - -### Long-term -- [ ] Users can recover from partial failures -- [ ] Queue performance doesn't degrade over time -- [ ] Clear error messages guide recovery -- [ ] Support for compensation patterns - -## References - -- Temporal Saga Pattern: https://docs.temporal.io/application-development/application-design-patterns#saga -- Inngest Error Handling: https://www.inngest.com/docs/guides/error-handling -- Airflow Task Clearing: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html -- AWS Step Functions Redrive: https://aws.amazon.com/blogs/compute/introducing-aws-step-functions-redrive-a-new-way-to-restart-workflows/ - -## Open Questions - -1. **Should we add a `retry_on_clear` flag to steps?** Some steps might not be safe to retry even manually. - -2. **How to handle time-sensitive operations?** If a step sends a "order shipped" email, clearing and retrying days later might be inappropriate. - -3. **Should partial_completed runs be reusable?** Or should resume create a new run linked to the original? - -4. **Message archiving performance?** At what scale does archiving 1000s of messages become problematic? - -5. **Compensation pattern?** Should pgflow provide first-class support for compensation/rollback handlers? 
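On question 4, one mitigation worth benchmarking before answering is pgmq's batch form of `archive`, which replaces N per-message calls with one. A sketch, assuming the `bigint[]` overload available in recent pgmq releases:

```sql
-- Archive every pending message for a run in a single call.
SELECT pgmq.archive(
  'pgflow_tasks_queue',
  (SELECT coalesce(array_agg(t.msg_id), '{}'::bigint[])
   FROM pgflow.step_tasks t
   WHERE t.run_id = :run_id
     AND t.status = 'pending'
     AND t.msg_id IS NOT NULL)
);
```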
- -## Decision Log - -| Date | Decision | Rationale | -|------|----------|-----------| -| 2024-01-18 | Archive messages, don't delete | Preserves debugging info, can add cleanup later | -| 2024-01-18 | Type violations fail immediately | Deterministic errors shouldn't retry | -| 2024-01-18 | No built-in compensation for MVP | Keep it simple, document patterns | -| 2024-01-18 | Fail entire run on task failure | Predictable behavior, simpler state management | \ No newline at end of file diff --git a/PLAN_step_output.md b/PLAN_step_output.md deleted file mode 100644 index cebaed580..000000000 --- a/PLAN_step_output.md +++ /dev/null @@ -1,371 +0,0 @@ -# Plan: Add `output` Column to `step_states` Table - -## Context & Timing - -**Status**: READY FOR FUTURE PR (not for current map aggregation PR) - -This plan was developed during the map output aggregation PR but should be implemented as a follow-up optimization after the map infrastructure stack is complete. The current PR uses inline aggregation which is sufficient for MVP. - -**Prerequisites**: -- Map infrastructure fully merged to main -- All map-related PRs complete (DSL, integration tests, migration consolidation) -- Current inline aggregation approach proven stable - -## Executive Summary - -This plan outlines the migration from inline output aggregation to storing step outputs directly in `pgflow.step_states`. This architectural change will improve performance, simplify code, and provide a cleaner data model. - -## Current State Analysis - -### Where Output Aggregation Happens Today - -1. **`pgflow.start_tasks`** (schemas/0120_function_start_tasks.sql:52-62) - - Aggregates map outputs when starting dependent steps - - Each dependent step re-aggregates outputs from scratch - -2. **`pgflow.maybe_complete_run`** (schemas/0100_function_maybe_complete_run.sql:36-45) - - Aggregates outputs for leaf steps when building run output - - Only runs at flow completion - -3. **`pgflow.complete_task`** (schemas/0100_function_complete_task.sql:184-197) - - Aggregates outputs for broadcast events - - Runs on every map step completion - -### Current Problems - -- **Performance**: Same aggregation query runs multiple times -- **Complexity**: Aggregation logic duplicated in 3 places -- **Map-to-Map Inefficiency**: Aggregate array just to immediately decompose it -- **Testing Burden**: Need to test aggregation in multiple contexts - -## Proposed Architecture - -### Schema Change - -```sql -ALTER TABLE pgflow.step_states -ADD COLUMN output jsonb; - --- Constraint: output only set when step is completed -ALTER TABLE pgflow.step_states -ADD CONSTRAINT output_only_when_completed CHECK ( - output IS NULL OR status = 'completed' -); -``` - -### Key Design Decisions - -1. **Single steps**: Store task output directly (no aggregation needed) -2. **Map steps**: Store aggregated array ordered by task_index -3. **Taskless steps**: Store empty array `[]` for map steps, NULL for single steps -4. **Storage location**: Step output belongs at step level, not task level -5. **Failed steps**: Do NOT store output at step level (only completed steps have step output) - - Individual failed tasks still store their output for debugging - - This includes type violation failures where task output is preserved - -## Implementation Changes - -### 1. Schema Updates - -**File**: New migration or update `0060_tables_runtime.sql` -- Add `output jsonb` column to `step_states` -- Add constraint `output_only_when_completed` - -### 2. 
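With the constraint in place, the design decision above is self-enforcing. A sketch of what it rejects (identifiers are hypothetical):

```sql
-- Rejected: writing output while the step is still running.
UPDATE pgflow.step_states
SET output = '"too early"'::jsonb
WHERE run_id = :run_id
  AND step_slug = 'my_step'
  AND status = 'started';
-- ERROR: ... violates check constraint "output_only_when_completed"
```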
Function Updates - -#### `pgflow.complete_task` - PRIMARY CHANGE -**File**: `schemas/0100_function_complete_task.sql` - -**Current Lines 86-104**: Update step state completion -```sql --- NEW: Set output when step completes -UPDATE pgflow.step_states -SET - status = 'completed', - completed_at = now(), - remaining_tasks = 0, - -- NEW: Store output at step level - output = CASE - WHEN (SELECT step_type FROM pgflow.steps - WHERE flow_slug = step_state.flow_slug - AND step_slug = complete_task.step_slug) = 'map' THEN - -- Aggregate all task outputs for map steps - (SELECT COALESCE(jsonb_agg(output ORDER BY task_index), '[]'::jsonb) - FROM pgflow.step_tasks - WHERE run_id = complete_task.run_id - AND step_slug = complete_task.step_slug - AND status = 'completed') - ELSE - -- Single step: use the task output directly - complete_task.output - END -WHERE ... -``` - -**Current Lines 184-197**: Simplify broadcast event -```sql --- SIMPLIFIED: Read from step_states.output -'output', v_step_state.output, -``` - -#### `pgflow.start_tasks` - SIMPLIFY -**File**: `schemas/0120_function_start_tasks.sql` - -**Current Lines 50-62**: Replace aggregation with simple read -```sql --- SIMPLIFIED: Read from step_states.output -dep_state.output as dep_output -... -FROM pgflow.step_states dep_state -WHERE dep_state.run_id = st.run_id - AND dep_state.step_slug = dep.dep_slug - AND dep_state.status = 'completed' -``` - -#### `pgflow.maybe_complete_run` - SIMPLIFY -**File**: `schemas/0100_function_maybe_complete_run.sql` - -**Current Lines 24-45**: Replace aggregation with simple read -```sql --- SIMPLIFIED: Read from step_states.output -SELECT jsonb_object_agg(step_slug, output) -FROM pgflow.step_states ss -WHERE ss.run_id = maybe_complete_run.run_id - AND NOT EXISTS ( - SELECT 1 FROM pgflow.deps d - WHERE d.flow_slug = ss.flow_slug - AND d.dep_slug = ss.step_slug - ) -``` - -#### `pgflow.cascade_complete_taskless_steps` - UPDATE -**File**: `schemas/0100_function_cascade_complete_taskless_steps.sql` - -**Current Lines 27-32**: Set output for taskless steps -```sql -UPDATE pgflow.step_states ss -SET status = 'completed', - started_at = now(), - completed_at = now(), - remaining_tasks = 0, - -- NEW: Set output for taskless steps - output = CASE - WHEN s.step_type = 'map' THEN '[]'::jsonb -- Empty array for map - ELSE NULL -- NULL for single steps - END -``` - -#### `pgflow.get_run_with_states` - ALREADY COMPATIBLE -**File**: `schemas/0105_function_get_run_with_states.sql` -- No changes needed - will automatically include output in response - -### 3. Test Updates - -#### Tests to Update (Add Assertions for `step_states.output`) - -1. **`tests/map_output_aggregation/basic_aggregation.test.sql`** - - Add: Verify `step_states.output` contains `[{output1}, {output2}, {output3}]` - -2. **`tests/map_output_aggregation/empty_map.test.sql`** - - Add: Verify `step_states.output = '[]'::jsonb` - -3. **`tests/map_output_aggregation/order_preservation.test.sql`** - - Add: Verify output array order matches task_index order - -4. **`tests/map_output_aggregation/map_to_single.test.sql`** - - Modify: Check dependent gets input from `step_states.output` - -5. **`tests/map_output_aggregation/map_to_map.test.sql`** - - Modify: Verify second map receives array from first map's `step_states.output` - -6. **`tests/map_output_aggregation/run_completion_leaf_map.test.sql`** - - Modify: Verify run output uses `step_states.output` - -7. 
**`tests/map_output_aggregation/null_outputs.test.sql`** - - Add: Verify NULL values preserved in `step_states.output` array - -8. **`tests/map_output_aggregation/mixed_dependencies.test.sql`** - - Add: Verify both map and single steps populate output correctly - -9. **`tests/map_output_aggregation/partial_completion_prevention.test.sql`** - - Add: Verify output remains NULL until all tasks complete - -10. **`tests/map_output_aggregation/failed_task_handling.test.sql`** - - Add: Verify step output remains NULL when step fails - - Add: Verify individual task outputs are still preserved for debugging - -11. **`tests/map_output_aggregation/map_initial_tasks_timing.test.sql`** - - No changes needed (focuses on timing) - -12. **`tests/map_output_aggregation/deep_map_chain.test.sql`** - - Modify: Verify each map in chain reads from previous `step_states.output` - -13. **`tests/map_output_aggregation/broadcast_aggregation.test.sql`** - - Modify: Verify broadcast uses `step_states.output` - -14. **`tests/map_output_aggregation/concurrent_completion.test.sql`** - - Add: Verify final `step_states.output` correct despite concurrency - -15. **`tests/map_output_aggregation/multiple_maps_to_single.test.sql`** - - Modify: Verify single step gets inputs from multiple `step_states.output` - -16. **`tests/complete_task/saves_output_when_completing_run.test.sql`** - - Modify: Verify run output built from `step_states.output` - -17. **`tests/completing_taskless_steps/*.sql` (7 files)** - - Add: Verify taskless maps have `output = '[]'` - - Add: Verify taskless single steps have `output = NULL` - -#### New Tests to Create - -1. **`tests/step_output/single_step_output.test.sql`** - ```sql - -- Verify single step stores task output directly in step_states.output - -- Complete a single task, check step_states.output = task.output - ``` - -2. **`tests/step_output/map_step_aggregation.test.sql`** - ```sql - -- Verify map step aggregates all task outputs in order - -- Complete N tasks, check step_states.output = array of N outputs - ``` - -3. **`tests/step_output/output_only_when_completed.test.sql`** - ```sql - -- Verify output is NULL for non-completed steps - -- Check constraint prevents setting output on non-completed steps - -- Verify failed steps have NULL output at step level - -- Verify failed tasks still preserve their output for debugging - ``` - -4. **`tests/step_output/taskless_step_outputs.test.sql`** - ```sql - -- Verify taskless map steps get '[]' as output - -- Verify taskless single steps get NULL as output - ``` - -5. **`tests/step_output/null_task_outputs.test.sql`** - ```sql - -- Verify NULL task outputs are preserved in aggregation - -- Map with [null, {data}, null] -> step_states.output has all three - ``` - -6. **`tests/step_output/failed_task_output_preservation.test.sql`** - ```sql - -- Verify failed tasks store their output (including type violations) - -- Verify step output remains NULL when step fails - -- Test both regular failures and type constraint violations - ``` - -#### Tests to Remove/Update - -1. **`tests/map_output_aggregation/broadcast_output_verification.test.sql`** - - **Action**: REMOVE - This test demonstrates a bug that becomes architecturally impossible - - **Reason**: With `step_states.output`, there's no confusion between task and aggregated output - -2. 
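Most of the assertions above reduce to a single comparison against the new column. A pgTAP-style sketch (slugs and expected values are illustrative):

```sql
-- Map step: aggregated output, ordered by task_index.
SELECT is(
  (SELECT output FROM pgflow.step_states
   WHERE run_id = :run_id AND step_slug = 'my_map'),
  '[{"i":0},{"i":1},{"i":2}]'::jsonb,
  'map step stores task outputs aggregated in task_index order'
);

-- Single step: the task output stored as-is.
SELECT is(
  (SELECT output FROM pgflow.step_states
   WHERE run_id = :run_id AND step_slug = 'my_single'),
  '{"done":true}'::jsonb,
  'single step stores its task output directly'
);
```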
**`tests/map_output_aggregation/broadcast_event_bug.test.sql`** - - **Action**: KEEP AS-IS - Becomes regression test - - **Reason**: Will pass without modification, ensures broadcasts use aggregated output - - **Note**: No longer needs complex realtime.send mocking - just query realtime.messages - -### 4. Performance Tests - -**File**: `tests/performance/map_aggregation_performance.test.sql` - -Measure performance improvement: -1. Create flow with map -> map -> map chain -2. Complete tasks and measure query times -3. Compare with baseline from current implementation - -Expected improvements: -- `start_tasks`: 30-50% faster (no aggregation needed) -- `complete_task` broadcast: 20-30% faster -- Map-to-map chains: 50-70% faster (no aggregate/decompose cycle) - -## Migration Strategy - -### Option 1: Single Migration (Recommended) -1. Add column with DEFAULT NULL -2. Update all functions in same migration -3. Backfill existing data if needed - -### Option 2: Phased Migration -1. Add column (NULL allowed) -2. Update `complete_task` to write to both locations -3. Update consumers to read from new location -4. Remove old aggregation logic - -## Rollback Plan - -If issues arise: -1. Functions can temporarily aggregate on-the-fly if output column is NULL -2. Add fallback logic: `COALESCE(step_states.output, )` -3. Remove column only after confirming no dependencies - -## Benefits Summary - -1. **Performance**: Eliminate redundant aggregation queries -2. **Simplicity**: Single source of truth for step outputs -3. **Consistency**: Uniform output handling for all step types -4. **Maintainability**: Aggregation logic in one place -5. **Future-proof**: Enables features like result caching, partial re-runs - -## Risks & Mitigations - -| Risk | Mitigation | -|------|------------| -| Storage overhead | Minimal - same data, different location | -| Migration complexity | Test thoroughly, use transaction | -| Backward compatibility | Add column as nullable initially | -| Performance regression | Benchmark before/after | - -## Implementation Order - -1. Create performance baseline tests -2. Add schema migration -3. Update `complete_task` (primary change) -4. Update `cascade_complete_taskless_steps` -5. Simplify `start_tasks` -6. Simplify `maybe_complete_run` -7. Simplify broadcast in `complete_task` -8. Update/create tests -9. Run performance comparison -10. 
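The fallback mentioned in the rollback plan can be spelled out as a read path that tolerates a NULL column; the second `COALESCE` argument is just the current inline aggregation. A sketch:

```sql
-- Read step output, aggregating on the fly when the column has not been backfilled.
SELECT COALESCE(
  ss.output,
  (SELECT COALESCE(jsonb_agg(t.output ORDER BY t.task_index), '[]'::jsonb)
   FROM pgflow.step_tasks t
   WHERE t.run_id = ss.run_id
     AND t.step_slug = ss.step_slug
     AND t.status = 'completed')
) AS step_output
FROM pgflow.step_states ss
WHERE ss.run_id = :run_id
  AND ss.step_slug = :step_slug;
```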
Document changes - -## Implementation Tasks - -### Phase 1: Schema & Core Function Updates -- [ ] Create migration adding `output` column to `step_states` -- [ ] Update `pgflow.complete_task` to populate `output` on step completion -- [ ] Update `pgflow.cascade_complete_taskless_steps` to set output for taskless steps - -### Phase 2: Consumer Function Updates -- [ ] Simplify `pgflow.start_tasks` to read from `step_states.output` -- [ ] Simplify `pgflow.maybe_complete_run` to read from `step_states.output` -- [ ] Simplify broadcast in `pgflow.complete_task` to use `v_step_state.output` - -### Phase 3: Test Updates -- [ ] Update map output aggregation test assertions (18 tests) -- [ ] Remove `broadcast_output_verification.test.sql` -- [ ] Verify `broadcast_event_bug.test.sql` passes without modification -- [ ] Create 5 new tests for output column behavior -- [ ] Run and verify all tests pass - -### Phase 4: Performance Validation -- [ ] Create baseline performance measurements -- [ ] Run performance tests after implementation -- [ ] Document performance improvements - -### Phase 5: Cleanup -- [ ] Remove temporary migration `20250917161744_pgflow_temp_handle_map_output_aggregation.sql` -- [ ] Consolidate into single clean migration -- [ ] Update documentation - -## Success Criteria - -- [ ] All existing tests pass with modifications -- [ ] New tests verify output column behavior -- [ ] Performance tests show improvement or no regression -- [ ] Code complexity reduced (3 aggregation sites -> 1) -- [ ] Migration runs cleanly on existing data -- [ ] Broadcast bug architecturally prevented \ No newline at end of file diff --git a/pkgs/core/PLAN_race_condition_testing.md b/pkgs/core/PLAN_race_condition_testing.md deleted file mode 100644 index 6667e1e30..000000000 --- a/pkgs/core/PLAN_race_condition_testing.md +++ /dev/null @@ -1,176 +0,0 @@ -# PLAN: Race Condition Testing for Type Violations - -## Background - -When a type violation occurs (e.g., single step produces non-array for dependent map), the system must archive ALL active messages to prevent orphaned messages that cycle through workers indefinitely. - -## Current Issue - -The fix archives both `'queued'` AND `'started'` tasks, but existing tests don't properly validate the race condition scenarios. - -## Test Scenarios Needed - -### 1. Basic Type Violation (✅ Already Covered) -**Scenario**: Single task causes type violation -``` -step1 (single) → step2 (single) → map_step -``` -- Worker completes step2 with non-array -- Verify run fails and current task's message is archived -- **Coverage**: `non_array_to_map_should_fail.test.sql` - -### 2. Concurrent Started Tasks (❌ Not Covered) -**Scenario**: Multiple workers have tasks in 'started' state when violation occurs -``` -producer (single) → map_consumer (map, expects array) -producer (single) → parallel_task1 (single) -producer (single) → parallel_task2 (single) -``` - -**Test Flow**: -1. Complete producer with `[1, 2, 3]` (spawns 3 map tasks + 2 parallel tasks) -2. Worker A starts `map_consumer[0]` -3. Worker B starts `map_consumer[1]` -4. Worker C starts `parallel_task1` -5. Worker D starts `parallel_task2` -6. Worker C completes `parallel_task1` with non-array (violates some other map dependency) -7. **Verify**: ALL started tasks (map_consumer[0], map_consumer[1], parallel_task2) get archived - -### 3. 
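The shape of the fix described above, for reference: archive messages for both `queued` and `started` tasks, not just the failing one. A sketch using this plan's queue and column names:

```sql
-- Archive every active message for the failed run, regardless of task state.
SELECT pgmq.archive('pgflow_tasks_queue', t.msg_id)
FROM pgflow.step_tasks t
WHERE t.run_id = :run_id
  AND t.status IN ('queued', 'started')
  AND t.msg_id IS NOT NULL;
```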
Mixed Queue States (❌ Not Covered) -**Scenario**: Mix of queued and started tasks across different steps -``` -step1 → step2 → step3 → map_step - ↘ step4 → step5 -``` - -**Test Flow**: -1. Complete step1 -2. Worker A starts step2 -3. Worker B starts step4 -4. Step3 and step5 remain queued -5. Worker A completes step2 with type violation -6. **Verify**: Both started (step4) AND queued (step3, step5) messages archived - -### 4. Map Task Partial Processing (❌ Not Covered) -**Scenario**: Some map tasks started, others queued when violation occurs -``` -producer → large_map (100 elements) -``` - -**Test Flow**: -1. Producer outputs array of 100 elements -2. Workers start processing first 10 tasks -3. 90 tasks remain queued -4. One of the started tasks detects downstream type violation -5. **Verify**: All 100 messages (10 started + 90 queued) get archived - -### 5. Visibility Timeout Verification (❌ Not Covered) -**Scenario**: Ensure orphaned messages don't reappear after timeout -``` -step1 → step2 → map_step -``` - -**Test Flow**: -1. Worker starts step2 (30s visibility timeout) -2. Type violation occurs but message NOT archived (simulate bug) -3. Wait 31 seconds -4. **Verify**: Message reappears in queue (demonstrates the bug) -5. Apply fix and verify message doesn't reappear - -### 6. Nested Map Chains (❌ Not Covered) -**Scenario**: Type violation in middle of map chain -``` -map1 (produces arrays) → map2 (expects arrays) → map3 -``` - -**Test Flow**: -1. map1 task completes with non-array (violates map2 expectation) -2. Other map1 tasks are in various states (started/queued) -3. **Verify**: All map1 tasks archived, map2 never starts - -### 7. Race During Archival (❌ Not Covered) -**Scenario**: Worker tries to complete task while archival is happening -``` -step1 → step2 → map_step -``` - -**Test Flow**: -1. Worker A detects type violation, begins archiving -2. Worker B tries to complete its task during archival -3. **Verify**: Worker B's completion is rejected (guard clause) -4. **Verify**: No duplicate archival attempts - -## Implementation Strategy - -### Test Utilities Needed - -1. **Multi-worker simulator**: -```sql -CREATE FUNCTION pgflow_tests.simulate_worker( - worker_id uuid, - flow_slug text -) RETURNS TABLE(...); -``` - -2. **Queue state inspector**: -```sql -CREATE FUNCTION pgflow_tests.inspect_queue_state( - flow_slug text -) RETURNS TABLE( - message_id bigint, - task_status text, - visibility_timeout timestamptz -); -``` - -3. **Time manipulation** (for visibility timeout tests): -```sql --- May need to mock pgmq visibility behavior -``` - -### Test File Organization - -``` -supabase/tests/type_violations/ -├── basic_violation.test.sql # Existing coverage -├── concurrent_started_tasks.test.sql # NEW: Scenario 2 -├── mixed_queue_states.test.sql # NEW: Scenario 3 -├── map_partial_processing.test.sql # NEW: Scenario 4 -├── visibility_timeout_recovery.test.sql # NEW: Scenario 5 -├── nested_map_chains.test.sql # NEW: Scenario 6 -└── race_during_archival.test.sql # NEW: Scenario 7 -``` - -## Success Criteria - -1. **No orphaned messages**: Queue must be empty after type violation -2. **No message resurrection**: Archived messages don't reappear after timeout -3. **Complete cleanup**: ALL tasks (queued + started) for the run are handled -4. **Atomic operation**: Archival happens in single transaction -5. 
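Scenarios 2 through 4 share one end-state check: after the violation, nothing for the run is left in flight. A sketch of that shared assertion using the proposed (not yet implemented) inspector utility:

```sql
-- The queue for the flow must be fully drained after the type violation.
SELECT is(
  (SELECT count(*)::int FROM pgflow_tests.inspect_queue_state('my_flow')),
  0,
  'no live messages remain after type violation'
);
```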
**Guard effectiveness**: No operations on failed runs - -## Performance Considerations - -- Test with large numbers of tasks (1000+) to verify batch archival performance -- Ensure archival doesn't lock tables for extended periods -- Verify index usage on `(run_id, status, message_id)` - -## Current Gap Analysis - -**What we have**: -- Basic type violation detection ✅ -- Single task archival ✅ -- Run failure on violation ✅ - -**What we need**: -- True concurrent worker simulation ❌ -- Multi-task race condition validation ❌ -- Visibility timeout verification ❌ -- Performance under load testing ❌ - -## Priority - -1. **HIGH**: Concurrent started tasks (Scenario 2) - Most common real-world case -2. **HIGH**: Map partial processing (Scenario 4) - Critical for large arrays -3. **MEDIUM**: Mixed queue states (Scenario 3) - Complex flows -4. **LOW**: Other scenarios - Edge cases but important for robustness \ No newline at end of file diff --git a/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql b/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql index 60321c933..d46c40de3 100644 --- a/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql +++ b/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql @@ -72,19 +72,10 @@ BEGIN AND s.step_slug = ss.step_slug ), -- ---------- Update run counters ---------- + -- Only decrement remaining_steps; let maybe_complete_run handle finalization run_updates AS ( UPDATE pgflow.runs r - SET remaining_steps = r.remaining_steps - c.completed_count, - status = CASE - WHEN r.remaining_steps - c.completed_count = 0 - THEN 'completed' - ELSE r.status - END, - completed_at = CASE - WHEN r.remaining_steps - c.completed_count = 0 - THEN now() - ELSE r.completed_at - END + SET remaining_steps = r.remaining_steps - c.completed_count FROM (SELECT COUNT(*) AS completed_count FROM completed) c WHERE r.run_id = cascade_complete_taskless_steps.run_id AND c.completed_count > 0 diff --git a/pkgs/core/schemas/0100_function_start_flow.sql b/pkgs/core/schemas/0100_function_start_flow.sql index 56a5838fb..f0a2bfed3 100644 --- a/pkgs/core/schemas/0100_function_start_flow.sql +++ b/pkgs/core/schemas/0100_function_start_flow.sql @@ -118,6 +118,10 @@ PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); -- Start root steps (those with no dependencies) PERFORM pgflow.start_ready_steps(v_created_run.run_id); +-- ---------- Check for run completion ---------- +-- If cascade completed all steps (zero-task flows), finalize the run +PERFORM pgflow.maybe_complete_run(v_created_run.run_id); + RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; end; diff --git a/pkgs/core/supabase/migrations/20250912075001_pgflow_temp_pr1_schema.sql b/pkgs/core/supabase/migrations/20250912075001_pgflow_temp_pr1_schema.sql deleted file mode 100644 index 099659d21..000000000 --- a/pkgs/core/supabase/migrations/20250912075001_pgflow_temp_pr1_schema.sql +++ /dev/null @@ -1,185 +0,0 @@ --- Modify "step_states" table -ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "step_states_remaining_tasks_check", ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> 'created'::text)), ADD CONSTRAINT "step_states_initial_tasks_check" CHECK (initial_tasks >= 0), ALTER COLUMN "remaining_tasks" DROP NOT NULL, ALTER COLUMN "remaining_tasks" DROP DEFAULT, ADD COLUMN "initial_tasks" integer NULL DEFAULT 1; --- Modify "step_tasks" table -ALTER TABLE "pgflow"."step_tasks" DROP 
CONSTRAINT "only_single_task_per_step"; --- Modify "steps" table -ALTER TABLE "pgflow"."steps" DROP CONSTRAINT "steps_step_type_check", ADD CONSTRAINT "steps_step_type_check" CHECK (step_type = ANY (ARRAY['single'::text, 'map'::text])); --- Modify "start_ready_steps" function -CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$ -WITH ready_steps AS ( - SELECT * - FROM pgflow.step_states AS step_state - WHERE step_state.run_id = start_ready_steps.run_id - AND step_state.status = 'created' - AND step_state.remaining_deps = 0 - ORDER BY step_state.step_slug - FOR UPDATE -), -started_step_states AS ( - UPDATE pgflow.step_states - SET status = 'started', - started_at = now(), - remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting - FROM ready_steps - WHERE pgflow.step_states.run_id = start_ready_steps.run_id - AND pgflow.step_states.step_slug = ready_steps.step_slug - RETURNING pgflow.step_states.* -), -sent_messages AS ( - SELECT - started_step.flow_slug, - started_step.run_id, - started_step.step_slug, - pgmq.send( - started_step.flow_slug, - jsonb_build_object( - 'flow_slug', started_step.flow_slug, - 'run_id', started_step.run_id, - 'step_slug', started_step.step_slug, - 'task_index', 0 - ), - COALESCE(step.opt_start_delay, 0) - ) AS msg_id - FROM started_step_states AS started_step - JOIN pgflow.steps AS step - ON step.flow_slug = started_step.flow_slug - AND step.step_slug = started_step.step_slug -), -broadcast_events AS ( - SELECT - realtime.send( - jsonb_build_object( - 'event_type', 'step:started', - 'run_id', started_step.run_id, - 'step_slug', started_step.step_slug, - 'status', 'started', - 'started_at', started_step.started_at, - 'remaining_tasks', started_step.remaining_tasks, - 'remaining_deps', started_step.remaining_deps - ), - concat('step:', started_step.step_slug, ':started'), - concat('pgflow:run:', started_step.run_id), - false - ) - FROM started_step_states AS started_step -) -INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, message_id) -SELECT - sent_messages.flow_slug, - sent_messages.run_id, - sent_messages.step_slug, - sent_messages.msg_id -FROM sent_messages; -$$; --- Modify "start_flow" function -CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_created_run pgflow.runs%ROWTYPE; -begin - -WITH - flow_steps AS ( - SELECT steps.flow_slug, steps.step_slug, steps.deps_count - FROM pgflow.steps - WHERE steps.flow_slug = start_flow.flow_slug - ), - created_run AS ( - INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) - VALUES ( - COALESCE(start_flow.run_id, gen_random_uuid()), - start_flow.flow_slug, - start_flow.input, - (SELECT count(*) FROM flow_steps) - ) - RETURNING * - ), - created_step_states AS ( - INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) - SELECT - fs.flow_slug, - (SELECT created_run.run_id FROM created_run), - fs.step_slug, - fs.deps_count, - 1 -- For now, all steps get initial_tasks = 1 (single steps) - FROM flow_steps fs - ) -SELECT * FROM created_run INTO v_created_run; - --- Send broadcast event for run started -PERFORM realtime.send( - jsonb_build_object( - 'event_type', 'run:started', - 'run_id', v_created_run.run_id, - 'flow_slug', v_created_run.flow_slug, - 'input', v_created_run.input, - 'status', 
'started', - 'remaining_steps', v_created_run.remaining_steps, - 'started_at', v_created_run.started_at - ), - 'run:started', - concat('pgflow:run:', v_created_run.run_id), - false -); - -PERFORM pgflow.start_ready_steps(v_created_run.run_id); - -RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; - -end; -$$; --- Create "add_step" function -CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ -DECLARE - result_step pgflow.steps; - next_idx int; -BEGIN - -- Validate map step constraints - -- Map steps can have either: - -- 0 dependencies (root map - maps over flow input array) - -- 1 dependency (dependent map - maps over dependency output array) - IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN - RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', - add_step.step_slug, - COALESCE(array_length(add_step.deps_slugs, 1), 0), - array_to_string(add_step.deps_slugs, ', '); - END IF; - - -- Get next step index - SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx - FROM pgflow.steps s - WHERE s.flow_slug = add_step.flow_slug; - - -- Create the step - INSERT INTO pgflow.steps ( - flow_slug, step_slug, step_type, step_index, deps_count, - opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay - ) - VALUES ( - add_step.flow_slug, - add_step.step_slug, - COALESCE(add_step.step_type, 'single'), - next_idx, - COALESCE(array_length(add_step.deps_slugs, 1), 0), - add_step.max_attempts, - add_step.base_delay, - add_step.timeout, - add_step.start_delay - ) - ON CONFLICT ON CONSTRAINT steps_pkey - DO UPDATE SET step_slug = EXCLUDED.step_slug - RETURNING * INTO result_step; - - -- Insert dependencies - INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) - SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug - FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) - WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 - ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; - - RETURN result_step; -END; -$$; --- Drop "add_step" function -DROP FUNCTION "pgflow"."add_step" (text, text, integer, integer, integer, integer); --- Drop "add_step" function -DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer); diff --git a/pkgs/core/supabase/migrations/20250912080800_pgflow_temp_pr2_root_maps.sql b/pkgs/core/supabase/migrations/20250912080800_pgflow_temp_pr2_root_maps.sql deleted file mode 100644 index e85fdee3c..000000000 --- a/pkgs/core/supabase/migrations/20250912080800_pgflow_temp_pr2_root_maps.sql +++ /dev/null @@ -1,95 +0,0 @@ --- Modify "step_states" table -ALTER TABLE "pgflow"."step_states" ALTER COLUMN "initial_tasks" SET NOT NULL; --- Modify "start_flow" function -CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_created_run pgflow.runs%ROWTYPE; - v_root_map_count int; -begin - --- Check for root map steps and validate input -WITH root_maps AS ( - SELECT step_slug - FROM pgflow.steps - WHERE 
steps.flow_slug = start_flow.flow_slug - AND steps.step_type = 'map' - AND steps.deps_count = 0 -) -SELECT COUNT(*) INTO v_root_map_count FROM root_maps; - --- If we have root map steps, validate that input is an array -IF v_root_map_count > 0 THEN - -- First check for NULL (should be caught by NOT NULL constraint, but be defensive) - IF start_flow.input IS NULL THEN - RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug; - END IF; - - -- Then check if it's not an array - IF jsonb_typeof(start_flow.input) != 'array' THEN - RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)', - start_flow.flow_slug, jsonb_typeof(start_flow.input); - END IF; -END IF; - -WITH - flow_steps AS ( - SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count - FROM pgflow.steps - WHERE steps.flow_slug = start_flow.flow_slug - ), - created_run AS ( - INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) - VALUES ( - COALESCE(start_flow.run_id, gen_random_uuid()), - start_flow.flow_slug, - start_flow.input, - (SELECT count(*) FROM flow_steps) - ) - RETURNING * - ), - created_step_states AS ( - INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) - SELECT - fs.flow_slug, - (SELECT created_run.run_id FROM created_run), - fs.step_slug, - fs.deps_count, - -- For root map steps (map with no deps), set initial_tasks to array length - -- For all other steps, set initial_tasks to 1 - CASE - WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN - CASE - WHEN jsonb_typeof(start_flow.input) = 'array' THEN - jsonb_array_length(start_flow.input) - ELSE - 1 - END - ELSE - 1 - END - FROM flow_steps fs - ) -SELECT * FROM created_run INTO v_created_run; - --- Send broadcast event for run started -PERFORM realtime.send( - jsonb_build_object( - 'event_type', 'run:started', - 'run_id', v_created_run.run_id, - 'flow_slug', v_created_run.flow_slug, - 'input', v_created_run.input, - 'status', 'started', - 'remaining_steps', v_created_run.remaining_steps, - 'started_at', v_created_run.started_at - ), - 'run:started', - concat('pgflow:run:', v_created_run.run_id), - false -); - -PERFORM pgflow.start_ready_steps(v_created_run.run_id); - -RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; - -end; -$$; diff --git a/pkgs/core/supabase/migrations/20250912125339_pgflow_TEMP_task_spawning_optimization.sql b/pkgs/core/supabase/migrations/20250912125339_pgflow_TEMP_task_spawning_optimization.sql deleted file mode 100644 index 41f6d152a..000000000 --- a/pkgs/core/supabase/migrations/20250912125339_pgflow_TEMP_task_spawning_optimization.sql +++ /dev/null @@ -1,146 +0,0 @@ --- Modify "start_ready_steps" function -CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$ --- First handle empty array map steps (initial_tasks = 0) - direct transition to completed -WITH empty_map_steps AS ( - SELECT step_state.* - FROM pgflow.step_states AS step_state - JOIN pgflow.steps AS step - ON step.flow_slug = step_state.flow_slug - AND step.step_slug = step_state.step_slug - WHERE step_state.run_id = start_ready_steps.run_id - AND step_state.status = 'created' - AND step_state.remaining_deps = 0 - AND step.step_type = 'map' - AND step_state.initial_tasks = 0 - ORDER BY step_state.step_slug - FOR UPDATE OF step_state -), -completed_empty_steps AS ( - UPDATE pgflow.step_states - SET status = 'completed', - started_at = now(), - 
completed_at = now(), - remaining_tasks = 0 - FROM empty_map_steps - WHERE pgflow.step_states.run_id = start_ready_steps.run_id - AND pgflow.step_states.step_slug = empty_map_steps.step_slug - RETURNING pgflow.step_states.* -), -broadcast_empty_completed AS ( - SELECT - realtime.send( - jsonb_build_object( - 'event_type', 'step:completed', - 'run_id', completed_step.run_id, - 'step_slug', completed_step.step_slug, - 'status', 'completed', - 'started_at', completed_step.started_at, - 'completed_at', completed_step.completed_at, - 'remaining_tasks', 0, - 'remaining_deps', 0, - 'output', '[]'::jsonb - ), - concat('step:', completed_step.step_slug, ':completed'), - concat('pgflow:run:', completed_step.run_id), - false - ) - FROM completed_empty_steps AS completed_step -), - --- Now handle non-empty steps (both single and map with initial_tasks > 0) -ready_steps AS ( - SELECT * - FROM pgflow.step_states AS step_state - WHERE step_state.run_id = start_ready_steps.run_id - AND step_state.status = 'created' - AND step_state.remaining_deps = 0 - -- Exclude empty map steps already handled - AND NOT EXISTS ( - SELECT 1 FROM empty_map_steps - WHERE empty_map_steps.run_id = step_state.run_id - AND empty_map_steps.step_slug = step_state.step_slug - ) - ORDER BY step_state.step_slug - FOR UPDATE -), -started_step_states AS ( - UPDATE pgflow.step_states - SET status = 'started', - started_at = now(), - remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting - FROM ready_steps - WHERE pgflow.step_states.run_id = start_ready_steps.run_id - AND pgflow.step_states.step_slug = ready_steps.step_slug - RETURNING pgflow.step_states.* -), - --- Generate tasks based on initial_tasks count --- For single steps: initial_tasks = 1, so generate_series(0, 0) = single task with index 0 --- For map steps: initial_tasks = N, so generate_series(0, N-1) = N tasks with indices 0..N-1 --- Group messages by step for batch sending -message_batches AS ( - SELECT - started_step.flow_slug, - started_step.run_id, - started_step.step_slug, - COALESCE(step.opt_start_delay, 0) as delay, - array_agg( - jsonb_build_object( - 'flow_slug', started_step.flow_slug, - 'run_id', started_step.run_id, - 'step_slug', started_step.step_slug, - 'task_index', task_idx.task_index - ) ORDER BY task_idx.task_index - ) AS messages, - array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices - FROM started_step_states AS started_step - JOIN pgflow.steps AS step - ON step.flow_slug = started_step.flow_slug - AND step.step_slug = started_step.step_slug - -- Generate task indices from 0 to initial_tasks-1 - CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index) - GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay -), --- Send messages in batch for better performance with large arrays -sent_messages AS ( - SELECT - mb.flow_slug, - mb.run_id, - mb.step_slug, - task_indices.task_index, - msg_ids.msg_id - FROM message_batches mb - CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord) - CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord) - WHERE task_indices.idx_ord = msg_ids.msg_ord -), - -broadcast_events AS ( - SELECT - realtime.send( - jsonb_build_object( - 'event_type', 'step:started', - 'run_id', started_step.run_id, - 'step_slug', started_step.step_slug, - 'status', 'started', - 'started_at', 
started_step.started_at, - 'remaining_tasks', started_step.remaining_tasks, - 'remaining_deps', started_step.remaining_deps - ), - concat('step:', started_step.step_slug, ':started'), - concat('pgflow:run:', started_step.run_id), - false - ) - FROM started_step_states AS started_step -) - --- Insert all generated tasks with their respective task_index values -INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id) -SELECT - sent_messages.flow_slug, - sent_messages.run_id, - sent_messages.step_slug, - sent_messages.task_index, - sent_messages.msg_id -FROM sent_messages; -$$; diff --git a/pkgs/core/supabase/migrations/20250916093518_pgflow_temp_add_cascade_complete.sql b/pkgs/core/supabase/migrations/20250916093518_pgflow_temp_add_cascade_complete.sql deleted file mode 100644 index a706760dd..000000000 --- a/pkgs/core/supabase/migrations/20250916093518_pgflow_temp_add_cascade_complete.sql +++ /dev/null @@ -1,321 +0,0 @@ --- Create "cascade_complete_taskless_steps" function -CREATE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql AS $$ -DECLARE - v_total_completed int := 0; - v_iteration_completed int; - v_iterations int := 0; - v_max_iterations int := 50; -BEGIN - LOOP - -- Safety counter to prevent infinite loops - v_iterations := v_iterations + 1; - IF v_iterations > v_max_iterations THEN - RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations; - END IF; - - WITH completed AS ( - -- Complete all ready taskless steps in topological order - UPDATE pgflow.step_states ss - SET status = 'completed', - started_at = now(), - completed_at = now(), - remaining_tasks = 0 - FROM pgflow.steps s - WHERE ss.run_id = cascade_complete_taskless_steps.run_id - AND ss.flow_slug = s.flow_slug - AND ss.step_slug = s.step_slug - AND ss.status = 'created' - AND ss.remaining_deps = 0 - AND ss.initial_tasks = 0 - -- Process in topological order to ensure proper cascade - RETURNING ss.* - ), - dep_updates AS ( - -- Update remaining_deps and initial_tasks for dependents of completed steps - UPDATE pgflow.step_states ss - SET remaining_deps = ss.remaining_deps - dep_count.count, - -- If the dependent is a map step and its dependency completed with 0 tasks, - -- set its initial_tasks to 0 as well - initial_tasks = CASE - WHEN s.step_type = 'map' AND dep_count.has_zero_tasks - THEN 0 - ELSE ss.initial_tasks - END - FROM ( - -- Count how many completed steps are dependencies of each dependent - SELECT - d.flow_slug, - d.step_slug as dependent_slug, - COUNT(*) as count, - BOOL_OR(c.initial_tasks = 0) as has_zero_tasks - FROM completed c - JOIN pgflow.deps d ON d.flow_slug = c.flow_slug - AND d.dep_slug = c.step_slug - GROUP BY d.flow_slug, d.step_slug - ) dep_count, - pgflow.steps s - WHERE ss.run_id = cascade_complete_taskless_steps.run_id - AND ss.flow_slug = dep_count.flow_slug - AND ss.step_slug = dep_count.dependent_slug - AND s.flow_slug = ss.flow_slug - AND s.step_slug = ss.step_slug - ), - run_updates AS ( - -- Update run's remaining_steps count - UPDATE pgflow.runs r - SET remaining_steps = r.remaining_steps - c.completed_count, - status = CASE - WHEN r.remaining_steps - c.completed_count = 0 - THEN 'completed' - ELSE r.status - END, - completed_at = CASE - WHEN r.remaining_steps - c.completed_count = 0 - THEN now() - ELSE r.completed_at - END - FROM (SELECT COUNT(*) AS completed_count FROM completed) c - WHERE r.run_id = cascade_complete_taskless_steps.run_id - AND c.completed_count > 0 - ) - SELECT 
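-    -- Number of steps completed in this wave; zero means the cascade is finished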
COUNT(*) INTO v_iteration_completed FROM completed; - - EXIT WHEN v_iteration_completed = 0; - v_total_completed := v_total_completed + v_iteration_completed; - END LOOP; - - RETURN v_total_completed; -END; -$$; --- Modify "complete_task" function -CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_step_state pgflow.step_states%ROWTYPE; -begin - -WITH run_lock AS ( - SELECT * FROM pgflow.runs - WHERE pgflow.runs.run_id = complete_task.run_id - FOR UPDATE -), -step_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = complete_task.step_slug - FOR UPDATE -), -task AS ( - UPDATE pgflow.step_tasks - SET - status = 'completed', - completed_at = now(), - output = complete_task.output - WHERE pgflow.step_tasks.run_id = complete_task.run_id - AND pgflow.step_tasks.step_slug = complete_task.step_slug - AND pgflow.step_tasks.task_index = complete_task.task_index - AND pgflow.step_tasks.status = 'started' - RETURNING * -), -step_state AS ( - UPDATE pgflow.step_states - SET - status = CASE - WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement - ELSE 'started' - END, - completed_at = CASE - WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement - ELSE NULL - END, - remaining_tasks = pgflow.step_states.remaining_tasks - 1 - FROM task - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = complete_task.step_slug - RETURNING pgflow.step_states.* -), --- Find all dependent steps if the current step was completed -dependent_steps AS ( - SELECT d.step_slug AS dependent_step_slug - FROM pgflow.deps d - JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug - WHERE d.dep_slug = complete_task.step_slug - ORDER BY d.step_slug -- Ensure consistent ordering -), --- Lock dependent steps before updating -dependent_steps_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps) - FOR UPDATE -), --- Update all dependent steps -dependent_steps_update AS ( - UPDATE pgflow.step_states ss - SET remaining_deps = ss.remaining_deps - 1, - -- For map dependents of single steps producing arrays, set initial_tasks - initial_tasks = CASE - WHEN s.step_type = 'map' AND jsonb_typeof(complete_task.output) = 'array' - THEN jsonb_array_length(complete_task.output) - ELSE ss.initial_tasks - END - FROM dependent_steps ds, pgflow.steps s - WHERE ss.run_id = complete_task.run_id - AND ss.step_slug = ds.dependent_step_slug - AND s.flow_slug = ss.flow_slug - AND s.step_slug = ss.step_slug -) --- Only decrement remaining_steps, don't update status -UPDATE pgflow.runs -SET remaining_steps = pgflow.runs.remaining_steps - 1 -FROM step_state -WHERE pgflow.runs.run_id = complete_task.run_id - AND step_state.status = 'completed'; - --- Get the updated step state for broadcasting -SELECT * INTO v_step_state FROM pgflow.step_states -WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; - --- Send broadcast event for step completed if the step is completed -IF v_step_state.status = 'completed' THEN - -- Step just completed, cascade any ready taskless steps - PERFORM 
pgflow.cascade_complete_taskless_steps(complete_task.run_id); - - PERFORM realtime.send( - jsonb_build_object( - 'event_type', 'step:completed', - 'run_id', complete_task.run_id, - 'step_slug', complete_task.step_slug, - 'status', 'completed', - 'output', complete_task.output, - 'completed_at', v_step_state.completed_at - ), - concat('step:', complete_task.step_slug, ':completed'), - concat('pgflow:run:', complete_task.run_id), - false - ); -END IF; - --- For completed tasks: archive the message -PERFORM ( - WITH completed_tasks AS ( - SELECT r.flow_slug, st.message_id - FROM pgflow.step_tasks st - JOIN pgflow.runs r ON st.run_id = r.run_id - WHERE st.run_id = complete_task.run_id - AND st.step_slug = complete_task.step_slug - AND st.task_index = complete_task.task_index - AND st.status = 'completed' - ) - SELECT pgmq.archive(ct.flow_slug, ct.message_id) - FROM completed_tasks ct - WHERE EXISTS (SELECT 1 FROM completed_tasks) -); - -PERFORM pgflow.start_ready_steps(complete_task.run_id); - -PERFORM pgflow.maybe_complete_run(complete_task.run_id); - -RETURN QUERY SELECT * -FROM pgflow.step_tasks AS step_task -WHERE step_task.run_id = complete_task.run_id - AND step_task.step_slug = complete_task.step_slug - AND step_task.task_index = complete_task.task_index; - -end; -$$; --- Modify "start_flow" function -CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_created_run pgflow.runs%ROWTYPE; - v_root_map_count int; -begin - --- Check for root map steps and validate input -WITH root_maps AS ( - SELECT step_slug - FROM pgflow.steps - WHERE steps.flow_slug = start_flow.flow_slug - AND steps.step_type = 'map' - AND steps.deps_count = 0 -) -SELECT COUNT(*) INTO v_root_map_count FROM root_maps; - --- If we have root map steps, validate that input is an array -IF v_root_map_count > 0 THEN - -- First check for NULL (should be caught by NOT NULL constraint, but be defensive) - IF start_flow.input IS NULL THEN - RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug; - END IF; - - -- Then check if it's not an array - IF jsonb_typeof(start_flow.input) != 'array' THEN - RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)', - start_flow.flow_slug, jsonb_typeof(start_flow.input); - END IF; -END IF; - -WITH - flow_steps AS ( - SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count - FROM pgflow.steps - WHERE steps.flow_slug = start_flow.flow_slug - ), - created_run AS ( - INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) - VALUES ( - COALESCE(start_flow.run_id, gen_random_uuid()), - start_flow.flow_slug, - start_flow.input, - (SELECT count(*) FROM flow_steps) - ) - RETURNING * - ), - created_step_states AS ( - INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) - SELECT - fs.flow_slug, - (SELECT created_run.run_id FROM created_run), - fs.step_slug, - fs.deps_count, - -- For root map steps (map with no deps), set initial_tasks to array length - -- For all other steps, set initial_tasks to 1 - CASE - WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN - CASE - WHEN jsonb_typeof(start_flow.input) = 'array' THEN - jsonb_array_length(start_flow.input) - ELSE - 1 - END - ELSE - 1 - END - FROM flow_steps fs - ) -SELECT * FROM created_run INTO v_created_run; - --- Send broadcast event for run started -PERFORM realtime.send( - 
jsonb_build_object( - 'event_type', 'run:started', - 'run_id', v_created_run.run_id, - 'flow_slug', v_created_run.flow_slug, - 'input', v_created_run.input, - 'status', 'started', - 'remaining_steps', v_created_run.remaining_steps, - 'started_at', v_created_run.started_at - ), - 'run:started', - concat('pgflow:run:', v_created_run.run_id), - false -); - --- Complete any taskless steps that are ready (e.g., empty array maps) -PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); - -PERFORM pgflow.start_ready_steps(v_created_run.run_id); - -RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; - -end; -$$; diff --git a/pkgs/core/supabase/migrations/20250916142327_pgflow_temp_make_initial_tasks_nullable.sql b/pkgs/core/supabase/migrations/20250916142327_pgflow_temp_make_initial_tasks_nullable.sql deleted file mode 100644 index feb265556..000000000 --- a/pkgs/core/supabase/migrations/20250916142327_pgflow_temp_make_initial_tasks_nullable.sql +++ /dev/null @@ -1,624 +0,0 @@ --- Modify "step_states" table -ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "step_states_initial_tasks_check", ADD CONSTRAINT "step_states_initial_tasks_check" CHECK ((initial_tasks IS NULL) OR (initial_tasks >= 0)), ADD CONSTRAINT "initial_tasks_known_when_started" CHECK ((status <> 'started'::text) OR (initial_tasks IS NOT NULL)), ALTER COLUMN "initial_tasks" DROP NOT NULL, ALTER COLUMN "initial_tasks" DROP DEFAULT; --- Modify "cascade_complete_taskless_steps" function -CREATE OR REPLACE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql AS $$ -DECLARE - v_total_completed int := 0; - v_iteration_completed int; - v_iterations int := 0; - v_max_iterations int := 50; -BEGIN - -- ========================================== - -- ITERATIVE CASCADE COMPLETION - -- ========================================== - -- Completes taskless steps in waves until none remain - LOOP - -- ---------- Safety check ---------- - v_iterations := v_iterations + 1; - IF v_iterations > v_max_iterations THEN - RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations; - END IF; - - -- ========================================== - -- COMPLETE READY TASKLESS STEPS - -- ========================================== - WITH completed AS ( - -- ---------- Complete taskless steps ---------- - -- Steps with initial_tasks=0 and no remaining deps - UPDATE pgflow.step_states ss - SET status = 'completed', - started_at = now(), - completed_at = now(), - remaining_tasks = 0 - FROM pgflow.steps s - WHERE ss.run_id = cascade_complete_taskless_steps.run_id - AND ss.flow_slug = s.flow_slug - AND ss.step_slug = s.step_slug - AND ss.status = 'created' - AND ss.remaining_deps = 0 - AND ss.initial_tasks = 0 - -- Process in topological order to ensure proper cascade - RETURNING ss.* - ), - -- ---------- Update dependent steps ---------- - -- Propagate completion and empty arrays to dependents - dep_updates AS ( - UPDATE pgflow.step_states ss - SET remaining_deps = ss.remaining_deps - dep_count.count, - -- If the dependent is a map step and its dependency completed with 0 tasks, - -- set its initial_tasks to 0 as well - initial_tasks = CASE - WHEN s.step_type = 'map' AND dep_count.has_zero_tasks - THEN 0 -- Empty array propagation - ELSE ss.initial_tasks -- Keep existing value (including NULL) - END - FROM ( - -- Aggregate dependency updates per dependent step - SELECT - d.flow_slug, - d.step_slug as dependent_slug, - COUNT(*) as count, - 
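-          -- true when any completed dependency was a taskless step (initial_tasks = 0)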
BOOL_OR(c.initial_tasks = 0) as has_zero_tasks - FROM completed c - JOIN pgflow.deps d ON d.flow_slug = c.flow_slug - AND d.dep_slug = c.step_slug - GROUP BY d.flow_slug, d.step_slug - ) dep_count, - pgflow.steps s - WHERE ss.run_id = cascade_complete_taskless_steps.run_id - AND ss.flow_slug = dep_count.flow_slug - AND ss.step_slug = dep_count.dependent_slug - AND s.flow_slug = ss.flow_slug - AND s.step_slug = ss.step_slug - ), - -- ---------- Update run counters ---------- - run_updates AS ( - UPDATE pgflow.runs r - SET remaining_steps = r.remaining_steps - c.completed_count, - status = CASE - WHEN r.remaining_steps - c.completed_count = 0 - THEN 'completed' - ELSE r.status - END, - completed_at = CASE - WHEN r.remaining_steps - c.completed_count = 0 - THEN now() - ELSE r.completed_at - END - FROM (SELECT COUNT(*) AS completed_count FROM completed) c - WHERE r.run_id = cascade_complete_taskless_steps.run_id - AND c.completed_count > 0 - ) - -- ---------- Check iteration results ---------- - SELECT COUNT(*) INTO v_iteration_completed FROM completed; - - EXIT WHEN v_iteration_completed = 0; -- No more steps to complete - v_total_completed := v_total_completed + v_iteration_completed; - END LOOP; - - RETURN v_total_completed; -END; -$$; --- Modify "maybe_complete_run" function -CREATE OR REPLACE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_completed_run pgflow.runs%ROWTYPE; -begin - -- ========================================== - -- CHECK AND COMPLETE RUN IF FINISHED - -- ========================================== - WITH run_output AS ( - -- ---------- Gather outputs from leaf steps ---------- - -- Leaf steps = steps with no dependents - SELECT jsonb_object_agg(st.step_slug, st.output) as final_output - FROM pgflow.step_tasks st - JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug - JOIN pgflow.runs r ON r.run_id = ss.run_id AND r.flow_slug = ss.flow_slug - WHERE st.run_id = maybe_complete_run.run_id - AND st.status = 'completed' - AND NOT EXISTS ( - SELECT 1 - FROM pgflow.deps d - WHERE d.flow_slug = ss.flow_slug - AND d.dep_slug = ss.step_slug - ) - ) - -- ---------- Complete run if all steps done ---------- - UPDATE pgflow.runs - SET - status = 'completed', - completed_at = now(), - output = (SELECT final_output FROM run_output) - WHERE pgflow.runs.run_id = maybe_complete_run.run_id - AND pgflow.runs.remaining_steps = 0 - AND pgflow.runs.status != 'completed' - RETURNING * INTO v_completed_run; - - -- ========================================== - -- BROADCAST COMPLETION EVENT - -- ========================================== - IF v_completed_run.run_id IS NOT NULL THEN - PERFORM realtime.send( - jsonb_build_object( - 'event_type', 'run:completed', - 'run_id', v_completed_run.run_id, - 'flow_slug', v_completed_run.flow_slug, - 'status', 'completed', - 'output', v_completed_run.output, - 'completed_at', v_completed_run.completed_at - ), - 'run:completed', - concat('pgflow:run:', v_completed_run.run_id), - false - ); - END IF; -end; -$$; --- Modify "start_ready_steps" function -CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$ --- ========================================== --- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0) --- ========================================== --- These complete immediately without spawning tasks -WITH empty_map_steps AS ( - SELECT step_state.* - FROM pgflow.step_states AS step_state 
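-  -- step_type lives on pgflow.steps, so join it to pick out map steps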
- JOIN pgflow.steps AS step - ON step.flow_slug = step_state.flow_slug - AND step.step_slug = step_state.step_slug - WHERE step_state.run_id = start_ready_steps.run_id - AND step_state.status = 'created' - AND step_state.remaining_deps = 0 - AND step.step_type = 'map' - AND step_state.initial_tasks = 0 - ORDER BY step_state.step_slug - FOR UPDATE OF step_state -), --- ---------- Complete empty map steps ---------- -completed_empty_steps AS ( - UPDATE pgflow.step_states - SET status = 'completed', - started_at = now(), - completed_at = now(), - remaining_tasks = 0 - FROM empty_map_steps - WHERE pgflow.step_states.run_id = start_ready_steps.run_id - AND pgflow.step_states.step_slug = empty_map_steps.step_slug - RETURNING pgflow.step_states.* -), --- ---------- Broadcast completion events ---------- -broadcast_empty_completed AS ( - SELECT - realtime.send( - jsonb_build_object( - 'event_type', 'step:completed', - 'run_id', completed_step.run_id, - 'step_slug', completed_step.step_slug, - 'status', 'completed', - 'started_at', completed_step.started_at, - 'completed_at', completed_step.completed_at, - 'remaining_tasks', 0, - 'remaining_deps', 0, - 'output', '[]'::jsonb - ), - concat('step:', completed_step.step_slug, ':completed'), - concat('pgflow:run:', completed_step.run_id), - false - ) - FROM completed_empty_steps AS completed_step -), - --- ========================================== --- HANDLE NORMAL STEPS (initial_tasks > 0) --- ========================================== --- ---------- Find ready steps ---------- --- Steps with no remaining deps and known task count -ready_steps AS ( - SELECT * - FROM pgflow.step_states AS step_state - WHERE step_state.run_id = start_ready_steps.run_id - AND step_state.status = 'created' - AND step_state.remaining_deps = 0 - AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count - AND step_state.initial_tasks > 0 -- Don't start taskless steps - -- Exclude empty map steps already handled - AND NOT EXISTS ( - SELECT 1 FROM empty_map_steps - WHERE empty_map_steps.run_id = step_state.run_id - AND empty_map_steps.step_slug = step_state.step_slug - ) - ORDER BY step_state.step_slug - FOR UPDATE -), --- ---------- Mark steps as started ---------- -started_step_states AS ( - UPDATE pgflow.step_states - SET status = 'started', - started_at = now(), - remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting - FROM ready_steps - WHERE pgflow.step_states.run_id = start_ready_steps.run_id - AND pgflow.step_states.step_slug = ready_steps.step_slug - RETURNING pgflow.step_states.* -), - --- ========================================== --- TASK GENERATION AND QUEUE MESSAGES --- ========================================== --- ---------- Generate tasks and batch messages ---------- --- Single steps: 1 task (index 0) --- Map steps: N tasks (indices 0..N-1) -message_batches AS ( - SELECT - started_step.flow_slug, - started_step.run_id, - started_step.step_slug, - COALESCE(step.opt_start_delay, 0) as delay, - array_agg( - jsonb_build_object( - 'flow_slug', started_step.flow_slug, - 'run_id', started_step.run_id, - 'step_slug', started_step.step_slug, - 'task_index', task_idx.task_index - ) ORDER BY task_idx.task_index - ) AS messages, - array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices - FROM started_step_states AS started_step - JOIN pgflow.steps AS step - ON step.flow_slug = started_step.flow_slug - AND step.step_slug = started_step.step_slug - -- Generate task indices from 0 to 
initial_tasks-1 - CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index) - GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay -), --- ---------- Send messages to queue ---------- --- Uses batch sending for performance with large arrays -sent_messages AS ( - SELECT - mb.flow_slug, - mb.run_id, - mb.step_slug, - task_indices.task_index, - msg_ids.msg_id - FROM message_batches mb - CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord) - CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord) - WHERE task_indices.idx_ord = msg_ids.msg_ord -), - --- ---------- Broadcast step:started events ---------- -broadcast_events AS ( - SELECT - realtime.send( - jsonb_build_object( - 'event_type', 'step:started', - 'run_id', started_step.run_id, - 'step_slug', started_step.step_slug, - 'status', 'started', - 'started_at', started_step.started_at, - 'remaining_tasks', started_step.remaining_tasks, - 'remaining_deps', started_step.remaining_deps - ), - concat('step:', started_step.step_slug, ':started'), - concat('pgflow:run:', started_step.run_id), - false - ) - FROM started_step_states AS started_step -) - --- ========================================== --- RECORD TASKS IN DATABASE --- ========================================== -INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id) -SELECT - sent_messages.flow_slug, - sent_messages.run_id, - sent_messages.step_slug, - sent_messages.task_index, - sent_messages.msg_id -FROM sent_messages; -$$; --- Modify "complete_task" function -CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_step_state pgflow.step_states%ROWTYPE; - v_dependent_map_slug text; -begin - --- ========================================== --- VALIDATION: Array output for dependent maps --- ========================================== --- Must happen BEFORE acquiring locks to fail fast without holding resources -SELECT ds.step_slug INTO v_dependent_map_slug -FROM pgflow.deps d -JOIN pgflow.steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.step_slug -JOIN pgflow.step_states ss ON ss.flow_slug = ds.flow_slug AND ss.step_slug = ds.step_slug -WHERE d.dep_slug = complete_task.step_slug - AND d.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) - AND ds.step_type = 'map' - AND ss.run_id = complete_task.run_id - AND ss.initial_tasks IS NULL - AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') -LIMIT 1; - -IF v_dependent_map_slug IS NOT NULL THEN - RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)', - v_dependent_map_slug, - complete_task.step_slug, - CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END, - complete_task.output; -END IF; - --- ========================================== --- MAIN CTE CHAIN: Update task and propagate changes --- ========================================== -WITH --- ---------- Lock acquisition ---------- --- Acquire locks in consistent order (run -> step) to prevent deadlocks -run_lock AS ( - SELECT * FROM pgflow.runs - WHERE pgflow.runs.run_id = complete_task.run_id - FOR UPDATE -), -step_lock AS ( - SELECT * FROM pgflow.step_states - 
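-  -- the row lock is held until commit, serializing concurrent completions of this step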
WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = complete_task.step_slug - FOR UPDATE -), --- ---------- Task completion ---------- --- Update the task record with completion status and output -task AS ( - UPDATE pgflow.step_tasks - SET - status = 'completed', - completed_at = now(), - output = complete_task.output - WHERE pgflow.step_tasks.run_id = complete_task.run_id - AND pgflow.step_tasks.step_slug = complete_task.step_slug - AND pgflow.step_tasks.task_index = complete_task.task_index - AND pgflow.step_tasks.status = 'started' - RETURNING * -), --- ---------- Step state update ---------- --- Decrement remaining_tasks and potentially mark step as completed -step_state AS ( - UPDATE pgflow.step_states - SET - status = CASE - WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement - ELSE 'started' - END, - completed_at = CASE - WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement - ELSE NULL - END, - remaining_tasks = pgflow.step_states.remaining_tasks - 1 - FROM task - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = complete_task.step_slug - RETURNING pgflow.step_states.* -), --- ---------- Dependency resolution ---------- --- Find all steps that depend on the completed step (only if step completed) -dependent_steps AS ( - SELECT d.step_slug AS dependent_step_slug - FROM pgflow.deps d - JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug - WHERE d.dep_slug = complete_task.step_slug - ORDER BY d.step_slug -- Ensure consistent ordering -), --- ---------- Lock dependent steps ---------- --- Acquire locks on all dependent steps before updating them -dependent_steps_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps) - FOR UPDATE -), --- ---------- Update dependent steps ---------- --- Decrement remaining_deps and resolve NULL initial_tasks for map steps -dependent_steps_update AS ( - UPDATE pgflow.step_states ss - SET remaining_deps = ss.remaining_deps - 1, - -- Resolve NULL initial_tasks for dependent map steps - -- This is where dependent maps learn their array size from upstream - initial_tasks = CASE - WHEN s.step_type = 'map' AND ss.initial_tasks IS NULL - AND complete_task.output IS NOT NULL - AND jsonb_typeof(complete_task.output) = 'array' THEN - jsonb_array_length(complete_task.output) - ELSE ss.initial_tasks -- Keep existing value (including NULL) - END - FROM dependent_steps ds, pgflow.steps s - WHERE ss.run_id = complete_task.run_id - AND ss.step_slug = ds.dependent_step_slug - AND s.flow_slug = ss.flow_slug - AND s.step_slug = ss.step_slug -) --- ---------- Update run remaining_steps ---------- --- Decrement the run's remaining_steps counter if step completed -UPDATE pgflow.runs -SET remaining_steps = pgflow.runs.remaining_steps - 1 -FROM step_state -WHERE pgflow.runs.run_id = complete_task.run_id - AND step_state.status = 'completed'; - --- ========================================== --- POST-COMPLETION ACTIONS --- ========================================== - --- ---------- Get updated state for broadcasting ---------- -SELECT * INTO v_step_state FROM pgflow.step_states -WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; - --- ---------- Handle step completion ---------- -IF v_step_state.status = 'completed' 
THEN - -- Cascade complete any taskless steps that are now ready - PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); - - -- Broadcast step:completed event - PERFORM realtime.send( - jsonb_build_object( - 'event_type', 'step:completed', - 'run_id', complete_task.run_id, - 'step_slug', complete_task.step_slug, - 'status', 'completed', - 'output', complete_task.output, - 'completed_at', v_step_state.completed_at - ), - concat('step:', complete_task.step_slug, ':completed'), - concat('pgflow:run:', complete_task.run_id), - false - ); -END IF; - --- ---------- Archive completed task message ---------- --- Move message from active queue to archive table -PERFORM ( - WITH completed_tasks AS ( - SELECT r.flow_slug, st.message_id - FROM pgflow.step_tasks st - JOIN pgflow.runs r ON st.run_id = r.run_id - WHERE st.run_id = complete_task.run_id - AND st.step_slug = complete_task.step_slug - AND st.task_index = complete_task.task_index - AND st.status = 'completed' - ) - SELECT pgmq.archive(ct.flow_slug, ct.message_id) - FROM completed_tasks ct - WHERE EXISTS (SELECT 1 FROM completed_tasks) -); - --- ---------- Trigger next steps ---------- --- Start any steps that are now ready (deps satisfied) -PERFORM pgflow.start_ready_steps(complete_task.run_id); - --- Check if the entire run is complete -PERFORM pgflow.maybe_complete_run(complete_task.run_id); - --- ---------- Return completed task ---------- -RETURN QUERY SELECT * -FROM pgflow.step_tasks AS step_task -WHERE step_task.run_id = complete_task.run_id - AND step_task.step_slug = complete_task.step_slug - AND step_task.task_index = complete_task.task_index; - -end; -$$; --- Modify "start_flow" function -CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_created_run pgflow.runs%ROWTYPE; - v_root_map_count int; -begin - --- ========================================== --- VALIDATION: Root map array input --- ========================================== -WITH root_maps AS ( - SELECT step_slug - FROM pgflow.steps - WHERE steps.flow_slug = start_flow.flow_slug - AND steps.step_type = 'map' - AND steps.deps_count = 0 -) -SELECT COUNT(*) INTO v_root_map_count FROM root_maps; - --- If we have root map steps, validate that input is an array -IF v_root_map_count > 0 THEN - -- First check for NULL (should be caught by NOT NULL constraint, but be defensive) - IF start_flow.input IS NULL THEN - RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug; - END IF; - - -- Then check if it's not an array - IF jsonb_typeof(start_flow.input) != 'array' THEN - RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)', - start_flow.flow_slug, jsonb_typeof(start_flow.input); - END IF; -END IF; - --- ========================================== --- MAIN CTE CHAIN: Create run and step states --- ========================================== -WITH - -- ---------- Gather flow metadata ---------- - flow_steps AS ( - SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count - FROM pgflow.steps - WHERE steps.flow_slug = start_flow.flow_slug - ), - -- ---------- Create run record ---------- - created_run AS ( - INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) - VALUES ( - COALESCE(start_flow.run_id, gen_random_uuid()), - start_flow.flow_slug, - start_flow.input, - (SELECT count(*) FROM flow_steps) - ) - RETURNING * - ), - -- ---------- 
Create step states ---------- - -- Sets initial_tasks: known for root maps, NULL for dependent maps - created_step_states AS ( - INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) - SELECT - fs.flow_slug, - (SELECT created_run.run_id FROM created_run), - fs.step_slug, - fs.deps_count, - -- Updated logic for initial_tasks: - CASE - WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN - -- Root map: get array length from input - CASE - WHEN jsonb_typeof(start_flow.input) = 'array' THEN - jsonb_array_length(start_flow.input) - ELSE - 1 - END - WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN - -- Dependent map: unknown until dependencies complete - NULL - ELSE - -- Single steps: always 1 task - 1 - END - FROM flow_steps fs - ) -SELECT * FROM created_run INTO v_created_run; - --- ========================================== --- POST-CREATION ACTIONS --- ========================================== - --- ---------- Broadcast run:started event ---------- -PERFORM realtime.send( - jsonb_build_object( - 'event_type', 'run:started', - 'run_id', v_created_run.run_id, - 'flow_slug', v_created_run.flow_slug, - 'input', v_created_run.input, - 'status', 'started', - 'remaining_steps', v_created_run.remaining_steps, - 'started_at', v_created_run.started_at - ), - 'run:started', - concat('pgflow:run:', v_created_run.run_id), - false -); - --- ---------- Complete taskless steps ---------- --- Handle empty array maps that should auto-complete -PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); - --- ---------- Start initial steps ---------- --- Start root steps (those with no dependencies) -PERFORM pgflow.start_ready_steps(v_created_run.run_id); - -RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; - -end; -$$; diff --git a/pkgs/core/supabase/migrations/20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql b/pkgs/core/supabase/migrations/20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql deleted file mode 100644 index a6df0d422..000000000 --- a/pkgs/core/supabase/migrations/20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql +++ /dev/null @@ -1,157 +0,0 @@ --- Modify "start_tasks" function -CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$ -with tasks as ( - select - task.flow_slug, - task.run_id, - task.step_slug, - task.task_index, - task.message_id - from pgflow.step_tasks as task - where task.flow_slug = start_tasks.flow_slug - and task.message_id = any(msg_ids) - and task.status = 'queued' - ), - start_tasks_update as ( - update pgflow.step_tasks - set - attempts_count = attempts_count + 1, - status = 'started', - started_at = now(), - last_worker_id = worker_id - from tasks - where step_tasks.message_id = tasks.message_id - and step_tasks.flow_slug = tasks.flow_slug - and step_tasks.status = 'queued' - ), - runs as ( - select - r.run_id, - r.input - from pgflow.runs r - where r.run_id in (select run_id from tasks) - ), - deps as ( - select - st.run_id, - st.step_slug, - dep.dep_slug, - dep_task.output as dep_output - from tasks st - join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug - join pgflow.step_tasks dep_task on - dep_task.run_id = st.run_id and - dep_task.step_slug = dep.dep_slug and - dep_task.status = 'completed' - ), - deps_outputs as ( - select - d.run_id, - d.step_slug, - 
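-      -- collapse per-dependency rows into one {dep_slug: output} object per task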
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output, - count(*) as dep_count - from deps d - group by d.run_id, d.step_slug - ), - timeouts as ( - select - task.message_id, - task.flow_slug, - coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay - from tasks task - join pgflow.flows flow on flow.flow_slug = task.flow_slug - join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug - ), - -- Batch update visibility timeouts for all messages - set_vt_batch as ( - select pgflow.set_vt_batch( - start_tasks.flow_slug, - array_agg(t.message_id order by t.message_id), - array_agg(t.vt_delay order by t.message_id) - ) - from timeouts t - ) - select - st.flow_slug, - st.run_id, - st.step_slug, - -- ========================================== - -- INPUT CONSTRUCTION LOGIC - -- ========================================== - -- This nested CASE statement determines how to construct the input - -- for each task based on the step type (map vs non-map). - -- - -- The fundamental difference: - -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello") - -- - Non-map steps: Receive structured objects with named keys - -- (e.g., {"run": {...}, "dependency1": {...}}) - -- ========================================== - CASE - -- -------------------- MAP STEPS -------------------- - -- Map steps process arrays element-by-element. - -- Each task receives ONE element from the array at its task_index position. - WHEN step.step_type = 'map' THEN - -- Map steps get raw array elements without any wrapper object - CASE - -- ROOT MAP: Gets array from run input - -- Example: run input = [1, 2, 3] - -- task 0 gets: 1 - -- task 1 gets: 2 - -- task 2 gets: 3 - WHEN step.deps_count = 0 THEN - -- Root map (deps_count = 0): no dependencies, reads from run input. - -- Extract the element at task_index from the run's input array. - -- Note: If run input is not an array, this will return NULL - -- and the flow will fail (validated in start_flow). - jsonb_array_element(r.input, st.task_index) - - -- DEPENDENT MAP: Gets array from its single dependency - -- Example: dependency output = ["a", "b", "c"] - -- task 0 gets: "a" - -- task 1 gets: "b" - -- task 2 gets: "c" - ELSE - -- Has dependencies (should be exactly 1 for map steps). - -- Extract the element at task_index from the dependency's output array. - -- - -- Why the subquery with jsonb_each? - -- - The dependency outputs a raw array: [1, 2, 3] - -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]} - -- - We need to unwrap and get just the array value - -- - Map steps have exactly 1 dependency (enforced by add_step) - -- - So jsonb_each will return exactly 1 row - -- - We extract the 'value' which is the raw array [1, 2, 3] - -- - Then get the element at task_index from that array - (SELECT jsonb_array_element(value, st.task_index) - FROM jsonb_each(dep_out.deps_output) - LIMIT 1) - END - - -- -------------------- NON-MAP STEPS -------------------- - -- Regular (non-map) steps receive ALL inputs as a structured object. - -- This includes the original run input plus all dependency outputs. 
- ELSE - -- Non-map steps get structured input with named keys - -- Example output: { - -- "run": {"original": "input"}, - -- "step1": {"output": "from_step1"}, - -- "step2": {"output": "from_step2"} - -- } - -- - -- Build object with 'run' key containing original input - jsonb_build_object('run', r.input) || - -- Merge with deps_output which already has dependency outputs - -- deps_output format: {"dep1": output1, "dep2": output2, ...} - -- If no dependencies, defaults to empty object - coalesce(dep_out.deps_output, '{}'::jsonb) - END as input, - st.message_id as msg_id - from tasks st - join runs r on st.run_id = r.run_id - join pgflow.steps step on - step.flow_slug = st.flow_slug and - step.step_slug = st.step_slug - left join deps_outputs dep_out on - dep_out.run_id = st.run_id and - dep_out.step_slug = st.step_slug -$$; diff --git a/pkgs/core/supabase/migrations/20250918042753_pgflow_temp_handle_map_output_aggregation.sql b/pkgs/core/supabase/migrations/20250918042753_pgflow_temp_handle_map_output_aggregation.sql deleted file mode 100644 index 474e25115..000000000 --- a/pkgs/core/supabase/migrations/20250918042753_pgflow_temp_handle_map_output_aggregation.sql +++ /dev/null @@ -1,489 +0,0 @@ --- Modify "maybe_complete_run" function -CREATE OR REPLACE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_completed_run pgflow.runs%ROWTYPE; -begin - -- ========================================== - -- CHECK AND COMPLETE RUN IF FINISHED - -- ========================================== - -- ---------- Complete run if all steps done ---------- - UPDATE pgflow.runs - SET - status = 'completed', - completed_at = now(), - -- Only compute expensive aggregation when actually completing the run - output = ( - -- ---------- Gather outputs from leaf steps ---------- - -- Leaf steps = steps with no dependents - -- For map steps: aggregate all task outputs into array - -- For single steps: use the single task output - SELECT jsonb_object_agg( - step_slug, - CASE - WHEN step_type = 'map' THEN aggregated_output - ELSE single_output - END - ) - FROM ( - SELECT DISTINCT - leaf_state.step_slug, - leaf_step.step_type, - -- For map steps: aggregate all task outputs - CASE WHEN leaf_step.step_type = 'map' THEN - (SELECT COALESCE(jsonb_agg(leaf_task.output ORDER BY leaf_task.task_index), '[]'::jsonb) - FROM pgflow.step_tasks leaf_task - WHERE leaf_task.run_id = leaf_state.run_id - AND leaf_task.step_slug = leaf_state.step_slug - AND leaf_task.status = 'completed') - END as aggregated_output, - -- For single steps: get the single output - CASE WHEN leaf_step.step_type = 'single' THEN - (SELECT leaf_task.output - FROM pgflow.step_tasks leaf_task - WHERE leaf_task.run_id = leaf_state.run_id - AND leaf_task.step_slug = leaf_state.step_slug - AND leaf_task.status = 'completed' - LIMIT 1) - END as single_output - FROM pgflow.step_states leaf_state - JOIN pgflow.steps leaf_step ON leaf_step.flow_slug = leaf_state.flow_slug AND leaf_step.step_slug = leaf_state.step_slug - WHERE leaf_state.run_id = maybe_complete_run.run_id - AND leaf_state.status = 'completed' - AND NOT EXISTS ( - SELECT 1 - FROM pgflow.deps dep - WHERE dep.flow_slug = leaf_state.flow_slug - AND dep.dep_slug = leaf_state.step_slug - ) - ) leaf_outputs - ) - WHERE pgflow.runs.run_id = maybe_complete_run.run_id - AND pgflow.runs.remaining_steps = 0 - AND pgflow.runs.status != 'completed' - RETURNING * INTO v_completed_run; - - -- ========================================== - -- 
BROADCAST COMPLETION EVENT - -- ========================================== - IF v_completed_run.run_id IS NOT NULL THEN - PERFORM realtime.send( - jsonb_build_object( - 'event_type', 'run:completed', - 'run_id', v_completed_run.run_id, - 'flow_slug', v_completed_run.flow_slug, - 'status', 'completed', - 'output', v_completed_run.output, - 'completed_at', v_completed_run.completed_at - ), - 'run:completed', - concat('pgflow:run:', v_completed_run.run_id), - false - ); - END IF; -end; -$$; --- Modify "complete_task" function -CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ -declare - v_step_state pgflow.step_states%ROWTYPE; - v_dependent_map_slug text; -begin - --- ========================================== --- VALIDATION: Array output for dependent maps --- ========================================== --- Must happen BEFORE acquiring locks to fail fast without holding resources --- Only validate for single steps - map steps produce scalars that get aggregated -SELECT child_step.step_slug INTO v_dependent_map_slug -FROM pgflow.deps dependency -JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug - AND child_step.step_slug = dependency.step_slug -JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug - AND parent_step.step_slug = dependency.dep_slug -JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug - AND child_state.step_slug = child_step.step_slug -WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step - AND dependency.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) - AND parent_step.step_type = 'single' -- Only validate single steps - AND child_step.step_type = 'map' - AND child_state.run_id = complete_task.run_id - AND child_state.initial_tasks IS NULL - AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') -LIMIT 1; - -IF v_dependent_map_slug IS NOT NULL THEN - RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)', - v_dependent_map_slug, - complete_task.step_slug, - CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END, - complete_task.output; -END IF; - --- ========================================== --- MAIN CTE CHAIN: Update task and propagate changes --- ========================================== -WITH --- ---------- Lock acquisition ---------- --- Acquire locks in consistent order (run -> step) to prevent deadlocks -run_lock AS ( - SELECT * FROM pgflow.runs - WHERE pgflow.runs.run_id = complete_task.run_id - FOR UPDATE -), -step_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = complete_task.step_slug - FOR UPDATE -), --- ---------- Task completion ---------- --- Update the task record with completion status and output -task AS ( - UPDATE pgflow.step_tasks - SET - status = 'completed', - completed_at = now(), - output = complete_task.output - WHERE pgflow.step_tasks.run_id = complete_task.run_id - AND pgflow.step_tasks.step_slug = complete_task.step_slug - AND pgflow.step_tasks.task_index = complete_task.task_index - AND pgflow.step_tasks.status = 'started' - RETURNING * -), --- ---------- Step state update ---------- --- Decrement remaining_tasks and potentially mark step as completed 
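--- e.g. a 3-task map: remaining_tasks goes 3 -> 2 -> 1; the completion that
--- sees remaining_tasks = 1 flips the step to 'completed'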
-step_state AS ( - UPDATE pgflow.step_states - SET - status = CASE - WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement - ELSE 'started' - END, - completed_at = CASE - WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement - ELSE NULL - END, - remaining_tasks = pgflow.step_states.remaining_tasks - 1 - FROM task - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug = complete_task.step_slug - RETURNING pgflow.step_states.* -), --- ---------- Dependency resolution ---------- --- Find all child steps that depend on the completed parent step (only if parent completed) -child_steps AS ( - SELECT deps.step_slug AS child_step_slug - FROM pgflow.deps deps - JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug - WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child - ORDER BY deps.step_slug -- Ensure consistent ordering -), --- ---------- Lock child steps ---------- --- Acquire locks on all child steps before updating them -child_steps_lock AS ( - SELECT * FROM pgflow.step_states - WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps) - FOR UPDATE -), --- ---------- Update child steps ---------- --- Decrement remaining_deps and resolve NULL initial_tasks for map steps -child_steps_update AS ( - UPDATE pgflow.step_states child_state - SET remaining_deps = child_state.remaining_deps - 1, - -- Resolve NULL initial_tasks for child map steps - -- This is where child maps learn their array size from the parent - -- This CTE only runs when the parent step is complete (see child_steps JOIN) - initial_tasks = CASE - WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN - CASE - WHEN parent_step.step_type = 'map' THEN - -- Map->map: Count all completed tasks from parent map - -- We add 1 because the current task is being completed in this transaction - -- but isn't yet visible as 'completed' in the step_tasks table - -- TODO: Refactor to use future column step_states.total_tasks - -- Would eliminate the COUNT query and just use parent_state.total_tasks - (SELECT COUNT(*)::int + 1 - FROM pgflow.step_tasks parent_tasks - WHERE parent_tasks.run_id = complete_task.run_id - AND parent_tasks.step_slug = complete_task.step_slug - AND parent_tasks.status = 'completed' - AND parent_tasks.task_index != complete_task.task_index) - ELSE - -- Single->map: Use output array length (single steps complete immediately) - CASE - WHEN complete_task.output IS NOT NULL - AND jsonb_typeof(complete_task.output) = 'array' THEN - jsonb_array_length(complete_task.output) - ELSE NULL -- Keep NULL if not an array - END - END - ELSE child_state.initial_tasks -- Keep existing value (including NULL) - END - FROM child_steps children - JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) - AND child_step.step_slug = children.child_step_slug - JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) - AND parent_step.step_slug = complete_task.step_slug - WHERE child_state.run_id = complete_task.run_id - AND child_state.step_slug = children.child_step_slug -) --- ---------- Update run remaining_steps ---------- --- Decrement the run's remaining_steps counter if step completed -UPDATE 
pgflow.runs -SET remaining_steps = pgflow.runs.remaining_steps - 1 -FROM step_state -WHERE pgflow.runs.run_id = complete_task.run_id - AND step_state.status = 'completed'; - --- ========================================== --- POST-COMPLETION ACTIONS --- ========================================== - --- ---------- Get updated state for broadcasting ---------- -SELECT * INTO v_step_state FROM pgflow.step_states -WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; - --- ---------- Handle step completion ---------- -IF v_step_state.status = 'completed' THEN - -- Cascade complete any taskless steps that are now ready - PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); - - -- Broadcast step:completed event - -- For map steps, aggregate all task outputs; for single steps, use the task output - PERFORM realtime.send( - jsonb_build_object( - 'event_type', 'step:completed', - 'run_id', complete_task.run_id, - 'step_slug', complete_task.step_slug, - 'status', 'completed', - 'output', CASE - WHEN (SELECT s.step_type FROM pgflow.steps s - WHERE s.flow_slug = v_step_state.flow_slug - AND s.step_slug = complete_task.step_slug) = 'map' THEN - -- Aggregate all task outputs for map steps - (SELECT COALESCE(jsonb_agg(st.output ORDER BY st.task_index), '[]'::jsonb) - FROM pgflow.step_tasks st - WHERE st.run_id = complete_task.run_id - AND st.step_slug = complete_task.step_slug - AND st.status = 'completed') - ELSE - -- Single step: use the individual task output - complete_task.output - END, - 'completed_at', v_step_state.completed_at - ), - concat('step:', complete_task.step_slug, ':completed'), - concat('pgflow:run:', complete_task.run_id), - false - ); -END IF; - --- ---------- Archive completed task message ---------- --- Move message from active queue to archive table -PERFORM ( - WITH completed_tasks AS ( - SELECT r.flow_slug, st.message_id - FROM pgflow.step_tasks st - JOIN pgflow.runs r ON st.run_id = r.run_id - WHERE st.run_id = complete_task.run_id - AND st.step_slug = complete_task.step_slug - AND st.task_index = complete_task.task_index - AND st.status = 'completed' - ) - SELECT pgmq.archive(ct.flow_slug, ct.message_id) - FROM completed_tasks ct - WHERE EXISTS (SELECT 1 FROM completed_tasks) -); - --- ---------- Trigger next steps ---------- --- Start any steps that are now ready (deps satisfied) -PERFORM pgflow.start_ready_steps(complete_task.run_id); - --- Check if the entire run is complete -PERFORM pgflow.maybe_complete_run(complete_task.run_id); - --- ---------- Return completed task ---------- -RETURN QUERY SELECT * -FROM pgflow.step_tasks AS step_task -WHERE step_task.run_id = complete_task.run_id - AND step_task.step_slug = complete_task.step_slug - AND step_task.task_index = complete_task.task_index; - -end; -$$; --- Modify "start_tasks" function -CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$ -with tasks as ( - select - task.flow_slug, - task.run_id, - task.step_slug, - task.task_index, - task.message_id - from pgflow.step_tasks as task - join pgflow.runs r on r.run_id = task.run_id - where task.flow_slug = start_tasks.flow_slug - and task.message_id = any(msg_ids) - and task.status = 'queued' - -- MVP: Don't start tasks on failed runs - and r.status != 'failed' - ), - start_tasks_update as ( - update pgflow.step_tasks - set - attempts_count = attempts_count + 1, - 
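-      -- attempts_count increments on every start, so retries remain visible on the row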
status = 'started', - started_at = now(), - last_worker_id = worker_id - from tasks - where step_tasks.message_id = tasks.message_id - and step_tasks.flow_slug = tasks.flow_slug - and step_tasks.status = 'queued' - ), - runs as ( - select - r.run_id, - r.input - from pgflow.runs r - where r.run_id in (select run_id from tasks) - ), - deps as ( - select - st.run_id, - st.step_slug, - dep.dep_slug, - -- Aggregate map outputs or use single output - CASE - WHEN dep_step.step_type = 'map' THEN - -- Aggregate all task outputs ordered by task_index - -- Use COALESCE to return empty array if no tasks - (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb) - FROM pgflow.step_tasks dt - WHERE dt.run_id = st.run_id - AND dt.step_slug = dep.dep_slug - AND dt.status = 'completed') - ELSE - -- Single step: use the single task output - dep_task.output - END as dep_output - from tasks st - join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug - join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug - left join pgflow.step_tasks dep_task on - dep_task.run_id = st.run_id and - dep_task.step_slug = dep.dep_slug and - dep_task.status = 'completed' - and dep_step.step_type = 'single' -- Only join for single steps - ), - deps_outputs as ( - select - d.run_id, - d.step_slug, - jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output, - count(*) as dep_count - from deps d - group by d.run_id, d.step_slug - ), - timeouts as ( - select - task.message_id, - task.flow_slug, - coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay - from tasks task - join pgflow.flows flow on flow.flow_slug = task.flow_slug - join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug - ), - -- Batch update visibility timeouts for all messages - set_vt_batch as ( - select pgflow.set_vt_batch( - start_tasks.flow_slug, - array_agg(t.message_id order by t.message_id), - array_agg(t.vt_delay order by t.message_id) - ) - from timeouts t - ) - select - st.flow_slug, - st.run_id, - st.step_slug, - -- ========================================== - -- INPUT CONSTRUCTION LOGIC - -- ========================================== - -- This nested CASE statement determines how to construct the input - -- for each task based on the step type (map vs non-map). - -- - -- The fundamental difference: - -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello") - -- - Non-map steps: Receive structured objects with named keys - -- (e.g., {"run": {...}, "dependency1": {...}}) - -- ========================================== - CASE - -- -------------------- MAP STEPS -------------------- - -- Map steps process arrays element-by-element. - -- Each task receives ONE element from the array at its task_index position. - WHEN step.step_type = 'map' THEN - -- Map steps get raw array elements without any wrapper object - CASE - -- ROOT MAP: Gets array from run input - -- Example: run input = [1, 2, 3] - -- task 0 gets: 1 - -- task 1 gets: 2 - -- task 2 gets: 3 - WHEN step.deps_count = 0 THEN - -- Root map (deps_count = 0): no dependencies, reads from run input. - -- Extract the element at task_index from the run's input array. - -- Note: If run input is not an array, this will return NULL - -- and the flow will fail (validated in start_flow). 
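-          -- e.g. r.input = [10, 20, 30] and task_index = 1 yields 20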
- jsonb_array_element(r.input, st.task_index) - - -- DEPENDENT MAP: Gets array from its single dependency - -- Example: dependency output = ["a", "b", "c"] - -- task 0 gets: "a" - -- task 1 gets: "b" - -- task 2 gets: "c" - ELSE - -- Has dependencies (should be exactly 1 for map steps). - -- Extract the element at task_index from the dependency's output array. - -- - -- Why the subquery with jsonb_each? - -- - The dependency outputs a raw array: [1, 2, 3] - -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]} - -- - We need to unwrap and get just the array value - -- - Map steps have exactly 1 dependency (enforced by add_step) - -- - So jsonb_each will return exactly 1 row - -- - We extract the 'value' which is the raw array [1, 2, 3] - -- - Then get the element at task_index from that array - (SELECT jsonb_array_element(value, st.task_index) - FROM jsonb_each(dep_out.deps_output) - LIMIT 1) - END - - -- -------------------- NON-MAP STEPS -------------------- - -- Regular (non-map) steps receive ALL inputs as a structured object. - -- This includes the original run input plus all dependency outputs. - ELSE - -- Non-map steps get structured input with named keys - -- Example output: { - -- "run": {"original": "input"}, - -- "step1": {"output": "from_step1"}, - -- "step2": {"output": "from_step2"} - -- } - -- - -- Build object with 'run' key containing original input - jsonb_build_object('run', r.input) || - -- Merge with deps_output which already has dependency outputs - -- deps_output format: {"dep1": output1, "dep2": output2, ...} - -- If no dependencies, defaults to empty object - coalesce(dep_out.deps_output, '{}'::jsonb) - END as input, - st.message_id as msg_id - from tasks st - join runs r on st.run_id = r.run_id - join pgflow.steps step on - step.flow_slug = st.flow_slug and - step.step_slug = st.step_slug - left join deps_outputs dep_out on - dep_out.run_id = st.run_id and - dep_out.step_slug = st.step_slug -$$; diff --git a/pkgs/core/supabase/migrations/20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql b/pkgs/core/supabase/migrations/20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql deleted file mode 100644 index 128cede04..000000000 --- a/pkgs/core/supabase/migrations/20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql +++ /dev/null @@ -1,178 +0,0 @@ --- Modify "step_task_record" composite type -ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "task_index" integer; --- Modify "start_tasks" function -CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$ -with tasks as ( - select - task.flow_slug, - task.run_id, - task.step_slug, - task.task_index, - task.message_id - from pgflow.step_tasks as task - join pgflow.runs r on r.run_id = task.run_id - where task.flow_slug = start_tasks.flow_slug - and task.message_id = any(msg_ids) - and task.status = 'queued' - -- MVP: Don't start tasks on failed runs - and r.status != 'failed' - ), - start_tasks_update as ( - update pgflow.step_tasks - set - attempts_count = attempts_count + 1, - status = 'started', - started_at = now(), - last_worker_id = worker_id - from tasks - where step_tasks.message_id = tasks.message_id - and step_tasks.flow_slug = tasks.flow_slug - and step_tasks.status = 'queued' - ), - runs as ( - select - r.run_id, - r.input - from pgflow.runs r - where r.run_id in (select run_id from tasks) - ), - deps as ( - select - st.run_id, - 
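-      -- one row per (task, dependency) pair; dep_output resolved per step_type below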
st.step_slug, - dep.dep_slug, - -- Aggregate map outputs or use single output - CASE - WHEN dep_step.step_type = 'map' THEN - -- Aggregate all task outputs ordered by task_index - -- Use COALESCE to return empty array if no tasks - (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb) - FROM pgflow.step_tasks dt - WHERE dt.run_id = st.run_id - AND dt.step_slug = dep.dep_slug - AND dt.status = 'completed') - ELSE - -- Single step: use the single task output - dep_task.output - END as dep_output - from tasks st - join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug - join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug - left join pgflow.step_tasks dep_task on - dep_task.run_id = st.run_id and - dep_task.step_slug = dep.dep_slug and - dep_task.status = 'completed' - and dep_step.step_type = 'single' -- Only join for single steps - ), - deps_outputs as ( - select - d.run_id, - d.step_slug, - jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output, - count(*) as dep_count - from deps d - group by d.run_id, d.step_slug - ), - timeouts as ( - select - task.message_id, - task.flow_slug, - coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay - from tasks task - join pgflow.flows flow on flow.flow_slug = task.flow_slug - join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug - ), - -- Batch update visibility timeouts for all messages - set_vt_batch as ( - select pgflow.set_vt_batch( - start_tasks.flow_slug, - array_agg(t.message_id order by t.message_id), - array_agg(t.vt_delay order by t.message_id) - ) - from timeouts t - ) - select - st.flow_slug, - st.run_id, - st.step_slug, - -- ========================================== - -- INPUT CONSTRUCTION LOGIC - -- ========================================== - -- This nested CASE statement determines how to construct the input - -- for each task based on the step type (map vs non-map). - -- - -- The fundamental difference: - -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello") - -- - Non-map steps: Receive structured objects with named keys - -- (e.g., {"run": {...}, "dependency1": {...}}) - -- ========================================== - CASE - -- -------------------- MAP STEPS -------------------- - -- Map steps process arrays element-by-element. - -- Each task receives ONE element from the array at its task_index position. - WHEN step.step_type = 'map' THEN - -- Map steps get raw array elements without any wrapper object - CASE - -- ROOT MAP: Gets array from run input - -- Example: run input = [1, 2, 3] - -- task 0 gets: 1 - -- task 1 gets: 2 - -- task 2 gets: 3 - WHEN step.deps_count = 0 THEN - -- Root map (deps_count = 0): no dependencies, reads from run input. - -- Extract the element at task_index from the run's input array. - -- Note: If run input is not an array, this will return NULL - -- and the flow will fail (validated in start_flow). - jsonb_array_element(r.input, st.task_index) - - -- DEPENDENT MAP: Gets array from its single dependency - -- Example: dependency output = ["a", "b", "c"] - -- task 0 gets: "a" - -- task 1 gets: "b" - -- task 2 gets: "c" - ELSE - -- Has dependencies (should be exactly 1 for map steps). - -- Extract the element at task_index from the dependency's output array. - -- - -- Why the subquery with jsonb_each? 
- -- - The dependency outputs a raw array: [1, 2, 3] - -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]} - -- - We need to unwrap and get just the array value - -- - Map steps have exactly 1 dependency (enforced by add_step) - -- - So jsonb_each will return exactly 1 row - -- - We extract the 'value' which is the raw array [1, 2, 3] - -- - Then get the element at task_index from that array - (SELECT jsonb_array_element(value, st.task_index) - FROM jsonb_each(dep_out.deps_output) - LIMIT 1) - END - - -- -------------------- NON-MAP STEPS -------------------- - -- Regular (non-map) steps receive ALL inputs as a structured object. - -- This includes the original run input plus all dependency outputs. - ELSE - -- Non-map steps get structured input with named keys - -- Example output: { - -- "run": {"original": "input"}, - -- "step1": {"output": "from_step1"}, - -- "step2": {"output": "from_step2"} - -- } - -- - -- Build object with 'run' key containing original input - jsonb_build_object('run', r.input) || - -- Merge with deps_output which already has dependency outputs - -- deps_output format: {"dep1": output1, "dep2": output2, ...} - -- If no dependencies, defaults to empty object - coalesce(dep_out.deps_output, '{}'::jsonb) - END as input, - st.message_id as msg_id, - st.task_index as task_index - from tasks st - join runs r on st.run_id = r.run_id - join pgflow.steps step on - step.flow_slug = st.flow_slug and - step.step_slug = st.step_slug - left join deps_outputs dep_out on - dep_out.run_id = st.run_id and - dep_out.step_slug = st.step_slug -$$; diff --git a/pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql b/pkgs/core/supabase/migrations/20251006073122_pgflow_add_map_step_type.sql similarity index 54% rename from pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql rename to pkgs/core/supabase/migrations/20251006073122_pgflow_add_map_step_type.sql index 5675f4df1..6afdd4dd7 100644 --- a/pkgs/core/supabase/migrations/20250919101802_pgflow_temp_orphaned_messages_index.sql +++ b/pkgs/core/supabase/migrations/20251006073122_pgflow_add_map_step_type.sql @@ -1,5 +1,95 @@ +-- Modify "step_task_record" composite type +ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "task_index" integer; +-- Modify "step_states" table +ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "step_states_remaining_tasks_check", ADD CONSTRAINT "initial_tasks_known_when_started" CHECK ((status <> 'started'::text) OR (initial_tasks IS NOT NULL)), ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> 'created'::text)), ADD CONSTRAINT "step_states_initial_tasks_check" CHECK ((initial_tasks IS NULL) OR (initial_tasks >= 0)), ALTER COLUMN "remaining_tasks" DROP NOT NULL, ALTER COLUMN "remaining_tasks" DROP DEFAULT, ADD COLUMN "initial_tasks" integer NULL; -- Modify "step_tasks" table -ALTER TABLE "pgflow"."step_tasks" DROP CONSTRAINT "output_valid_only_for_completed", ADD CONSTRAINT "output_valid_only_for_completed" CHECK ((output IS NULL) OR (status = ANY (ARRAY['completed'::text, 'failed'::text]))); +ALTER TABLE "pgflow"."step_tasks" DROP CONSTRAINT "only_single_task_per_step", DROP CONSTRAINT "output_valid_only_for_completed", ADD CONSTRAINT "output_valid_only_for_completed" CHECK ((output IS NULL) OR (status = ANY (ARRAY['completed'::text, 'failed'::text]))); +-- Modify "steps" table +ALTER TABLE "pgflow"."steps" DROP CONSTRAINT "steps_step_type_check", ADD CONSTRAINT "steps_step_type_check" 
CHECK (step_type = ANY (ARRAY['single'::text, 'map'::text])); +-- Modify "maybe_complete_run" function +CREATE OR REPLACE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_completed_run pgflow.runs%ROWTYPE; +begin + -- ========================================== + -- CHECK AND COMPLETE RUN IF FINISHED + -- ========================================== + -- ---------- Complete run if all steps done ---------- + UPDATE pgflow.runs + SET + status = 'completed', + completed_at = now(), + -- Only compute expensive aggregation when actually completing the run + output = ( + -- ---------- Gather outputs from leaf steps ---------- + -- Leaf steps = steps with no dependents + -- For map steps: aggregate all task outputs into array + -- For single steps: use the single task output + SELECT jsonb_object_agg( + step_slug, + CASE + WHEN step_type = 'map' THEN aggregated_output + ELSE single_output + END + ) + FROM ( + SELECT DISTINCT + leaf_state.step_slug, + leaf_step.step_type, + -- For map steps: aggregate all task outputs + CASE WHEN leaf_step.step_type = 'map' THEN + (SELECT COALESCE(jsonb_agg(leaf_task.output ORDER BY leaf_task.task_index), '[]'::jsonb) + FROM pgflow.step_tasks leaf_task + WHERE leaf_task.run_id = leaf_state.run_id + AND leaf_task.step_slug = leaf_state.step_slug + AND leaf_task.status = 'completed') + END as aggregated_output, + -- For single steps: get the single output + CASE WHEN leaf_step.step_type = 'single' THEN + (SELECT leaf_task.output + FROM pgflow.step_tasks leaf_task + WHERE leaf_task.run_id = leaf_state.run_id + AND leaf_task.step_slug = leaf_state.step_slug + AND leaf_task.status = 'completed' + LIMIT 1) + END as single_output + FROM pgflow.step_states leaf_state + JOIN pgflow.steps leaf_step ON leaf_step.flow_slug = leaf_state.flow_slug AND leaf_step.step_slug = leaf_state.step_slug + WHERE leaf_state.run_id = maybe_complete_run.run_id + AND leaf_state.status = 'completed' + AND NOT EXISTS ( + SELECT 1 + FROM pgflow.deps dep + WHERE dep.flow_slug = leaf_state.flow_slug + AND dep.dep_slug = leaf_state.step_slug + ) + ) leaf_outputs + ) + WHERE pgflow.runs.run_id = maybe_complete_run.run_id + AND pgflow.runs.remaining_steps = 0 + AND pgflow.runs.status != 'completed' + RETURNING * INTO v_completed_run; + + -- ========================================== + -- BROADCAST COMPLETION EVENT + -- ========================================== + IF v_completed_run.run_id IS NOT NULL THEN + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:completed', + 'run_id', v_completed_run.run_id, + 'flow_slug', v_completed_run.flow_slug, + 'status', 'completed', + 'output', v_completed_run.output, + 'completed_at', v_completed_run.completed_at + ), + 'run:completed', + concat('pgflow:run:', v_completed_run.run_id), + false + ); + END IF; +end; +$$; -- Modify "start_ready_steps" function CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ begin @@ -174,6 +264,96 @@ FROM sent_messages; end; $$; +-- Create "cascade_complete_taskless_steps" function +CREATE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql AS $$ +DECLARE + v_total_completed int := 0; + v_iteration_completed int; + v_iterations int := 0; + v_max_iterations int := 50; +BEGIN + -- ========================================== + -- ITERATIVE CASCADE COMPLETION + -- ========================================== + -- 
Completes taskless steps in waves until none remain
+  LOOP
+    -- ---------- Safety check ----------
+    v_iterations := v_iterations + 1;
+    IF v_iterations > v_max_iterations THEN
+      RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations;
+    END IF;
+
+    -- ==========================================
+    -- COMPLETE READY TASKLESS STEPS
+    -- ==========================================
+    WITH completed AS (
+      -- ---------- Complete taskless steps ----------
+      -- Steps with initial_tasks=0 and no remaining deps
+      UPDATE pgflow.step_states ss
+      SET status = 'completed',
+          started_at = now(),
+          completed_at = now(),
+          remaining_tasks = 0
+      FROM pgflow.steps s
+      WHERE ss.run_id = cascade_complete_taskless_steps.run_id
+        AND ss.flow_slug = s.flow_slug
+        AND ss.step_slug = s.step_slug
+        AND ss.status = 'created'
+        AND ss.remaining_deps = 0
+        AND ss.initial_tasks = 0
+      -- No explicit ordering here: each pass completes only steps whose
+      -- dependencies are already done, and the outer LOOP re-runs until
+      -- no step qualifies, which yields the topological cascade
+      RETURNING ss.*
+    ),
+    -- ---------- Update dependent steps ----------
+    -- Propagate completion and empty arrays to dependents
+    dep_updates AS (
+      UPDATE pgflow.step_states ss
+      SET remaining_deps = ss.remaining_deps - dep_count.count,
+          -- If the dependent is a map step and its dependency completed with 0 tasks,
+          -- set its initial_tasks to 0 as well
+          initial_tasks = CASE
+            WHEN s.step_type = 'map' AND dep_count.has_zero_tasks
+              THEN 0 -- Empty array propagation
+            ELSE ss.initial_tasks -- Keep existing value (including NULL)
+          END
+      FROM (
+        -- Aggregate dependency updates per dependent step
+        SELECT
+          d.flow_slug,
+          d.step_slug as dependent_slug,
+          COUNT(*) as count,
+          BOOL_OR(c.initial_tasks = 0) as has_zero_tasks
+        FROM completed c
+        JOIN pgflow.deps d ON d.flow_slug = c.flow_slug
+          AND d.dep_slug = c.step_slug
+        GROUP BY d.flow_slug, d.step_slug
+      ) dep_count,
+      pgflow.steps s
+      WHERE ss.run_id = cascade_complete_taskless_steps.run_id
+        AND ss.flow_slug = dep_count.flow_slug
+        AND ss.step_slug = dep_count.dependent_slug
+        AND s.flow_slug = ss.flow_slug
+        AND s.step_slug = ss.step_slug
+    ),
+    -- ---------- Update run counters ----------
+    -- Only decrement remaining_steps; let maybe_complete_run handle finalization
+    run_updates AS (
+      UPDATE pgflow.runs r
+      SET remaining_steps = r.remaining_steps - c.completed_count
+      FROM (SELECT COUNT(*) AS completed_count FROM completed) c
+      WHERE r.run_id = cascade_complete_taskless_steps.run_id
+        AND c.completed_count > 0
+    )
+    -- ---------- Check iteration results ----------
+    SELECT COUNT(*) INTO v_iteration_completed FROM completed;
+
+    EXIT WHEN v_iteration_completed = 0; -- No more steps to complete
+    v_total_completed := v_total_completed + v_iteration_completed;
+  END LOOP;
+
+  RETURN v_total_completed;
+END;
+$$;
-- Modify "complete_task" function
CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$
declare
@@ -686,3 +866,355 @@ where st.run_id = fail_task.run_id
 end;
 $$;
+-- Modify "start_flow" function
+CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$
+declare
+  v_created_run pgflow.runs%ROWTYPE;
+  v_root_map_count int;
+begin
+
+-- ==========================================
+-- VALIDATION: Root map array input
+-- ==========================================
+WITH root_maps AS (
+  SELECT step_slug
+  FROM
pgflow.steps + WHERE steps.flow_slug = start_flow.flow_slug + AND steps.step_type = 'map' + AND steps.deps_count = 0 +) +SELECT COUNT(*) INTO v_root_map_count FROM root_maps; + +-- If we have root map steps, validate that input is an array +IF v_root_map_count > 0 THEN + -- First check for NULL (should be caught by NOT NULL constraint, but be defensive) + IF start_flow.input IS NULL THEN + RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug; + END IF; + + -- Then check if it's not an array + IF jsonb_typeof(start_flow.input) != 'array' THEN + RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)', + start_flow.flow_slug, jsonb_typeof(start_flow.input); + END IF; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Create run and step states +-- ========================================== +WITH + -- ---------- Gather flow metadata ---------- + flow_steps AS ( + SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count + FROM pgflow.steps + WHERE steps.flow_slug = start_flow.flow_slug + ), + -- ---------- Create run record ---------- + created_run AS ( + INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) + VALUES ( + COALESCE(start_flow.run_id, gen_random_uuid()), + start_flow.flow_slug, + start_flow.input, + (SELECT count(*) FROM flow_steps) + ) + RETURNING * + ), + -- ---------- Create step states ---------- + -- Sets initial_tasks: known for root maps, NULL for dependent maps + created_step_states AS ( + INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) + SELECT + fs.flow_slug, + (SELECT created_run.run_id FROM created_run), + fs.step_slug, + fs.deps_count, + -- Updated logic for initial_tasks: + CASE + WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN + -- Root map: get array length from input + CASE + WHEN jsonb_typeof(start_flow.input) = 'array' THEN + jsonb_array_length(start_flow.input) + ELSE + 1 + END + WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN + -- Dependent map: unknown until dependencies complete + NULL + ELSE + -- Single steps: always 1 task + 1 + END + FROM flow_steps fs + ) +SELECT * FROM created_run INTO v_created_run; + +-- ========================================== +-- POST-CREATION ACTIONS +-- ========================================== + +-- ---------- Broadcast run:started event ---------- +PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:started', + 'run_id', v_created_run.run_id, + 'flow_slug', v_created_run.flow_slug, + 'input', v_created_run.input, + 'status', 'started', + 'remaining_steps', v_created_run.remaining_steps, + 'started_at', v_created_run.started_at + ), + 'run:started', + concat('pgflow:run:', v_created_run.run_id), + false +); + +-- ---------- Complete taskless steps ---------- +-- Handle empty array maps that should auto-complete +PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); + +-- ---------- Start initial steps ---------- +-- Start root steps (those with no dependencies) +PERFORM pgflow.start_ready_steps(v_created_run.run_id); + +-- ---------- Check for run completion ---------- +-- If cascade completed all steps (zero-task flows), finalize the run +PERFORM pgflow.maybe_complete_run(v_created_run.run_id); + +RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; + +end; +$$; +-- Modify "start_tasks" function +CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" 
uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$ +with tasks as ( + select + task.flow_slug, + task.run_id, + task.step_slug, + task.task_index, + task.message_id + from pgflow.step_tasks as task + join pgflow.runs r on r.run_id = task.run_id + where task.flow_slug = start_tasks.flow_slug + and task.message_id = any(msg_ids) + and task.status = 'queued' + -- MVP: Don't start tasks on failed runs + and r.status != 'failed' + ), + start_tasks_update as ( + update pgflow.step_tasks + set + attempts_count = attempts_count + 1, + status = 'started', + started_at = now(), + last_worker_id = worker_id + from tasks + where step_tasks.message_id = tasks.message_id + and step_tasks.flow_slug = tasks.flow_slug + and step_tasks.status = 'queued' + ), + runs as ( + select + r.run_id, + r.input + from pgflow.runs r + where r.run_id in (select run_id from tasks) + ), + deps as ( + select + st.run_id, + st.step_slug, + dep.dep_slug, + -- Aggregate map outputs or use single output + CASE + WHEN dep_step.step_type = 'map' THEN + -- Aggregate all task outputs ordered by task_index + -- Use COALESCE to return empty array if no tasks + (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb) + FROM pgflow.step_tasks dt + WHERE dt.run_id = st.run_id + AND dt.step_slug = dep.dep_slug + AND dt.status = 'completed') + ELSE + -- Single step: use the single task output + dep_task.output + END as dep_output + from tasks st + join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug + join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug + left join pgflow.step_tasks dep_task on + dep_task.run_id = st.run_id and + dep_task.step_slug = dep.dep_slug and + dep_task.status = 'completed' + and dep_step.step_type = 'single' -- Only join for single steps + ), + deps_outputs as ( + select + d.run_id, + d.step_slug, + jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output, + count(*) as dep_count + from deps d + group by d.run_id, d.step_slug + ), + timeouts as ( + select + task.message_id, + task.flow_slug, + coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay + from tasks task + join pgflow.flows flow on flow.flow_slug = task.flow_slug + join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug + ), + -- Batch update visibility timeouts for all messages + set_vt_batch as ( + select pgflow.set_vt_batch( + start_tasks.flow_slug, + array_agg(t.message_id order by t.message_id), + array_agg(t.vt_delay order by t.message_id) + ) + from timeouts t + ) + select + st.flow_slug, + st.run_id, + st.step_slug, + -- ========================================== + -- INPUT CONSTRUCTION LOGIC + -- ========================================== + -- This nested CASE statement determines how to construct the input + -- for each task based on the step type (map vs non-map). + -- + -- The fundamental difference: + -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello") + -- - Non-map steps: Receive structured objects with named keys + -- (e.g., {"run": {...}, "dependency1": {...}}) + -- ========================================== + CASE + -- -------------------- MAP STEPS -------------------- + -- Map steps process arrays element-by-element. + -- Each task receives ONE element from the array at its task_index position. 
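+      -- Worked example (step and dependency names here are hypothetical,
+      -- for illustration only): if a dependent map's single dependency
+      -- 'chunks' completed with output ["a", "b"], deps_outputs holds
+      -- {"chunks": ["a", "b"]}; task 0 receives "a" and task 1 receives "b".
+      -- A non-map step depending on 'chunks' would instead receive the
+      -- wrapped form {"run": <run input>, "chunks": ["a", "b"]}.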
+ WHEN step.step_type = 'map' THEN + -- Map steps get raw array elements without any wrapper object + CASE + -- ROOT MAP: Gets array from run input + -- Example: run input = [1, 2, 3] + -- task 0 gets: 1 + -- task 1 gets: 2 + -- task 2 gets: 3 + WHEN step.deps_count = 0 THEN + -- Root map (deps_count = 0): no dependencies, reads from run input. + -- Extract the element at task_index from the run's input array. + -- Note: If run input is not an array, this will return NULL + -- and the flow will fail (validated in start_flow). + jsonb_array_element(r.input, st.task_index) + + -- DEPENDENT MAP: Gets array from its single dependency + -- Example: dependency output = ["a", "b", "c"] + -- task 0 gets: "a" + -- task 1 gets: "b" + -- task 2 gets: "c" + ELSE + -- Has dependencies (should be exactly 1 for map steps). + -- Extract the element at task_index from the dependency's output array. + -- + -- Why the subquery with jsonb_each? + -- - The dependency outputs a raw array: [1, 2, 3] + -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]} + -- - We need to unwrap and get just the array value + -- - Map steps have exactly 1 dependency (enforced by add_step) + -- - So jsonb_each will return exactly 1 row + -- - We extract the 'value' which is the raw array [1, 2, 3] + -- - Then get the element at task_index from that array + (SELECT jsonb_array_element(value, st.task_index) + FROM jsonb_each(dep_out.deps_output) + LIMIT 1) + END + + -- -------------------- NON-MAP STEPS -------------------- + -- Regular (non-map) steps receive ALL inputs as a structured object. + -- This includes the original run input plus all dependency outputs. + ELSE + -- Non-map steps get structured input with named keys + -- Example output: { + -- "run": {"original": "input"}, + -- "step1": {"output": "from_step1"}, + -- "step2": {"output": "from_step2"} + -- } + -- + -- Build object with 'run' key containing original input + jsonb_build_object('run', r.input) || + -- Merge with deps_output which already has dependency outputs + -- deps_output format: {"dep1": output1, "dep2": output2, ...} + -- If no dependencies, defaults to empty object + coalesce(dep_out.deps_output, '{}'::jsonb) + END as input, + st.message_id as msg_id, + st.task_index as task_index + from tasks st + join runs r on st.run_id = r.run_id + join pgflow.steps step on + step.flow_slug = st.flow_slug and + step.step_slug = st.step_slug + left join deps_outputs dep_out on + dep_out.run_id = st.run_id and + dep_out.step_slug = st.step_slug +$$; +-- Create "add_step" function +CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + result_step pgflow.steps; + next_idx int; +BEGIN + -- Validate map step constraints + -- Map steps can have either: + -- 0 dependencies (root map - maps over flow input array) + -- 1 dependency (dependent map - maps over dependency output array) + IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN + RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', + add_step.step_slug, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + array_to_string(add_step.deps_slugs, ', '); + END IF; + + -- Get 
next step index + SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx + FROM pgflow.steps s + WHERE s.flow_slug = add_step.flow_slug; + + -- Create the step + INSERT INTO pgflow.steps ( + flow_slug, step_slug, step_type, step_index, deps_count, + opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay + ) + VALUES ( + add_step.flow_slug, + add_step.step_slug, + COALESCE(add_step.step_type, 'single'), + next_idx, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + add_step.max_attempts, + add_step.base_delay, + add_step.timeout, + add_step.start_delay + ) + ON CONFLICT ON CONSTRAINT steps_pkey + DO UPDATE SET step_slug = EXCLUDED.step_slug + RETURNING * INTO result_step; + + -- Insert dependencies + INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) + SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug + FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) + WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 + ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; + + RETURN result_step; +END; +$$; +-- Drop "add_step" function +DROP FUNCTION "pgflow"."add_step" (text, text, integer, integer, integer, integer); +-- Drop "add_step" function +DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer); diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index ea50bbf85..5344b0d85 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:46a22RkBGrdfb3veJG3ZlyUkS3us2qfEFGn5cjh2W+Q= +h1:A6hL4r68jggxXAl+YKBn+dGczQ3JWLsXvIh1QqnF4hU= 20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc= @@ -8,12 +8,4 @@ h1:46a22RkBGrdfb3veJG3ZlyUkS3us2qfEFGn5cjh2W+Q= 20250627090700_pgflow_fix_function_search_paths.sql h1:NRMbmDKkOww7pOx1TVERMP5UdjmgfH0wE9QhzfBU3co= 20250707210212_pgflow_add_opt_start_delay.sql h1:11J7SDgS6EVFUwxSi0bRZnNQgVGTV0EJGj9yuC0vczY= 20250719205006_pgflow_worker_deprecation.sql h1:L3LDsVrUeABlRBXhHsu60bilfgDKEJHci5xWknH9XIg= -20250912075001_pgflow_temp_pr1_schema.sql h1:zVvGuRX/m8uPFCuJ7iAqOQ71onkCtze6P9d9ZsOgs98= -20250912080800_pgflow_temp_pr2_root_maps.sql h1:v2KdChKBPBOIq3nCVVtKWy1OVcIROV+tPtaTUPQujSo= -20250912125339_pgflow_TEMP_task_spawning_optimization.sql h1:HTSShQweuTS1Sz5q/KLy5XW3J/6D/mA6jjVpCfvjBto= -20250916093518_pgflow_temp_add_cascade_complete.sql h1:rQeqjEghqhGGUP+njrHFpPZxrxInjMHq5uSvYN1dTZc= -20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y= -20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4= -20250918042753_pgflow_temp_handle_map_output_aggregation.sql h1:9aC4lyr6AEvpLTrv9Fza2Ur0QO87S0cdJDI+BPLAl60= -20250919101802_pgflow_temp_orphaned_messages_index.sql h1:GyfPfQz4AqB1/sTAC7B/m6j8FJrpkocinnzerNfM0f8= -20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql h1:DguPK41IfsMykzodXqZq0BmW1IXZW8ZTj6rkw4LaHFE= +20251006073122_pgflow_add_map_step_type.sql h1:TOAhJ9WcG4mb67nBX+7DRl5UchezNM4CgTilS0wjM94= diff --git a/pkgs/core/supabase/tests/start_flow/empty_array_cascade.test.sql b/pkgs/core/supabase/tests/start_flow/empty_array_cascade.test.sql new file mode 100644 index 000000000..02841940b --- 
/dev/null +++ b/pkgs/core/supabase/tests/start_flow/empty_array_cascade.test.sql @@ -0,0 +1,47 @@ +-- Test: Empty array cascade should complete run with proper output and event +-- Reproduces issue where taskless cascades don't trigger run completion logic + +BEGIN; +SELECT plan(4); +SELECT pgflow_tests.reset_db(); + +-- Setup: Create flow with 3 chained map steps +SELECT pgflow.create_flow('empty_cascade_flow', timeout => 1); +SELECT pgflow.add_step('empty_cascade_flow', 'map_1', step_type => 'map'); +SELECT pgflow.add_step('empty_cascade_flow', 'map_2', ARRAY['map_1'], step_type => 'map'); +SELECT pgflow.add_step('empty_cascade_flow', 'map_3', ARRAY['map_2'], step_type => 'map'); + +-- Start flow with empty array +SELECT pgflow.start_flow('empty_cascade_flow', '[]'::jsonb); + +-- Test 1: Run should be completed +SELECT results_eq( + $$ SELECT status::text FROM pgflow.runs WHERE flow_slug = 'empty_cascade_flow' $$, + $$ VALUES ('completed'::text) $$, + 'Run should be completed after empty array cascade' +); + +-- Test 2: Run output should be set (not NULL) +SELECT ok( + (SELECT output IS NOT NULL FROM pgflow.runs WHERE flow_slug = 'empty_cascade_flow'), + 'Run output should be set (not NULL)' +); + +-- Test 3: Run output should contain leaf step with empty array +SELECT results_eq( + $$ SELECT output->'map_3' FROM pgflow.runs WHERE flow_slug = 'empty_cascade_flow' $$, + $$ VALUES ('[]'::jsonb) $$, + 'Leaf map step output should be empty array' +); + +-- Test 4: All step states should be completed +SELECT results_eq( + $$ SELECT COUNT(*)::int FROM pgflow.step_states + WHERE run_id = (SELECT run_id FROM pgflow.runs WHERE flow_slug = 'empty_cascade_flow') + AND status = 'completed' $$, + $$ VALUES (3) $$, + 'All 3 step states should be completed' +); + +SELECT * FROM finish(); +ROLLBACK; diff --git a/scripts/snapshot-release.sh b/scripts/snapshot-release.sh index 65647c0fe..98024be4b 100755 --- a/scripts/snapshot-release.sh +++ b/scripts/snapshot-release.sh @@ -97,7 +97,8 @@ if [[ "$NO_CLEANUP" != "true" ]]; then git restore --source=HEAD --worktree --staged \ pnpm-lock.yaml 2>/dev/null || true; \ git restore --source=HEAD --worktree --staged \ - .changeset/*.md .changeset/pre.json 2>/dev/null || true' EXIT + .changeset/pre.json 2>/dev/null || true; \ + git clean -fd .changeset 2>/dev/null || true' EXIT fi # ------------------------------------------------------------------ @@ -145,13 +146,26 @@ echo "" # ------------------------------------------------------------------ echo -e "${BOLD}Checking for changesets...${NC}" -if ! pnpm exec changeset status 2>/dev/null | grep -q -E "(packages will be released|Packages to be bumped)" ; then - echo -e "${RED}✗ No unreleased changesets found${NC}" +# Check for changeset files (either committed or uncommitted) +CHANGESET_FILES=$(find .changeset -name "*.md" -not -name "README.md" 2>/dev/null || true) + +if [[ -z "$CHANGESET_FILES" ]]; then + echo -e "${RED}✗ No changeset files found${NC}" echo "" echo "Create a changeset first:" echo -e " ${BLUE}pnpm exec changeset${NC}" exit 1 fi + +# Verify changesets will create versions +if ! pnpm exec changeset status 2>/dev/null | grep -q -E "(packages will be released|Packages to be bumped)" ; then + echo -e "${RED}✗ No packages to release from changesets${NC}" + echo "" + echo "Changeset files found but no packages will be bumped." + echo "This might mean changesets are already released." + exit 1 +fi + echo -e "${GREEN}✓ Found unreleased changesets${NC}" echo ""
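
A pgTAP probe in the same style as empty_array_cascade.test.sql can smoke-test the non-empty root map path of the consolidated migration. The sketch below is illustrative only: the flow and step slugs are invented, and it assumes the same pgflow_tests.reset_db helper used by the committed test.

BEGIN;
SELECT plan(2);
SELECT pgflow_tests.reset_db();

-- Hypothetical flow: a single root map over the run input array
SELECT pgflow.create_flow('root_map_probe', timeout => 1);
SELECT pgflow.add_step('root_map_probe', 'map_step', step_type => 'map');

-- Starting with a 3-element array: start_flow sets initial_tasks = 3
-- and start_ready_steps spawns one task per element
SELECT pgflow.start_flow('root_map_probe', '[1, 2, 3]'::jsonb);

-- Test 1: One task per array element
SELECT results_eq(
  $$ SELECT COUNT(*)::int FROM pgflow.step_tasks WHERE step_slug = 'map_step' $$,
  $$ VALUES (3) $$,
  'Root map should spawn one task per input element'
);

-- Test 2: initial_tasks reflects the input array length
SELECT results_eq(
  $$ SELECT initial_tasks FROM pgflow.step_states WHERE step_slug = 'map_step' $$,
  $$ VALUES (3) $$,
  'initial_tasks should equal the input array length'
);

SELECT * FROM finish();
ROLLBACK;

The empty-array counterpart of this scenario is already covered by the committed empty_array_cascade.test.sql above.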