46 changes: 6 additions & 40 deletions PLAN.md
@@ -7,45 +7,9 @@
- ✅ **WORKING**: Empty array maps (taskless) cascade and complete correctly
- ✅ **WORKING**: Task spawning creates N tasks with correct indices
- ✅ **WORKING**: Dependency count propagation for map steps
- ❌ **MISSING**: Array element extraction - tasks get full array instead of individual items
- ✅ **WORKING**: Array element extraction - each map task receives its individual array element
- ❌ **MISSING**: Output aggregation - no way to combine map task outputs for dependents

### What Needs to Be Done

1. **Array Element Extraction in `start_tasks()`**

- Each map task must receive `array[task_index]` not the entire array
- Requires modifying the input assembly logic to use `jsonb_array_element()`

2. **Output Aggregation When Map Completes**
- When all map tasks finish, aggregate outputs: `jsonb_agg(output ORDER BY task_index)`
- Store this somewhere accessible to dependent steps
- Options: Add column to step_states, compute on-demand, or temporary storage
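
The aggregation described above can be sketched as a single statement. This is a sketch only: the `aggregated_output` column on `step_states` is one hypothetical storage option from the list above, not something this PR adds; table and column names otherwise follow the schema used later in this diff.

```sql
-- Sketch: once every task for a map step is completed, combine their
-- outputs into one array ordered by task_index, and store it on the
-- (hypothetical) step_states.aggregated_output column.
update pgflow.step_states ss
set aggregated_output = agg.output
from (
  select st.run_id,
         st.step_slug,
         jsonb_agg(st.output order by st.task_index) as output
  from pgflow.step_tasks st
  where st.status = 'completed'
  group by st.run_id, st.step_slug
) agg
where ss.run_id = agg.run_id
  and ss.step_slug = agg.step_slug;
```

The `order by st.task_index` inside `jsonb_agg` is the important part: it guarantees the aggregated array preserves element order regardless of task completion order.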

### Example: What Should Happen (But Doesn't)

```sql
-- Given a flow: normalStep -> mapStep -> finalStep

-- 1. normalStep completes with output:
'["apple", "banana", "cherry"]'

-- 2. mapStep should spawn 3 tasks:
-- Task 0 receives: {"normalStep": "apple"} ← NOT WORKING (gets full array)
-- Task 1 receives: {"normalStep": "banana"} ← NOT WORKING (gets full array)
-- Task 2 receives: {"normalStep": "cherry"} ← NOT WORKING (gets full array)

-- 3. Each task processes and outputs:
-- Task 0 outputs: {"processed": "APPLE"}
-- Task 1 outputs: {"processed": "BANANA"}
-- Task 2 outputs: {"processed": "CHERRY"}

-- 4. When mapStep completes, aggregate outputs:
'[{"processed": "APPLE"}, {"processed": "BANANA"}, {"processed": "CHERRY"}]' -- ← NOT WORKING

-- 5. finalStep receives the aggregated array as input
```

## Implementation Status

### Sequential Child PR Plan
@@ -83,26 +47,28 @@
- All taskless cascade tests passing (7/7 test files)

- [x] **PR #212: Dependent Map Count Propagation**

- Enhanced complete_task() sets initial_tasks for dependent maps
- Array validation and count propagation working
- Cascade handles taskless dependent maps

- [x] **PR #213: NULL for Unknown initial_tasks** - `09-16-make-initial-tasks-nullable`

- Changed initial_tasks from "1 as placeholder" to NULL for dependent map steps
- Benefits: Semantic correctness (NULL = unknown, not "1 task")
- Implemented: Schema change to allow NULL, updated all SQL functions
- Added validation for non-array and NULL outputs to map steps
- Comprehensive tests for NULL behavior and error cases

#### ❌ Remaining Work

- [ ] **Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE)
- [x] **PR #216: Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE)

- Enhanced start_tasks() to distribute array elements to map tasks
- Each map task receives its specific array element based on task_index
- Handles both root maps (from run input) and dependent maps (from step outputs)
- Tests with actual array data processing

#### ❌ Remaining Work

- [ ] **Output Aggregation** (CRITICAL - BLOCKS MAP OUTPUT CONSUMPTION)

- Aggregate map task outputs when step completes
78 changes: 75 additions & 3 deletions pkgs/core/schemas/0120_function_start_tasks.sql
@@ -56,7 +56,8 @@ as $$
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
@@ -82,11 +83,82 @@
st.flow_slug,
st.run_id,
st.step_slug,
jsonb_build_object('run', r.input) ||
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
-- (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
-- task 0 gets: 1
-- task 1 gets: 2
-- task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)
**Review comment (Contributor):** Array bounds vulnerability: `jsonb_array_element(r.input, st.task_index)` returns NULL if `task_index` is out of bounds, but this NULL is passed directly as task input without validation. If a race condition or bug in task creation results in `task_index >= array_length`, tasks will receive NULL input and likely fail at runtime. Add bounds checking or handle the NULL case explicitly.

Suggested change:

```sql
CASE
  WHEN jsonb_array_length(r.input) > st.task_index THEN jsonb_array_element(r.input, st.task_index)
  ELSE jsonb_build_object('error', 'Task index out of bounds')
END
```

-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
-- task 0 gets: "a"
-- task 1 gets: "b"
-- task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
**Review comment (Contributor) on lines +133 to +135:** Critical bug: the subquery uses `LIMIT 1` without `ORDER BY`, making the result non-deterministic when a map step has multiple dependencies. While the comment states "Map steps have exactly 1 dependency (enforced by add_step)", this creates a race condition if that constraint is ever violated or if there are concurrent modifications. The query should either add `ORDER BY` for deterministic results or add a runtime check that exactly one dependency exists. Otherwise map tasks could receive elements from an arbitrary dependency in multi-dependency scenarios.

Suggested change:

```sql
(SELECT jsonb_array_element(value, st.task_index)
 FROM jsonb_each(dep_out.deps_output)
 WHERE (SELECT COUNT(*) FROM jsonb_each(dep_out.deps_output)) = 1
 LIMIT 1)
```

END

-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive ALL inputs as a structured object.
-- This includes the original run input plus all dependency outputs.
ELSE
-- Non-map steps get structured input with named keys
-- Example output: {
-- "run": {"original": "input"},
-- "step1": {"output": "from_step1"},
-- "step2": {"output": "from_step2"}
-- }
--
-- Build object with 'run' key containing original input
jsonb_build_object('run', r.input) ||
-- Merge with deps_output which already has dependency outputs
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
-- If no dependencies, defaults to empty object
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
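
The extraction behavior above, including the out-of-bounds NULL the review flags, can be verified against any Postgres instance with a standalone query (the literals are illustrative):

```sql
-- jsonb_array_element returns the element at the given index, or NULL
-- when the index is out of bounds or the value is not an array. This is
-- why an unvalidated task_index silently yields a NULL task input.
select
  jsonb_array_element('["apple", "banana", "cherry"]'::jsonb, 1) as in_bounds,      -- "banana"
  jsonb_array_element('["apple", "banana", "cherry"]'::jsonb, 5) as out_of_bounds,  -- NULL
  jsonb_array_element('{"not": "an array"}'::jsonb, 0)           as not_an_array;   -- NULL
```
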
157 changes: 157 additions & 0 deletions pkgs/core/supabase/migrations/20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql
@@ -0,0 +1,157 @@
-- Modify "start_tasks" function
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
),
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
dep_task.output as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
),
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages
set_vt_batch as (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
)
select
st.flow_slug,
st.run_id,
st.step_slug,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
-- (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
-- task 0 gets: 1
-- task 1 gets: 2
-- task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)

-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
-- task 0 gets: "a"
-- task 1 gets: "b"
-- task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
END

-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive ALL inputs as a structured object.
-- This includes the original run input plus all dependency outputs.
ELSE
-- Non-map steps get structured input with named keys
-- Example output: {
-- "run": {"original": "input"},
-- "step1": {"output": "from_step1"},
-- "step2": {"output": "from_step2"}
-- }
--
-- Build object with 'run' key containing original input
jsonb_build_object('run', r.input) ||
-- Merge with deps_output which already has dependency outputs
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
-- If no dependencies, defaults to empty object
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$;
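
The `jsonb_each` unwrapping used for dependent maps can also be exercised standalone (the `dep_name` key is illustrative; in practice it is the dependency's step slug):

```sql
-- deps_outputs wraps the dependency's raw array under its slug, e.g.
-- {"dep_name": ["a", "b", "c"]}. The subquery peels off the single key
-- and indexes into the array value, here simulating task_index = 1.
select (
  select jsonb_array_element(value, 1)
  from jsonb_each('{"dep_name": ["a", "b", "c"]}'::jsonb)
  limit 1
) as element_for_task_1;  -- "b"
```
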
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U=
h1:5eNDpXz1Ru5E6c9G7Glyo398mstJYLNNhjqcTjOaGxI=
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -13,3 +13,4 @@ h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U=
20250912125339_pgflow_TEMP_task_spawning_optimization.sql h1:HTSShQweuTS1Sz5q/KLy5XW3J/6D/mA6jjVpCfvjBto=
20250916093518_pgflow_temp_add_cascade_complete.sql h1:rQeqjEghqhGGUP+njrHFpPZxrxInjMHq5uSvYN1dTZc=
20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y=
20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4=