From 4665802e5c5c94a7cf4188340c7f6db377f9ee8c Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Tue, 16 Sep 2025 10:14:51 +0200 Subject: [PATCH] chore: make initial_tasks NULL by default to indicate 'unknown yet' --- .claude/schema_development.md | 15 + PLAN.md | 47 +- .../PLAN_use_null_for_map_initial_tasks.md | 195 ------ pkgs/core/schemas/0060_tables_runtime.sql | 5 +- ...nction_cascade_complete_taskless_steps.sql | 26 +- .../schemas/0100_function_complete_task.sql | 78 ++- .../0100_function_maybe_complete_run.sql | 12 +- .../core/schemas/0100_function_start_flow.sql | 42 +- .../0100_function_start_ready_steps.sql | 39 +- pkgs/core/src/database-types.ts | 6 +- ...gflow_temp_make_initial_tasks_nullable.sql | 624 ++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../dependent_maps_null.test.sql | 157 +++++ .../non_array_to_map_should_fail.test.sql | 76 +++ .../null_output_to_map_should_fail.test.sql | 75 +++ .../dependent_map_initial_tasks_null.test.sql | 142 ++++ .../tests/start_flow/flow_only_maps.test.sql | 8 +- 17 files changed, 1290 insertions(+), 260 deletions(-) delete mode 100644 pkgs/core/PLAN_use_null_for_map_initial_tasks.md create mode 100644 pkgs/core/supabase/migrations/20250916142327_pgflow_temp_make_initial_tasks_nullable.sql create mode 100644 pkgs/core/supabase/tests/initial_tasks_null/dependent_maps_null.test.sql create mode 100644 pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql create mode 100644 pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql create mode 100644 pkgs/core/supabase/tests/start_flow/dependent_map_initial_tasks_null.test.sql diff --git a/.claude/schema_development.md b/.claude/schema_development.md index 6d74c4dcd..2d64b7bbf 100644 --- a/.claude/schema_development.md +++ b/.claude/schema_development.md @@ -110,6 +110,21 @@ git rm -f supabase/migrations/*_pgflow_{TEMP,temp}_*.sql **Temp Migrations**: Use TEMP_ prefix for stacked PRs, remove before final merge, CI enforces this **Avoid**: Manual migration edits, forgetting to remove old migration, skipping hash reset, failing tests, mixing changes, merging temp migrations to main +### Performance-First SQL Design + +**Use Section Comments Instead of Helper Functions**: Keep complex functions monolithic for performance. Use clear section comments: + +```sql +-- ========================================== +-- MAIN SECTION: Description +-- ========================================== +WITH +-- ---------- Subsection ---------- +cte_name AS (...) +``` + +Avoids function call overhead, preserves CTE optimization, simpler atomicity. + ## Troubleshooting ### Migration name exists diff --git a/PLAN.md b/PLAN.md index fd27acef7..dbc926981 100644 --- a/PLAN.md +++ b/PLAN.md @@ -87,6 +87,13 @@ - Array validation and count propagation working - Cascade handles taskless dependent maps +- [x] **PR #213: NULL for Unknown initial_tasks** - `09-16-make-initial-tasks-nullable` + - Changed initial_tasks from "1 as placeholder" to NULL for dependent map steps + - Benefits: Semantic correctness (NULL = unknown, not "1 task") + - Implemented: Schema change to allow NULL, updated all SQL functions + - Added validation for non-array and NULL outputs to map steps + - Comprehensive tests for NULL behavior and error cases + #### ❌ Remaining Work - [ ] **Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE) @@ -102,6 +109,8 @@ - Store aggregated output for dependent steps to consume - Maintain task_index ordering in aggregated arrays - Tests for aggregation with actual map task outputs + - **IMPORTANT**: Must add test for map->map NULL propagation when this is implemented + - **IMPORTANT**: Must handle non-array outputs to map steps (should fail the run) - [ ] **DSL Support for .map() Step Type** @@ -124,16 +133,6 @@ - Basic happy path coverage - This should be minimal and added to the Edge Worker integration test suite for now -- [ ] **Semantic Improvement: NULL for Unknown initial_tasks** (OPTIONAL - Can be deferred) - - - Change initial_tasks from "1 as placeholder" to NULL for dependent map steps - - Benefits: Semantic correctness (NULL = unknown, not "1 task") - - Scope: Schema change to allow NULL, update 5+ SQL functions - - See detailed plan in `pkgs/core/PLAN_use_null_for_map_initial_tasks.md` - - **Note**: This is a semantic improvement only - current approach works functionally - - **Warning**: If deferred, new tests for Array Distribution and Output Aggregation will - assume initial_tasks = 1 for dependent maps, making this change harder later - - [ ] **Migration Consolidation** - Remove all temporary/incremental migrations from feature branches @@ -141,6 +140,34 @@ - Ensure clean migration path from current production schema - If NULL improvement is done, include it in the consolidated migration +- [ ] **Update README's** and **Docs** + + - `pkgs/core/README.md` + + - Add new section describing the step types + - Describe single step briefly, focus on describing map step type and how it differs + - Make sure to mention that maps are constrained to have exactly one dependency + - Show multiple cases of inputs -> task creation + - Explain edge cases (empty array propagation, invalid array input) + - Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input + - Explain cascade completion of taskless steps and its limitations + + - `pkgs/dsl/README.md` + + - Briefly describe the new `.array()` and `.map()` methods + - Mention `.array` is mostly sugar, and `.map` is the new step type + - Link to `pkgs/core/README.md` sections for more explanation about map steps + - Make sure to mention that maps are constrained to have exactly one dependency + + - **Add basic docs page** + + - put it into `pkgs/website/src/content/docs/concepts/array-and-map-steps.mdx` + - describe the DSL and how the map works and why we need it + - show example usage of root map + - show example usage of dependent map + - focus mostly on how to use it, instead of how it works under the hood + - link to the README's for more details + - [ ] **Graphite Stack Merge** - Configure Graphite merge queue for the complete PR stack diff --git a/pkgs/core/PLAN_use_null_for_map_initial_tasks.md b/pkgs/core/PLAN_use_null_for_map_initial_tasks.md deleted file mode 100644 index d4ab5923f..000000000 --- a/pkgs/core/PLAN_use_null_for_map_initial_tasks.md +++ /dev/null @@ -1,195 +0,0 @@ -# Plan: Use NULL for Unknown initial_tasks in Dependent Map Steps - -## Motivation -Currently, dependent map steps have `initial_tasks = 1` as a placeholder until their dependencies complete. This is semantically incorrect and confusing: -- `1` implies "will spawn exactly 1 task" but that's false -- `NULL` correctly means "unknown until dependencies complete" -- Reduces cognitive load - see NULL, know it's unknown - -## Critical Considerations -Before implementing, these issues must be addressed: - -### 1. ~~The "Last Dependency" Problem~~ RESOLVED -**Map steps can have at most 1 dependency** (enforced in add_step.sql:24): -```sql --- This constraint simplifies everything: -IF step_type = 'map' AND array_length(deps_slugs, 1) > 1 THEN - RAISE EXCEPTION 'Map step can have at most one dependency' -``` -This means we always know exactly when to resolve `initial_tasks` - when the single dependency completes! - -### 2. Non-Array Output Handling -What if a dependency doesn't produce an array? -```sql --- Must handle both cases: -CASE - WHEN jsonb_typeof(output) = 'array' THEN jsonb_array_length(output) - ELSE 1 -- Treat non-array as single item to map -END -``` - -### 3. ~~Race Condition Prevention~~ RESOLVED -Since map steps have at most 1 dependency, no race conditions possible! -The single dependency completes once, updates initial_tasks atomically. - -### 4. The Start Ready Steps Problem -**CRITICAL**: We cannot use COALESCE - steps must NOT start with NULL initial_tasks -```sql --- WRONG: COALESCE(initial_tasks, 1) --- RIGHT: Add assertion before starting -IF started_step.initial_tasks IS NULL THEN - RAISE EXCEPTION 'Cannot start step % with unknown initial_tasks', step_slug; -END IF; -``` - -## Implementation Plan - -### 1. Schema Change -```sql --- In 0060_tables_runtime.sql -ALTER TABLE pgflow.step_states -ALTER COLUMN initial_tasks DROP NOT NULL, -ALTER COLUMN initial_tasks DROP DEFAULT; - --- Update constraint -ALTER TABLE pgflow.step_states -DROP CONSTRAINT step_states_initial_tasks_check, -ADD CONSTRAINT step_states_initial_tasks_check - CHECK (initial_tasks IS NULL OR initial_tasks >= 0); -``` - -### 2. Update start_flow Function -```sql --- In 0100_function_start_flow.sql --- Change initial_tasks assignment logic: -CASE - WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN - -- Root map: get array length from input - CASE - WHEN jsonb_typeof(start_flow.input) = 'array' THEN - jsonb_array_length(start_flow.input) - ELSE - 1 - END - WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN - -- Dependent map: unknown until dependencies complete - NULL - ELSE - -- Single steps: always 1 task - 1 -END -``` - -### 3. Update start_ready_steps Function -```sql --- In 0100_function_start_ready_steps.sql - --- Empty map detection remains unchanged: -AND step_state.initial_tasks = 0 -- NULL != 0, so NULL maps won't match - --- Add NULL check BEFORE starting steps: -ready_steps AS ( - SELECT * - FROM pgflow.step_states - WHERE remaining_deps = 0 - AND status = 'created' - AND initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count -) - --- Task generation stays the same (no COALESCE needed): -CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) -``` - -### 4. Update complete_task Function -```sql --- In 0100_function_complete_task.sql - --- Simplified: map steps have exactly 1 dependency -initial_tasks = CASE - WHEN s.step_type = 'map' AND ss.initial_tasks IS NULL THEN - -- Resolve NULL to actual value based on output - CASE - WHEN jsonb_typeof(complete_task.output) = 'array' - THEN jsonb_array_length(complete_task.output) - ELSE 1 -- Non-array treated as single item - END - ELSE ss.initial_tasks -- Keep existing value -END - --- Note: This already works for single->map! --- Just need to extend for map->map when we aggregate outputs -``` - -### 5. Update cascade_complete_taskless_steps Function -```sql --- In 0100_function_cascade_complete_taskless_steps.sql - --- Update the initial_tasks setting for cascade: -initial_tasks = CASE - WHEN s.step_type = 'map' AND dep_count.has_zero_tasks - THEN 0 -- Empty array propagation - ELSE ss.initial_tasks -- Keep NULL as NULL -END - --- The BOOL_OR(c.initial_tasks = 0) already handles NULL correctly --- (NULL = 0 returns false, which is what we want) -``` - -### 6. Add Safety Assertions -```sql --- Add check constraint or trigger to ensure: --- When status changes from 'created' to 'started', --- initial_tasks must NOT be NULL - -ALTER TABLE pgflow.step_states -ADD CONSTRAINT initial_tasks_known_when_started - CHECK ( - status != 'started' - OR initial_tasks IS NOT NULL - ); -``` - -### 7. Update Tests -- Update test expectations to check for NULL instead of 1 -- Add specific tests for NULL -> actual value transitions -- Test that steps can't start with NULL initial_tasks - -### 8. Migration Strategy -```sql --- Create migration to update existing data: -UPDATE pgflow.step_states -SET initial_tasks = NULL -WHERE step_slug IN ( - SELECT s.step_slug - FROM pgflow.steps s - WHERE s.step_type = 'map' - AND EXISTS ( - SELECT 1 FROM pgflow.deps d - WHERE d.flow_slug = s.flow_slug - AND d.step_slug = s.step_slug - ) -) -AND status = 'created'; -``` - -## Benefits -1. **Semantic correctness**: NULL = unknown, not "1 task" placeholder -2. **Clearer mental model**: No translation needed when reading state -3. **Easier debugging**: Can immediately see which values are unresolved -4. **Type safety**: TypeScript `number | null` enforces proper handling -5. **Simpler than expected**: Map steps having max 1 dependency eliminates complexity - -## Simplified Implementation Path -Since map steps can only have 0 or 1 dependency: -1. Root maps (0 deps): Get initial_tasks from flow input immediately -2. Dependent maps (1 dep): Start with NULL, resolve when dependency completes -3. No multi-dependency complexity or race conditions! - -## Testing Checklist -- [ ] Root map steps get correct initial_tasks from input array -- [ ] Dependent map steps start with NULL initial_tasks -- [ ] Single -> Map updates NULL to array length -- [ ] Map -> Map updates NULL to aggregated array length (future) -- [ ] Empty array propagation sets 0, not NULL -- [ ] Steps cannot start with NULL initial_tasks -- [ ] All arithmetic operations handle NULL safely \ No newline at end of file diff --git a/pkgs/core/schemas/0060_tables_runtime.sql b/pkgs/core/schemas/0060_tables_runtime.sql index 4c069f810..0ce5ff620 100644 --- a/pkgs/core/schemas/0060_tables_runtime.sql +++ b/pkgs/core/schemas/0060_tables_runtime.sql @@ -27,7 +27,7 @@ create table pgflow.step_states ( step_slug text not null, status text not null default 'created', remaining_tasks int null, -- NULL = not started, >0 = active countdown - initial_tasks int not null default 1 check (initial_tasks >= 0), -- Planned task count: 1 for singles, N for maps + initial_tasks int null check (initial_tasks is null or initial_tasks >= 0), remaining_deps int not null default 0 check (remaining_deps >= 0), error_message text, created_at timestamptz not null default now(), @@ -43,6 +43,9 @@ create table pgflow.step_states ( constraint remaining_tasks_state_consistency check ( remaining_tasks is null or status != 'created' ), + constraint initial_tasks_known_when_started check ( + status != 'started' or initial_tasks is not null + ), constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)), constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at), constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at), diff --git a/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql b/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql index 6f13e6b4b..60321c933 100644 --- a/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql +++ b/pkgs/core/schemas/0100_function_cascade_complete_taskless_steps.sql @@ -8,15 +8,23 @@ DECLARE v_iterations int := 0; v_max_iterations int := 50; BEGIN + -- ========================================== + -- ITERATIVE CASCADE COMPLETION + -- ========================================== + -- Completes taskless steps in waves until none remain LOOP - -- Safety counter to prevent infinite loops + -- ---------- Safety check ---------- v_iterations := v_iterations + 1; IF v_iterations > v_max_iterations THEN RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations; END IF; + -- ========================================== + -- COMPLETE READY TASKLESS STEPS + -- ========================================== WITH completed AS ( - -- Complete all ready taskless steps in topological order + -- ---------- Complete taskless steps ---------- + -- Steps with initial_tasks=0 and no remaining deps UPDATE pgflow.step_states ss SET status = 'completed', started_at = now(), @@ -32,19 +40,20 @@ BEGIN -- Process in topological order to ensure proper cascade RETURNING ss.* ), + -- ---------- Update dependent steps ---------- + -- Propagate completion and empty arrays to dependents dep_updates AS ( - -- Update remaining_deps and initial_tasks for dependents of completed steps UPDATE pgflow.step_states ss SET remaining_deps = ss.remaining_deps - dep_count.count, -- If the dependent is a map step and its dependency completed with 0 tasks, -- set its initial_tasks to 0 as well initial_tasks = CASE WHEN s.step_type = 'map' AND dep_count.has_zero_tasks - THEN 0 - ELSE ss.initial_tasks + THEN 0 -- Empty array propagation + ELSE ss.initial_tasks -- Keep existing value (including NULL) END FROM ( - -- Count how many completed steps are dependencies of each dependent + -- Aggregate dependency updates per dependent step SELECT d.flow_slug, d.step_slug as dependent_slug, @@ -62,8 +71,8 @@ BEGIN AND s.flow_slug = ss.flow_slug AND s.step_slug = ss.step_slug ), + -- ---------- Update run counters ---------- run_updates AS ( - -- Update run's remaining_steps count UPDATE pgflow.runs r SET remaining_steps = r.remaining_steps - c.completed_count, status = CASE @@ -80,9 +89,10 @@ BEGIN WHERE r.run_id = cascade_complete_taskless_steps.run_id AND c.completed_count > 0 ) + -- ---------- Check iteration results ---------- SELECT COUNT(*) INTO v_iteration_completed FROM completed; - EXIT WHEN v_iteration_completed = 0; + EXIT WHEN v_iteration_completed = 0; -- No more steps to complete v_total_completed := v_total_completed + v_iteration_completed; END LOOP; diff --git a/pkgs/core/schemas/0100_function_complete_task.sql b/pkgs/core/schemas/0100_function_complete_task.sql index da9227c49..25a075897 100644 --- a/pkgs/core/schemas/0100_function_complete_task.sql +++ b/pkgs/core/schemas/0100_function_complete_task.sql @@ -11,9 +11,40 @@ set search_path to '' as $$ declare v_step_state pgflow.step_states%ROWTYPE; + v_dependent_map_slug text; begin -WITH run_lock AS ( +-- ========================================== +-- VALIDATION: Array output for dependent maps +-- ========================================== +-- Must happen BEFORE acquiring locks to fail fast without holding resources +SELECT ds.step_slug INTO v_dependent_map_slug +FROM pgflow.deps d +JOIN pgflow.steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.step_slug +JOIN pgflow.step_states ss ON ss.flow_slug = ds.flow_slug AND ss.step_slug = ds.step_slug +WHERE d.dep_slug = complete_task.step_slug + AND d.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND ds.step_type = 'map' + AND ss.run_id = complete_task.run_id + AND ss.initial_tasks IS NULL + AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') +LIMIT 1; + +IF v_dependent_map_slug IS NOT NULL THEN + RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)', + v_dependent_map_slug, + complete_task.step_slug, + CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END, + complete_task.output; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Update task and propagate changes +-- ========================================== +WITH +-- ---------- Lock acquisition ---------- +-- Acquire locks in consistent order (run -> step) to prevent deadlocks +run_lock AS ( SELECT * FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id FOR UPDATE @@ -24,6 +55,8 @@ step_lock AS ( AND pgflow.step_states.step_slug = complete_task.step_slug FOR UPDATE ), +-- ---------- Task completion ---------- +-- Update the task record with completion status and output task AS ( UPDATE pgflow.step_tasks SET @@ -36,6 +69,8 @@ task AS ( AND pgflow.step_tasks.status = 'started' RETURNING * ), +-- ---------- Step state update ---------- +-- Decrement remaining_tasks and potentially mark step as completed step_state AS ( UPDATE pgflow.step_states SET @@ -53,7 +88,8 @@ step_state AS ( AND pgflow.step_states.step_slug = complete_task.step_slug RETURNING pgflow.step_states.* ), --- Find all dependent steps if the current step was completed +-- ---------- Dependency resolution ---------- +-- Find all steps that depend on the completed step (only if step completed) dependent_steps AS ( SELECT d.step_slug AS dependent_step_slug FROM pgflow.deps d @@ -61,22 +97,27 @@ dependent_steps AS ( WHERE d.dep_slug = complete_task.step_slug ORDER BY d.step_slug -- Ensure consistent ordering ), --- Lock dependent steps before updating +-- ---------- Lock dependent steps ---------- +-- Acquire locks on all dependent steps before updating them dependent_steps_lock AS ( SELECT * FROM pgflow.step_states WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps) FOR UPDATE ), --- Update all dependent steps +-- ---------- Update dependent steps ---------- +-- Decrement remaining_deps and resolve NULL initial_tasks for map steps dependent_steps_update AS ( UPDATE pgflow.step_states ss SET remaining_deps = ss.remaining_deps - 1, - -- For map dependents of single steps producing arrays, set initial_tasks + -- Resolve NULL initial_tasks for dependent map steps + -- This is where dependent maps learn their array size from upstream initial_tasks = CASE - WHEN s.step_type = 'map' AND jsonb_typeof(complete_task.output) = 'array' - THEN jsonb_array_length(complete_task.output) - ELSE ss.initial_tasks + WHEN s.step_type = 'map' AND ss.initial_tasks IS NULL + AND complete_task.output IS NOT NULL + AND jsonb_typeof(complete_task.output) = 'array' THEN + jsonb_array_length(complete_task.output) + ELSE ss.initial_tasks -- Keep existing value (including NULL) END FROM dependent_steps ds, pgflow.steps s WHERE ss.run_id = complete_task.run_id @@ -84,22 +125,28 @@ dependent_steps_update AS ( AND s.flow_slug = ss.flow_slug AND s.step_slug = ss.step_slug ) --- Only decrement remaining_steps, don't update status +-- ---------- Update run remaining_steps ---------- +-- Decrement the run's remaining_steps counter if step completed UPDATE pgflow.runs SET remaining_steps = pgflow.runs.remaining_steps - 1 FROM step_state WHERE pgflow.runs.run_id = complete_task.run_id AND step_state.status = 'completed'; --- Get the updated step state for broadcasting +-- ========================================== +-- POST-COMPLETION ACTIONS +-- ========================================== + +-- ---------- Get updated state for broadcasting ---------- SELECT * INTO v_step_state FROM pgflow.step_states WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; --- Send broadcast event for step completed if the step is completed +-- ---------- Handle step completion ---------- IF v_step_state.status = 'completed' THEN - -- Step just completed, cascade any ready taskless steps + -- Cascade complete any taskless steps that are now ready PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); + -- Broadcast step:completed event PERFORM realtime.send( jsonb_build_object( 'event_type', 'step:completed', @@ -115,7 +162,8 @@ IF v_step_state.status = 'completed' THEN ); END IF; --- For completed tasks: archive the message +-- ---------- Archive completed task message ---------- +-- Move message from active queue to archive table PERFORM ( WITH completed_tasks AS ( SELECT r.flow_slug, st.message_id @@ -131,10 +179,14 @@ PERFORM ( WHERE EXISTS (SELECT 1 FROM completed_tasks) ); +-- ---------- Trigger next steps ---------- +-- Start any steps that are now ready (deps satisfied) PERFORM pgflow.start_ready_steps(complete_task.run_id); +-- Check if the entire run is complete PERFORM pgflow.maybe_complete_run(complete_task.run_id); +-- ---------- Return completed task ---------- RETURN QUERY SELECT * FROM pgflow.step_tasks AS step_task WHERE step_task.run_id = complete_task.run_id diff --git a/pkgs/core/schemas/0100_function_maybe_complete_run.sql b/pkgs/core/schemas/0100_function_maybe_complete_run.sql index 3084d99d5..deb74e3bd 100644 --- a/pkgs/core/schemas/0100_function_maybe_complete_run.sql +++ b/pkgs/core/schemas/0100_function_maybe_complete_run.sql @@ -7,9 +7,12 @@ as $$ declare v_completed_run pgflow.runs%ROWTYPE; begin - -- Update run status to completed and set output when there are no remaining steps + -- ========================================== + -- CHECK AND COMPLETE RUN IF FINISHED + -- ========================================== WITH run_output AS ( - -- Get outputs from final steps (steps that are not dependencies for other steps) + -- ---------- Gather outputs from leaf steps ---------- + -- Leaf steps = steps with no dependents SELECT jsonb_object_agg(st.step_slug, st.output) as final_output FROM pgflow.step_tasks st JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug @@ -23,6 +26,7 @@ begin AND d.dep_slug = ss.step_slug ) ) + -- ---------- Complete run if all steps done ---------- UPDATE pgflow.runs SET status = 'completed', @@ -33,7 +37,9 @@ begin AND pgflow.runs.status != 'completed' RETURNING * INTO v_completed_run; - -- Only send broadcast if run was completed + -- ========================================== + -- BROADCAST COMPLETION EVENT + -- ========================================== IF v_completed_run.run_id IS NOT NULL THEN PERFORM realtime.send( jsonb_build_object( diff --git a/pkgs/core/schemas/0100_function_start_flow.sql b/pkgs/core/schemas/0100_function_start_flow.sql index efcae0428..56a5838fb 100644 --- a/pkgs/core/schemas/0100_function_start_flow.sql +++ b/pkgs/core/schemas/0100_function_start_flow.sql @@ -13,7 +13,9 @@ declare v_root_map_count int; begin --- Check for root map steps and validate input +-- ========================================== +-- VALIDATION: Root map array input +-- ========================================== WITH root_maps AS ( SELECT step_slug FROM pgflow.steps @@ -37,12 +39,17 @@ IF v_root_map_count > 0 THEN END IF; END IF; +-- ========================================== +-- MAIN CTE CHAIN: Create run and step states +-- ========================================== WITH + -- ---------- Gather flow metadata ---------- flow_steps AS ( SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count FROM pgflow.steps WHERE steps.flow_slug = start_flow.flow_slug ), + -- ---------- Create run record ---------- created_run AS ( INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) VALUES ( @@ -53,6 +60,8 @@ WITH ) RETURNING * ), + -- ---------- Create step states ---------- + -- Sets initial_tasks: known for root maps, NULL for dependent maps created_step_states AS ( INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) SELECT @@ -60,24 +69,32 @@ WITH (SELECT created_run.run_id FROM created_run), fs.step_slug, fs.deps_count, - -- For root map steps (map with no deps), set initial_tasks to array length - -- For all other steps, set initial_tasks to 1 - CASE - WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN - CASE - WHEN jsonb_typeof(start_flow.input) = 'array' THEN + -- Updated logic for initial_tasks: + CASE + WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN + -- Root map: get array length from input + CASE + WHEN jsonb_typeof(start_flow.input) = 'array' THEN jsonb_array_length(start_flow.input) - ELSE + ELSE 1 END - ELSE + WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN + -- Dependent map: unknown until dependencies complete + NULL + ELSE + -- Single steps: always 1 task 1 END FROM flow_steps fs ) SELECT * FROM created_run INTO v_created_run; --- Send broadcast event for run started +-- ========================================== +-- POST-CREATION ACTIONS +-- ========================================== + +-- ---------- Broadcast run:started event ---------- PERFORM realtime.send( jsonb_build_object( 'event_type', 'run:started', @@ -93,9 +110,12 @@ PERFORM realtime.send( false ); --- Complete any taskless steps that are ready (e.g., empty array maps) +-- ---------- Complete taskless steps ---------- +-- Handle empty array maps that should auto-complete PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); +-- ---------- Start initial steps ---------- +-- Start root steps (those with no dependencies) PERFORM pgflow.start_ready_steps(v_created_run.run_id); RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; diff --git a/pkgs/core/schemas/0100_function_start_ready_steps.sql b/pkgs/core/schemas/0100_function_start_ready_steps.sql index e8f738915..0952671c2 100644 --- a/pkgs/core/schemas/0100_function_start_ready_steps.sql +++ b/pkgs/core/schemas/0100_function_start_ready_steps.sql @@ -3,8 +3,10 @@ returns void language sql set search_path to '' as $$ - --- First handle empty array map steps (initial_tasks = 0) - direct transition to completed +-- ========================================== +-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0) +-- ========================================== +-- These complete immediately without spawning tasks WITH empty_map_steps AS ( SELECT step_state.* FROM pgflow.step_states AS step_state @@ -19,6 +21,7 @@ WITH empty_map_steps AS ( ORDER BY step_state.step_slug FOR UPDATE OF step_state ), +-- ---------- Complete empty map steps ---------- completed_empty_steps AS ( UPDATE pgflow.step_states SET status = 'completed', @@ -30,6 +33,7 @@ completed_empty_steps AS ( AND pgflow.step_states.step_slug = empty_map_steps.step_slug RETURNING pgflow.step_states.* ), +-- ---------- Broadcast completion events ---------- broadcast_empty_completed AS ( SELECT realtime.send( @@ -51,22 +55,29 @@ broadcast_empty_completed AS ( FROM completed_empty_steps AS completed_step ), --- Now handle non-empty steps (both single and map with initial_tasks > 0) +-- ========================================== +-- HANDLE NORMAL STEPS (initial_tasks > 0) +-- ========================================== +-- ---------- Find ready steps ---------- +-- Steps with no remaining deps and known task count ready_steps AS ( SELECT * FROM pgflow.step_states AS step_state WHERE step_state.run_id = start_ready_steps.run_id AND step_state.status = 'created' AND step_state.remaining_deps = 0 + AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count + AND step_state.initial_tasks > 0 -- Don't start taskless steps -- Exclude empty map steps already handled AND NOT EXISTS ( - SELECT 1 FROM empty_map_steps - WHERE empty_map_steps.run_id = step_state.run_id + SELECT 1 FROM empty_map_steps + WHERE empty_map_steps.run_id = step_state.run_id AND empty_map_steps.step_slug = step_state.step_slug ) ORDER BY step_state.step_slug FOR UPDATE ), +-- ---------- Mark steps as started ---------- started_step_states AS ( UPDATE pgflow.step_states SET status = 'started', @@ -78,10 +89,12 @@ started_step_states AS ( RETURNING pgflow.step_states.* ), --- Generate tasks based on initial_tasks count --- For single steps: initial_tasks = 1, so generate_series(0, 0) = single task with index 0 --- For map steps: initial_tasks = N, so generate_series(0, N-1) = N tasks with indices 0..N-1 --- Group messages by step for batch sending +-- ========================================== +-- TASK GENERATION AND QUEUE MESSAGES +-- ========================================== +-- ---------- Generate tasks and batch messages ---------- +-- Single steps: 1 task (index 0) +-- Map steps: N tasks (indices 0..N-1) message_batches AS ( SELECT started_step.flow_slug, @@ -105,7 +118,8 @@ message_batches AS ( CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index) GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay ), --- Send messages in batch for better performance with large arrays +-- ---------- Send messages to queue ---------- +-- Uses batch sending for performance with large arrays sent_messages AS ( SELECT mb.flow_slug, @@ -119,6 +133,7 @@ sent_messages AS ( WHERE task_indices.idx_ord = msg_ids.msg_ord ), +-- ---------- Broadcast step:started events ---------- broadcast_events AS ( SELECT realtime.send( @@ -138,7 +153,9 @@ broadcast_events AS ( FROM started_step_states AS started_step ) --- Insert all generated tasks with their respective task_index values +-- ========================================== +-- RECORD TASKS IN DATABASE +-- ========================================== INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id) SELECT sent_messages.flow_slug, diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 8841d74a2..27429b7bd 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -127,7 +127,7 @@ export type Database = { error_message: string | null failed_at: string | null flow_slug: string - initial_tasks: number + initial_tasks: number | null remaining_deps: number remaining_tasks: number | null run_id: string @@ -141,7 +141,7 @@ export type Database = { error_message?: string | null failed_at?: string | null flow_slug: string - initial_tasks?: number + initial_tasks?: number | null remaining_deps?: number remaining_tasks?: number | null run_id: string @@ -155,7 +155,7 @@ export type Database = { error_message?: string | null failed_at?: string | null flow_slug?: string - initial_tasks?: number + initial_tasks?: number | null remaining_deps?: number remaining_tasks?: number | null run_id?: string diff --git a/pkgs/core/supabase/migrations/20250916142327_pgflow_temp_make_initial_tasks_nullable.sql b/pkgs/core/supabase/migrations/20250916142327_pgflow_temp_make_initial_tasks_nullable.sql new file mode 100644 index 000000000..feb265556 --- /dev/null +++ b/pkgs/core/supabase/migrations/20250916142327_pgflow_temp_make_initial_tasks_nullable.sql @@ -0,0 +1,624 @@ +-- Modify "step_states" table +ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "step_states_initial_tasks_check", ADD CONSTRAINT "step_states_initial_tasks_check" CHECK ((initial_tasks IS NULL) OR (initial_tasks >= 0)), ADD CONSTRAINT "initial_tasks_known_when_started" CHECK ((status <> 'started'::text) OR (initial_tasks IS NOT NULL)), ALTER COLUMN "initial_tasks" DROP NOT NULL, ALTER COLUMN "initial_tasks" DROP DEFAULT; +-- Modify "cascade_complete_taskless_steps" function +CREATE OR REPLACE FUNCTION "pgflow"."cascade_complete_taskless_steps" ("run_id" uuid) RETURNS integer LANGUAGE plpgsql AS $$ +DECLARE + v_total_completed int := 0; + v_iteration_completed int; + v_iterations int := 0; + v_max_iterations int := 50; +BEGIN + -- ========================================== + -- ITERATIVE CASCADE COMPLETION + -- ========================================== + -- Completes taskless steps in waves until none remain + LOOP + -- ---------- Safety check ---------- + v_iterations := v_iterations + 1; + IF v_iterations > v_max_iterations THEN + RAISE EXCEPTION 'Cascade loop exceeded safety limit of % iterations', v_max_iterations; + END IF; + + -- ========================================== + -- COMPLETE READY TASKLESS STEPS + -- ========================================== + WITH completed AS ( + -- ---------- Complete taskless steps ---------- + -- Steps with initial_tasks=0 and no remaining deps + UPDATE pgflow.step_states ss + SET status = 'completed', + started_at = now(), + completed_at = now(), + remaining_tasks = 0 + FROM pgflow.steps s + WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.flow_slug = s.flow_slug + AND ss.step_slug = s.step_slug + AND ss.status = 'created' + AND ss.remaining_deps = 0 + AND ss.initial_tasks = 0 + -- Process in topological order to ensure proper cascade + RETURNING ss.* + ), + -- ---------- Update dependent steps ---------- + -- Propagate completion and empty arrays to dependents + dep_updates AS ( + UPDATE pgflow.step_states ss + SET remaining_deps = ss.remaining_deps - dep_count.count, + -- If the dependent is a map step and its dependency completed with 0 tasks, + -- set its initial_tasks to 0 as well + initial_tasks = CASE + WHEN s.step_type = 'map' AND dep_count.has_zero_tasks + THEN 0 -- Empty array propagation + ELSE ss.initial_tasks -- Keep existing value (including NULL) + END + FROM ( + -- Aggregate dependency updates per dependent step + SELECT + d.flow_slug, + d.step_slug as dependent_slug, + COUNT(*) as count, + BOOL_OR(c.initial_tasks = 0) as has_zero_tasks + FROM completed c + JOIN pgflow.deps d ON d.flow_slug = c.flow_slug + AND d.dep_slug = c.step_slug + GROUP BY d.flow_slug, d.step_slug + ) dep_count, + pgflow.steps s + WHERE ss.run_id = cascade_complete_taskless_steps.run_id + AND ss.flow_slug = dep_count.flow_slug + AND ss.step_slug = dep_count.dependent_slug + AND s.flow_slug = ss.flow_slug + AND s.step_slug = ss.step_slug + ), + -- ---------- Update run counters ---------- + run_updates AS ( + UPDATE pgflow.runs r + SET remaining_steps = r.remaining_steps - c.completed_count, + status = CASE + WHEN r.remaining_steps - c.completed_count = 0 + THEN 'completed' + ELSE r.status + END, + completed_at = CASE + WHEN r.remaining_steps - c.completed_count = 0 + THEN now() + ELSE r.completed_at + END + FROM (SELECT COUNT(*) AS completed_count FROM completed) c + WHERE r.run_id = cascade_complete_taskless_steps.run_id + AND c.completed_count > 0 + ) + -- ---------- Check iteration results ---------- + SELECT COUNT(*) INTO v_iteration_completed FROM completed; + + EXIT WHEN v_iteration_completed = 0; -- No more steps to complete + v_total_completed := v_total_completed + v_iteration_completed; + END LOOP; + + RETURN v_total_completed; +END; +$$; +-- Modify "maybe_complete_run" function +CREATE OR REPLACE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_completed_run pgflow.runs%ROWTYPE; +begin + -- ========================================== + -- CHECK AND COMPLETE RUN IF FINISHED + -- ========================================== + WITH run_output AS ( + -- ---------- Gather outputs from leaf steps ---------- + -- Leaf steps = steps with no dependents + SELECT jsonb_object_agg(st.step_slug, st.output) as final_output + FROM pgflow.step_tasks st + JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug + JOIN pgflow.runs r ON r.run_id = ss.run_id AND r.flow_slug = ss.flow_slug + WHERE st.run_id = maybe_complete_run.run_id + AND st.status = 'completed' + AND NOT EXISTS ( + SELECT 1 + FROM pgflow.deps d + WHERE d.flow_slug = ss.flow_slug + AND d.dep_slug = ss.step_slug + ) + ) + -- ---------- Complete run if all steps done ---------- + UPDATE pgflow.runs + SET + status = 'completed', + completed_at = now(), + output = (SELECT final_output FROM run_output) + WHERE pgflow.runs.run_id = maybe_complete_run.run_id + AND pgflow.runs.remaining_steps = 0 + AND pgflow.runs.status != 'completed' + RETURNING * INTO v_completed_run; + + -- ========================================== + -- BROADCAST COMPLETION EVENT + -- ========================================== + IF v_completed_run.run_id IS NOT NULL THEN + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:completed', + 'run_id', v_completed_run.run_id, + 'flow_slug', v_completed_run.flow_slug, + 'status', 'completed', + 'output', v_completed_run.output, + 'completed_at', v_completed_run.completed_at + ), + 'run:completed', + concat('pgflow:run:', v_completed_run.run_id), + false + ); + END IF; +end; +$$; +-- Modify "start_ready_steps" function +CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$ +-- ========================================== +-- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0) +-- ========================================== +-- These complete immediately without spawning tasks +WITH empty_map_steps AS ( + SELECT step_state.* + FROM pgflow.step_states AS step_state + JOIN pgflow.steps AS step + ON step.flow_slug = step_state.flow_slug + AND step.step_slug = step_state.step_slug + WHERE step_state.run_id = start_ready_steps.run_id + AND step_state.status = 'created' + AND step_state.remaining_deps = 0 + AND step.step_type = 'map' + AND step_state.initial_tasks = 0 + ORDER BY step_state.step_slug + FOR UPDATE OF step_state +), +-- ---------- Complete empty map steps ---------- +completed_empty_steps AS ( + UPDATE pgflow.step_states + SET status = 'completed', + started_at = now(), + completed_at = now(), + remaining_tasks = 0 + FROM empty_map_steps + WHERE pgflow.step_states.run_id = start_ready_steps.run_id + AND pgflow.step_states.step_slug = empty_map_steps.step_slug + RETURNING pgflow.step_states.* +), +-- ---------- Broadcast completion events ---------- +broadcast_empty_completed AS ( + SELECT + realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', completed_step.run_id, + 'step_slug', completed_step.step_slug, + 'status', 'completed', + 'started_at', completed_step.started_at, + 'completed_at', completed_step.completed_at, + 'remaining_tasks', 0, + 'remaining_deps', 0, + 'output', '[]'::jsonb + ), + concat('step:', completed_step.step_slug, ':completed'), + concat('pgflow:run:', completed_step.run_id), + false + ) + FROM completed_empty_steps AS completed_step +), + +-- ========================================== +-- HANDLE NORMAL STEPS (initial_tasks > 0) +-- ========================================== +-- ---------- Find ready steps ---------- +-- Steps with no remaining deps and known task count +ready_steps AS ( + SELECT * + FROM pgflow.step_states AS step_state + WHERE step_state.run_id = start_ready_steps.run_id + AND step_state.status = 'created' + AND step_state.remaining_deps = 0 + AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count + AND step_state.initial_tasks > 0 -- Don't start taskless steps + -- Exclude empty map steps already handled + AND NOT EXISTS ( + SELECT 1 FROM empty_map_steps + WHERE empty_map_steps.run_id = step_state.run_id + AND empty_map_steps.step_slug = step_state.step_slug + ) + ORDER BY step_state.step_slug + FOR UPDATE +), +-- ---------- Mark steps as started ---------- +started_step_states AS ( + UPDATE pgflow.step_states + SET status = 'started', + started_at = now(), + remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting + FROM ready_steps + WHERE pgflow.step_states.run_id = start_ready_steps.run_id + AND pgflow.step_states.step_slug = ready_steps.step_slug + RETURNING pgflow.step_states.* +), + +-- ========================================== +-- TASK GENERATION AND QUEUE MESSAGES +-- ========================================== +-- ---------- Generate tasks and batch messages ---------- +-- Single steps: 1 task (index 0) +-- Map steps: N tasks (indices 0..N-1) +message_batches AS ( + SELECT + started_step.flow_slug, + started_step.run_id, + started_step.step_slug, + COALESCE(step.opt_start_delay, 0) as delay, + array_agg( + jsonb_build_object( + 'flow_slug', started_step.flow_slug, + 'run_id', started_step.run_id, + 'step_slug', started_step.step_slug, + 'task_index', task_idx.task_index + ) ORDER BY task_idx.task_index + ) AS messages, + array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices + FROM started_step_states AS started_step + JOIN pgflow.steps AS step + ON step.flow_slug = started_step.flow_slug + AND step.step_slug = started_step.step_slug + -- Generate task indices from 0 to initial_tasks-1 + CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index) + GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay +), +-- ---------- Send messages to queue ---------- +-- Uses batch sending for performance with large arrays +sent_messages AS ( + SELECT + mb.flow_slug, + mb.run_id, + mb.step_slug, + task_indices.task_index, + msg_ids.msg_id + FROM message_batches mb + CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord) + CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord) + WHERE task_indices.idx_ord = msg_ids.msg_ord +), + +-- ---------- Broadcast step:started events ---------- +broadcast_events AS ( + SELECT + realtime.send( + jsonb_build_object( + 'event_type', 'step:started', + 'run_id', started_step.run_id, + 'step_slug', started_step.step_slug, + 'status', 'started', + 'started_at', started_step.started_at, + 'remaining_tasks', started_step.remaining_tasks, + 'remaining_deps', started_step.remaining_deps + ), + concat('step:', started_step.step_slug, ':started'), + concat('pgflow:run:', started_step.run_id), + false + ) + FROM started_step_states AS started_step +) + +-- ========================================== +-- RECORD TASKS IN DATABASE +-- ========================================== +INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id) +SELECT + sent_messages.flow_slug, + sent_messages.run_id, + sent_messages.step_slug, + sent_messages.task_index, + sent_messages.msg_id +FROM sent_messages; +$$; +-- Modify "complete_task" function +CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_step_state pgflow.step_states%ROWTYPE; + v_dependent_map_slug text; +begin + +-- ========================================== +-- VALIDATION: Array output for dependent maps +-- ========================================== +-- Must happen BEFORE acquiring locks to fail fast without holding resources +SELECT ds.step_slug INTO v_dependent_map_slug +FROM pgflow.deps d +JOIN pgflow.steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.step_slug +JOIN pgflow.step_states ss ON ss.flow_slug = ds.flow_slug AND ss.step_slug = ds.step_slug +WHERE d.dep_slug = complete_task.step_slug + AND d.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND ds.step_type = 'map' + AND ss.run_id = complete_task.run_id + AND ss.initial_tasks IS NULL + AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') +LIMIT 1; + +IF v_dependent_map_slug IS NOT NULL THEN + RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)', + v_dependent_map_slug, + complete_task.step_slug, + CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END, + complete_task.output; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Update task and propagate changes +-- ========================================== +WITH +-- ---------- Lock acquisition ---------- +-- Acquire locks in consistent order (run -> step) to prevent deadlocks +run_lock AS ( + SELECT * FROM pgflow.runs + WHERE pgflow.runs.run_id = complete_task.run_id + FOR UPDATE +), +step_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + FOR UPDATE +), +-- ---------- Task completion ---------- +-- Update the task record with completion status and output +task AS ( + UPDATE pgflow.step_tasks + SET + status = 'completed', + completed_at = now(), + output = complete_task.output + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index + AND pgflow.step_tasks.status = 'started' + RETURNING * +), +-- ---------- Step state update ---------- +-- Decrement remaining_tasks and potentially mark step as completed +step_state AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement + ELSE 'started' + END, + completed_at = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement + ELSE NULL + END, + remaining_tasks = pgflow.step_states.remaining_tasks - 1 + FROM task + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + RETURNING pgflow.step_states.* +), +-- ---------- Dependency resolution ---------- +-- Find all steps that depend on the completed step (only if step completed) +dependent_steps AS ( + SELECT d.step_slug AS dependent_step_slug + FROM pgflow.deps d + JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug + WHERE d.dep_slug = complete_task.step_slug + ORDER BY d.step_slug -- Ensure consistent ordering +), +-- ---------- Lock dependent steps ---------- +-- Acquire locks on all dependent steps before updating them +dependent_steps_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps) + FOR UPDATE +), +-- ---------- Update dependent steps ---------- +-- Decrement remaining_deps and resolve NULL initial_tasks for map steps +dependent_steps_update AS ( + UPDATE pgflow.step_states ss + SET remaining_deps = ss.remaining_deps - 1, + -- Resolve NULL initial_tasks for dependent map steps + -- This is where dependent maps learn their array size from upstream + initial_tasks = CASE + WHEN s.step_type = 'map' AND ss.initial_tasks IS NULL + AND complete_task.output IS NOT NULL + AND jsonb_typeof(complete_task.output) = 'array' THEN + jsonb_array_length(complete_task.output) + ELSE ss.initial_tasks -- Keep existing value (including NULL) + END + FROM dependent_steps ds, pgflow.steps s + WHERE ss.run_id = complete_task.run_id + AND ss.step_slug = ds.dependent_step_slug + AND s.flow_slug = ss.flow_slug + AND s.step_slug = ss.step_slug +) +-- ---------- Update run remaining_steps ---------- +-- Decrement the run's remaining_steps counter if step completed +UPDATE pgflow.runs +SET remaining_steps = pgflow.runs.remaining_steps - 1 +FROM step_state +WHERE pgflow.runs.run_id = complete_task.run_id + AND step_state.status = 'completed'; + +-- ========================================== +-- POST-COMPLETION ACTIONS +-- ========================================== + +-- ---------- Get updated state for broadcasting ---------- +SELECT * INTO v_step_state FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; + +-- ---------- Handle step completion ---------- +IF v_step_state.status = 'completed' THEN + -- Cascade complete any taskless steps that are now ready + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); + + -- Broadcast step:completed event + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'completed', + 'output', complete_task.output, + 'completed_at', v_step_state.completed_at + ), + concat('step:', complete_task.step_slug, ':completed'), + concat('pgflow:run:', complete_task.run_id), + false + ); +END IF; + +-- ---------- Archive completed task message ---------- +-- Move message from active queue to archive table +PERFORM ( + WITH completed_tasks AS ( + SELECT r.flow_slug, st.message_id + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.status = 'completed' + ) + SELECT pgmq.archive(ct.flow_slug, ct.message_id) + FROM completed_tasks ct + WHERE EXISTS (SELECT 1 FROM completed_tasks) +); + +-- ---------- Trigger next steps ---------- +-- Start any steps that are now ready (deps satisfied) +PERFORM pgflow.start_ready_steps(complete_task.run_id); + +-- Check if the entire run is complete +PERFORM pgflow.maybe_complete_run(complete_task.run_id); + +-- ---------- Return completed task ---------- +RETURN QUERY SELECT * +FROM pgflow.step_tasks AS step_task +WHERE step_task.run_id = complete_task.run_id + AND step_task.step_slug = complete_task.step_slug + AND step_task.task_index = complete_task.task_index; + +end; +$$; +-- Modify "start_flow" function +CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_created_run pgflow.runs%ROWTYPE; + v_root_map_count int; +begin + +-- ========================================== +-- VALIDATION: Root map array input +-- ========================================== +WITH root_maps AS ( + SELECT step_slug + FROM pgflow.steps + WHERE steps.flow_slug = start_flow.flow_slug + AND steps.step_type = 'map' + AND steps.deps_count = 0 +) +SELECT COUNT(*) INTO v_root_map_count FROM root_maps; + +-- If we have root map steps, validate that input is an array +IF v_root_map_count > 0 THEN + -- First check for NULL (should be caught by NOT NULL constraint, but be defensive) + IF start_flow.input IS NULL THEN + RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug; + END IF; + + -- Then check if it's not an array + IF jsonb_typeof(start_flow.input) != 'array' THEN + RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)', + start_flow.flow_slug, jsonb_typeof(start_flow.input); + END IF; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Create run and step states +-- ========================================== +WITH + -- ---------- Gather flow metadata ---------- + flow_steps AS ( + SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count + FROM pgflow.steps + WHERE steps.flow_slug = start_flow.flow_slug + ), + -- ---------- Create run record ---------- + created_run AS ( + INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) + VALUES ( + COALESCE(start_flow.run_id, gen_random_uuid()), + start_flow.flow_slug, + start_flow.input, + (SELECT count(*) FROM flow_steps) + ) + RETURNING * + ), + -- ---------- Create step states ---------- + -- Sets initial_tasks: known for root maps, NULL for dependent maps + created_step_states AS ( + INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) + SELECT + fs.flow_slug, + (SELECT created_run.run_id FROM created_run), + fs.step_slug, + fs.deps_count, + -- Updated logic for initial_tasks: + CASE + WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN + -- Root map: get array length from input + CASE + WHEN jsonb_typeof(start_flow.input) = 'array' THEN + jsonb_array_length(start_flow.input) + ELSE + 1 + END + WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN + -- Dependent map: unknown until dependencies complete + NULL + ELSE + -- Single steps: always 1 task + 1 + END + FROM flow_steps fs + ) +SELECT * FROM created_run INTO v_created_run; + +-- ========================================== +-- POST-CREATION ACTIONS +-- ========================================== + +-- ---------- Broadcast run:started event ---------- +PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:started', + 'run_id', v_created_run.run_id, + 'flow_slug', v_created_run.flow_slug, + 'input', v_created_run.input, + 'status', 'started', + 'remaining_steps', v_created_run.remaining_steps, + 'started_at', v_created_run.started_at + ), + 'run:started', + concat('pgflow:run:', v_created_run.run_id), + false +); + +-- ---------- Complete taskless steps ---------- +-- Handle empty array maps that should auto-complete +PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); + +-- ---------- Start initial steps ---------- +-- Start root steps (those with no dependencies) +PERFORM pgflow.start_ready_steps(v_created_run.run_id); + +RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; + +end; +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 492cdcf53..1d88243c6 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:ftS8hMfs4DMg/E9bV4r14WzDpKvPKLCLtpqg1bdybXg= +h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U= 20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc= @@ -12,3 +12,4 @@ h1:ftS8hMfs4DMg/E9bV4r14WzDpKvPKLCLtpqg1bdybXg= 20250912080800_pgflow_temp_pr2_root_maps.sql h1:v2KdChKBPBOIq3nCVVtKWy1OVcIROV+tPtaTUPQujSo= 20250912125339_pgflow_TEMP_task_spawning_optimization.sql h1:HTSShQweuTS1Sz5q/KLy5XW3J/6D/mA6jjVpCfvjBto= 20250916093518_pgflow_temp_add_cascade_complete.sql h1:rQeqjEghqhGGUP+njrHFpPZxrxInjMHq5uSvYN1dTZc= +20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y= diff --git a/pkgs/core/supabase/tests/initial_tasks_null/dependent_maps_null.test.sql b/pkgs/core/supabase/tests/initial_tasks_null/dependent_maps_null.test.sql new file mode 100644 index 000000000..c067e8cc3 --- /dev/null +++ b/pkgs/core/supabase/tests/initial_tasks_null/dependent_maps_null.test.sql @@ -0,0 +1,157 @@ +begin; +select plan(9); + +-- Create test flow with single -> map dependency +select pgflow.create_flow('test_dependent_map_null'); + +select pgflow.add_step( + flow_slug => 'test_dependent_map_null', + step_slug => 'producer', + step_type => 'single' +); + +select pgflow.add_step( + flow_slug => 'test_dependent_map_null', + step_slug => 'map_consumer', + deps_slugs => '{"producer"}', + step_type => 'map' +); + +-- Start flow +select pgflow.start_flow( + 'test_dependent_map_null', + '{"some": "input"}'::jsonb +); + +-- Test: producer step should have initial_tasks = 1 +select is( + initial_tasks, + 1, + 'Single step producer should have initial_tasks = 1' +) +from pgflow.step_states +where step_slug = 'producer'; + +-- Test: dependent map should have NULL initial_tasks initially +select is( + initial_tasks, + NULL, + 'Dependent map should have NULL initial_tasks before dependency completes' +) +from pgflow.step_states +where step_slug = 'map_consumer'; + +-- Test: dependent map should not be ready to start +select is( + status, + 'created', + 'Dependent map should remain in created status with NULL initial_tasks' +) +from pgflow.step_states +where step_slug = 'map_consumer'; + +-- Complete the producer task with array output +-- First start the task (simulating worker polling) +WITH task AS ( + SELECT * FROM pgflow_tests.read_and_start('test_dependent_map_null') LIMIT 1 +) +-- Then complete it with array output +SELECT pgflow.complete_task( + task.run_id, + task.step_slug, + 0, -- task_index is always 0 for single steps + '["item1", "item2", "item3"]'::jsonb +) +FROM task; + +-- Test: dependent map should now have initial_tasks = 3 +select is( + initial_tasks, + 3, + 'Dependent map should have initial_tasks = 3 after producer completes with 3-element array' +) +from pgflow.step_states +where step_slug = 'map_consumer'; + +-- Test: dependent map should be ready to start +select is( + remaining_deps, + 0, + 'Dependent map should have remaining_deps = 0 after producer completes' +) +from pgflow.step_states +where step_slug = 'map_consumer'; + +-- Test case 2: Empty array propagation +select pgflow.create_flow('test_empty_array_null'); + +select pgflow.add_step( + flow_slug => 'test_empty_array_null', + step_slug => 'producer2', + step_type => 'single' +); + +select pgflow.add_step( + flow_slug => 'test_empty_array_null', + step_slug => 'map_consumer2', + deps_slugs => '{"producer2"}', + step_type => 'map' +); + +select pgflow.start_flow( + 'test_empty_array_null', + '{"test": "data"}'::jsonb +); + +-- Test: map starts with NULL +select is( + initial_tasks, + NULL, + 'Second dependent map should have NULL initial_tasks initially' +) +from pgflow.step_states +where step_slug = 'map_consumer2'; + +-- Complete the producer2 task with empty array output +WITH task AS ( + SELECT * FROM pgflow_tests.read_and_start('test_empty_array_null') LIMIT 1 +) +SELECT pgflow.complete_task( + task.run_id, + task.step_slug, + 0, -- task_index + '[]'::jsonb -- Empty array +) +FROM task; + +-- Test: dependent map should have initial_tasks = 0 (not NULL) +select is( + initial_tasks, + 0, + 'Dependent map should have initial_tasks = 0 (not NULL) after producer completes with empty array' +) +from pgflow.step_states +where step_slug = 'map_consumer2'; + +-- Test: dependent map with 0 tasks should auto-complete +select is( + status, + 'completed', + 'Dependent map with initial_tasks = 0 should auto-complete via cascade' +) +from pgflow.step_states +where step_slug = 'map_consumer2'; + +-- Test: Verify constraint exists using information_schema +select ok( + EXISTS ( + SELECT 1 + FROM information_schema.check_constraints + WHERE constraint_schema = 'pgflow' + AND constraint_name = 'initial_tasks_known_when_started' + ), + 'Table should have constraint preventing starting with NULL initial_tasks' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql b/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql new file mode 100644 index 000000000..954a94d74 --- /dev/null +++ b/pkgs/core/supabase/tests/initial_tasks_null/non_array_to_map_should_fail.test.sql @@ -0,0 +1,76 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Test: Non-array output to dependent map should fail the run + +-- Create flow with single -> map dependency +select pgflow.create_flow('non_array_test'); + +select pgflow.add_step( + flow_slug => 'non_array_test', + step_slug => 'producer', + step_type => 'single' +); + +select pgflow.add_step( + flow_slug => 'non_array_test', + step_slug => 'map_consumer', + deps_slugs => ARRAY['producer'], + step_type => 'map' +); + +select pgflow.start_flow( + 'non_array_test', + '{}'::jsonb +); + +-- Test: Map starts with NULL initial_tasks +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'map_consumer' limit 1), + NULL::integer, + 'Dependent map should start with NULL initial_tasks' +); + +-- Ensure worker exists for polling +select pgflow_tests.ensure_worker('non_array_test'); + +-- Start the producer task (simulating Edge Worker behavior) +WITH task AS ( + SELECT * FROM pgflow_tests.read_and_start('non_array_test') LIMIT 1 +) +SELECT step_slug FROM task; + +-- Test: complete_task should RAISE EXCEPTION for non-array output to map +select throws_ok( + $$ + WITH task_info AS ( + SELECT run_id, step_slug, task_index + FROM pgflow.step_tasks + WHERE flow_slug = 'non_array_test' + AND step_slug = 'producer' + LIMIT 1 + ) + SELECT pgflow.complete_task( + task_info.run_id, + task_info.step_slug, + task_info.task_index, + '{"not": "an array"}'::jsonb + ) FROM task_info + $$, + 'P0001', -- RAISE EXCEPTION error code + 'Map step map_consumer expects array input but dependency producer produced object (output: {"not": "an array"})', + 'complete_task should fail when non-array is passed to dependent map' +); + +-- Test: Map initial_tasks should remain NULL after failed transaction +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'map_consumer' limit 1), + NULL, + 'Map initial_tasks should remain NULL after failed non-array update' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql b/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql new file mode 100644 index 000000000..d1c045265 --- /dev/null +++ b/pkgs/core/supabase/tests/initial_tasks_null/null_output_to_map_should_fail.test.sql @@ -0,0 +1,75 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Test: NULL output to dependent map should fail + +-- Create flow with single -> map dependency +select pgflow.create_flow('null_output_test'); + +select pgflow.add_step( + flow_slug => 'null_output_test', + step_slug => 'producer', + step_type => 'single' +); + +select pgflow.add_step( + flow_slug => 'null_output_test', + step_slug => 'map_consumer', + deps_slugs => ARRAY['producer'], + step_type => 'map' +); + +select pgflow.start_flow( + 'null_output_test', + '{}'::jsonb +); + +-- Test: Map starts with NULL initial_tasks +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'map_consumer' limit 1), + NULL::integer, + 'Dependent map should start with NULL initial_tasks' +); + +-- Ensure worker exists for polling +select pgflow_tests.ensure_worker('null_output_test'); + +-- Start the producer task (simulating Edge Worker behavior) +WITH task AS ( + SELECT * FROM pgflow_tests.read_and_start('null_output_test') LIMIT 1 +) +SELECT step_slug FROM task; + +-- Test: complete_task should RAISE EXCEPTION for NULL output to map +select throws_ilike( + $$ + WITH task_info AS ( + SELECT run_id, step_slug, task_index + FROM pgflow.step_tasks + WHERE flow_slug = 'null_output_test' + AND step_slug = 'producer' + LIMIT 1 + ) + SELECT pgflow.complete_task( + task_info.run_id, + task_info.step_slug, + task_info.task_index, + NULL::jsonb -- Passing literal NULL! + ) FROM task_info + $$, + '%Map step map_consumer expects array input but dependency producer produced null%', + 'complete_task should fail when NULL is passed to dependent map' +); + +-- Test: Map initial_tasks should remain NULL after failed transaction +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'map_consumer' limit 1), + NULL, + 'Map initial_tasks should remain NULL after failed NULL output' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/start_flow/dependent_map_initial_tasks_null.test.sql b/pkgs/core/supabase/tests/start_flow/dependent_map_initial_tasks_null.test.sql new file mode 100644 index 000000000..6bc03099a --- /dev/null +++ b/pkgs/core/supabase/tests/start_flow/dependent_map_initial_tasks_null.test.sql @@ -0,0 +1,142 @@ +begin; +select plan(7); +select pgflow_tests.reset_db(); + +-- Test: Dependent map steps should start with NULL initial_tasks + +-- Create a flow with single -> map dependency +select pgflow.create_flow('single_to_map_flow'); + +-- Add single root step +select pgflow.add_step( + flow_slug => 'single_to_map_flow', + step_slug => 'single_root', + step_type => 'single' +); + +-- Add dependent map step +select pgflow.add_step( + flow_slug => 'single_to_map_flow', + step_slug => 'dependent_map', + deps_slugs => ARRAY['single_root'], + step_type => 'map' +); + +-- Start flow with some input +select pgflow.start_flow('single_to_map_flow', '{"data": "test"}'::jsonb); + +-- TEST: Single root step should have initial_tasks = 1 +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'single_root' limit 1), + 1, + 'Single root step should have initial_tasks = 1' +); + +-- TEST: Dependent map step should have initial_tasks = NULL (unknown until dependency completes) +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'dependent_map' limit 1), + NULL::integer, + 'Dependent map step should have initial_tasks = NULL until dependency completes' +); + +-- Now test with a root map -> dependent map +select pgflow.create_flow('map_to_map_flow'); + +-- Add root map step +select pgflow.add_step( + flow_slug => 'map_to_map_flow', + step_slug => 'root_map', + deps_slugs => '{}', + step_type => 'map' +); + +-- Add dependent map step +select pgflow.add_step( + flow_slug => 'map_to_map_flow', + step_slug => 'dependent_map2', + deps_slugs => ARRAY['root_map'], + step_type => 'map' +); + +-- Start flow with array input for root map +select pgflow.start_flow('map_to_map_flow', '["item1", "item2", "item3"]'::jsonb); + +-- TEST: Root map should have initial_tasks = array length +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'root_map' limit 1), + 3, + 'Root map step should have initial_tasks = 3 for array with 3 elements' +); + +-- TEST: Dependent map step should have initial_tasks = NULL +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'dependent_map2' limit 1), + NULL::integer, + 'Dependent map step should have initial_tasks = NULL even when depending on another map' +); + +-- Test that dependent map steps cannot start with NULL initial_tasks +select pgflow.create_flow('test_no_start_with_null'); + +select pgflow.add_step( + flow_slug => 'test_no_start_with_null', + step_slug => 'single_step', + step_type => 'single' +); + +select pgflow.add_step( + flow_slug => 'test_no_start_with_null', + step_slug => 'map_step', + deps_slugs => ARRAY['single_step'], + step_type => 'map' +); + +-- Start flow and store run_id +select pgflow.start_flow('test_no_start_with_null', '{}'::jsonb); + +-- TEST: Verify map step is not started (should remain created with NULL initial_tasks) +select is( + (select status from pgflow.step_states + where step_slug = 'map_step' + AND flow_slug = 'test_no_start_with_null'), + 'created', + 'Map step with NULL initial_tasks should remain in created status' +); + +-- Complete the single step with an array output +-- This will update the dependent map's initial_tasks +WITH task AS ( + SELECT * FROM pgflow_tests.read_and_start('test_no_start_with_null') LIMIT 1 +) +SELECT pgflow.complete_task( + task.run_id, + task.step_slug, + 0, + '["a", "b"]'::jsonb -- Array with 2 elements +) +FROM task; + +-- TEST: Now the map step should be started +select is( + (select status from pgflow.step_states + where step_slug = 'map_step' + AND flow_slug = 'test_no_start_with_null'), + 'started', + 'Map step should be started after initial_tasks is resolved' +); + +-- TEST: Verify correct number of tasks were created +select is( + (select count(*) from pgflow.step_tasks + where step_slug = 'map_step' + AND flow_slug = 'test_no_start_with_null'), + 2::bigint, + 'Map step should have 2 tasks created' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/start_flow/flow_only_maps.test.sql b/pkgs/core/supabase/tests/start_flow/flow_only_maps.test.sql index 5a50278b8..d9a6c2ccf 100644 --- a/pkgs/core/supabase/tests/start_flow/flow_only_maps.test.sql +++ b/pkgs/core/supabase/tests/start_flow/flow_only_maps.test.sql @@ -56,12 +56,12 @@ select is( 'Second root map should have initial_tasks = 4' ); --- TEST: Dependent map should have initial_tasks = 1 (will be updated when map_one completes) +-- TEST: Dependent map should have initial_tasks = NULL (will be updated when map_one completes) select is( - (select initial_tasks from pgflow.step_states + (select initial_tasks from pgflow.step_states where step_slug = 'map_three' limit 1), - 1, - 'Dependent map should have initial_tasks = 1 initially (will be updated later)' + NULL::integer, + 'Dependent map should have initial_tasks = NULL initially (will be updated when dependency completes)' ); -- TEST: All steps should have correct status