Skip to content

Commit 53ca34b

Browse files
committed
feat: enhance start_tasks with conditional input logic and add array handling migration
- Updated start_tasks function to build step input conditionally based on step type - Implemented logic for root and dependent map steps to extract array elements - Added a new migration script to handle array elements in start_tasks - Included comprehensive tests for dependent map element extraction, large array processing, mixed JSON types, and nested arrays - Improved handling of array-based tasks and input construction for various step configurations
1 parent dd1dde0 commit 53ca34b

13 files changed

+1816
-44
lines changed

PLAN.md

Lines changed: 6 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -7,45 +7,9 @@
77
-**WORKING**: Empty array maps (taskless) cascade and complete correctly
88
-**WORKING**: Task spawning creates N tasks with correct indices
99
-**WORKING**: Dependency count propagation for map steps
10-
- **MISSING**: Array element extraction - tasks get full array instead of individual items
10+
- **WORKING**: Array element extraction - tasks get full array instead of individual items
1111
-**MISSING**: Output aggregation - no way to combine map task outputs for dependents
1212

13-
### What Needs to Be Done
14-
15-
1. **Array Element Extraction in `start_tasks()`**
16-
17-
- Each map task must receive `array[task_index]` not the entire array
18-
- Requires modifying the input assembly logic to use `jsonb_array_element()`
19-
20-
2. **Output Aggregation When Map Completes**
21-
- When all map tasks finish, aggregate outputs: `jsonb_agg(output ORDER BY task_index)`
22-
- Store this somewhere accessible to dependent steps
23-
- Options: Add column to step_states, compute on-demand, or temporary storage
24-
25-
### Example: What Should Happen (But Doesn't)
26-
27-
```sql
28-
-- Given a flow: normalStep -> mapStep -> finalStep
29-
30-
-- 1. normalStep completes with output:
31-
'["apple", "banana", "cherry"]'
32-
33-
-- 2. mapStep should spawn 3 tasks:
34-
-- Task 0 receives: {"normalStep": "apple"} ← NOT WORKING (gets full array)
35-
-- Task 1 receives: {"normalStep": "banana"} ← NOT WORKING (gets full array)
36-
-- Task 2 receives: {"normalStep": "cherry"} ← NOT WORKING (gets full array)
37-
38-
-- 3. Each task processes and outputs:
39-
-- Task 0 outputs: {"processed": "APPLE"}
40-
-- Task 1 outputs: {"processed": "BANANA"}
41-
-- Task 2 outputs: {"processed": "CHERRY"}
42-
43-
-- 4. When mapStep completes, aggregate outputs:
44-
'[{"processed": "APPLE"}, {"processed": "BANANA"}, {"processed": "CHERRY"}]' ← NOT WORKING
45-
46-
-- 5. finalStep receives the aggregated array as input
47-
```
48-
4913
## Implementation Status
5014

5115
### Sequential Child PR Plan
@@ -83,26 +47,28 @@
8347
- All taskless cascade tests passing (7/7 test files)
8448

8549
- [x] **PR #212: Dependent Map Count Propagation**
50+
8651
- Enhanced complete_task() sets initial_tasks for dependent maps
8752
- Array validation and count propagation working
8853
- Cascade handles taskless dependent maps
8954

9055
- [x] **PR #213: NULL for Unknown initial_tasks** - `09-16-make-initial-tasks-nullable`
56+
9157
- Changed initial_tasks from "1 as placeholder" to NULL for dependent map steps
9258
- Benefits: Semantic correctness (NULL = unknown, not "1 task")
9359
- Implemented: Schema change to allow NULL, updated all SQL functions
9460
- Added validation for non-array and NULL outputs to map steps
9561
- Comprehensive tests for NULL behavior and error cases
9662

97-
#### ❌ Remaining Work
98-
99-
- [ ] **Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE)
63+
- [x] **PR #216: Array Element Distribution** (CRITICAL - BLOCKS REAL MAP USAGE)
10064

10165
- Enhanced start_tasks() to distribute array elements to map tasks
10266
- Each map task receives its specific array element based on task_index
10367
- Handles both root maps (from run input) and dependent maps (from step outputs)
10468
- Tests with actual array data processing
10569

70+
#### ❌ Remaining Work
71+
10672
- [ ] **Output Aggregation** (CRITICAL - BLOCKS MAP OUTPUT CONSUMPTION)
10773

10874
- Aggregate map task outputs when step completes

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ as $$
5656
select
5757
d.run_id,
5858
d.step_slug,
59-
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
59+
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
60+
count(*) as dep_count
6061
from deps d
6162
group by d.run_id, d.step_slug
6263
),
@@ -82,11 +83,82 @@ as $$
8283
st.flow_slug,
8384
st.run_id,
8485
st.step_slug,
85-
jsonb_build_object('run', r.input) ||
86-
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
86+
-- ==========================================
87+
-- INPUT CONSTRUCTION LOGIC
88+
-- ==========================================
89+
-- This nested CASE statement determines how to construct the input
90+
-- for each task based on the step type (map vs non-map).
91+
--
92+
-- The fundamental difference:
93+
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
94+
-- - Non-map steps: Receive structured objects with named keys
95+
-- (e.g., {"run": {...}, "dependency1": {...}})
96+
-- ==========================================
97+
CASE
98+
-- -------------------- MAP STEPS --------------------
99+
-- Map steps process arrays element-by-element.
100+
-- Each task receives ONE element from the array at its task_index position.
101+
WHEN step.step_type = 'map' THEN
102+
-- Map steps get raw array elements without any wrapper object
103+
CASE
104+
-- ROOT MAP: Gets array from run input
105+
-- Example: run input = [1, 2, 3]
106+
-- task 0 gets: 1
107+
-- task 1 gets: 2
108+
-- task 2 gets: 3
109+
WHEN step.deps_count = 0 THEN
110+
-- Root map (deps_count = 0): no dependencies, reads from run input.
111+
-- Extract the element at task_index from the run's input array.
112+
-- Note: If run input is not an array, this will return NULL
113+
-- and the flow will fail (validated in start_flow).
114+
jsonb_array_element(r.input, st.task_index)
115+
116+
-- DEPENDENT MAP: Gets array from its single dependency
117+
-- Example: dependency output = ["a", "b", "c"]
118+
-- task 0 gets: "a"
119+
-- task 1 gets: "b"
120+
-- task 2 gets: "c"
121+
ELSE
122+
-- Has dependencies (should be exactly 1 for map steps).
123+
-- Extract the element at task_index from the dependency's output array.
124+
--
125+
-- Why the subquery with jsonb_each?
126+
-- - The dependency outputs a raw array: [1, 2, 3]
127+
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
128+
-- - We need to unwrap and get just the array value
129+
-- - Map steps have exactly 1 dependency (enforced by add_step)
130+
-- - So jsonb_each will return exactly 1 row
131+
-- - We extract the 'value' which is the raw array [1, 2, 3]
132+
-- - Then get the element at task_index from that array
133+
(SELECT jsonb_array_element(value, st.task_index)
134+
FROM jsonb_each(dep_out.deps_output)
135+
LIMIT 1)
136+
END
137+
138+
-- -------------------- NON-MAP STEPS --------------------
139+
-- Regular (non-map) steps receive ALL inputs as a structured object.
140+
-- This includes the original run input plus all dependency outputs.
141+
ELSE
142+
-- Non-map steps get structured input with named keys
143+
-- Example output: {
144+
-- "run": {"original": "input"},
145+
-- "step1": {"output": "from_step1"},
146+
-- "step2": {"output": "from_step2"}
147+
-- }
148+
--
149+
-- Build object with 'run' key containing original input
150+
jsonb_build_object('run', r.input) ||
151+
-- Merge with deps_output which already has dependency outputs
152+
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
153+
-- If no dependencies, defaults to empty object
154+
coalesce(dep_out.deps_output, '{}'::jsonb)
155+
END as input,
87156
st.message_id as msg_id
88157
from tasks st
89158
join runs r on st.run_id = r.run_id
159+
join pgflow.steps step on
160+
step.flow_slug = st.flow_slug and
161+
step.step_slug = st.step_slug
90162
left join deps_outputs dep_out on
91163
dep_out.run_id = st.run_id and
92164
dep_out.step_slug = st.step_slug
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
-- Modify "start_tasks" function
2+
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
3+
with tasks as (
4+
select
5+
task.flow_slug,
6+
task.run_id,
7+
task.step_slug,
8+
task.task_index,
9+
task.message_id
10+
from pgflow.step_tasks as task
11+
where task.flow_slug = start_tasks.flow_slug
12+
and task.message_id = any(msg_ids)
13+
and task.status = 'queued'
14+
),
15+
start_tasks_update as (
16+
update pgflow.step_tasks
17+
set
18+
attempts_count = attempts_count + 1,
19+
status = 'started',
20+
started_at = now(),
21+
last_worker_id = worker_id
22+
from tasks
23+
where step_tasks.message_id = tasks.message_id
24+
and step_tasks.flow_slug = tasks.flow_slug
25+
and step_tasks.status = 'queued'
26+
),
27+
runs as (
28+
select
29+
r.run_id,
30+
r.input
31+
from pgflow.runs r
32+
where r.run_id in (select run_id from tasks)
33+
),
34+
deps as (
35+
select
36+
st.run_id,
37+
st.step_slug,
38+
dep.dep_slug,
39+
dep_task.output as dep_output
40+
from tasks st
41+
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
42+
join pgflow.step_tasks dep_task on
43+
dep_task.run_id = st.run_id and
44+
dep_task.step_slug = dep.dep_slug and
45+
dep_task.status = 'completed'
46+
),
47+
deps_outputs as (
48+
select
49+
d.run_id,
50+
d.step_slug,
51+
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
52+
count(*) as dep_count
53+
from deps d
54+
group by d.run_id, d.step_slug
55+
),
56+
timeouts as (
57+
select
58+
task.message_id,
59+
task.flow_slug,
60+
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
61+
from tasks task
62+
join pgflow.flows flow on flow.flow_slug = task.flow_slug
63+
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
64+
),
65+
-- Batch update visibility timeouts for all messages
66+
set_vt_batch as (
67+
select pgflow.set_vt_batch(
68+
start_tasks.flow_slug,
69+
array_agg(t.message_id order by t.message_id),
70+
array_agg(t.vt_delay order by t.message_id)
71+
)
72+
from timeouts t
73+
)
74+
select
75+
st.flow_slug,
76+
st.run_id,
77+
st.step_slug,
78+
-- ==========================================
79+
-- INPUT CONSTRUCTION LOGIC
80+
-- ==========================================
81+
-- This nested CASE statement determines how to construct the input
82+
-- for each task based on the step type (map vs non-map).
83+
--
84+
-- The fundamental difference:
85+
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
86+
-- - Non-map steps: Receive structured objects with named keys
87+
-- (e.g., {"run": {...}, "dependency1": {...}})
88+
-- ==========================================
89+
CASE
90+
-- -------------------- MAP STEPS --------------------
91+
-- Map steps process arrays element-by-element.
92+
-- Each task receives ONE element from the array at its task_index position.
93+
WHEN step.step_type = 'map' THEN
94+
-- Map steps get raw array elements without any wrapper object
95+
CASE
96+
-- ROOT MAP: Gets array from run input
97+
-- Example: run input = [1, 2, 3]
98+
-- task 0 gets: 1
99+
-- task 1 gets: 2
100+
-- task 2 gets: 3
101+
WHEN step.deps_count = 0 THEN
102+
-- Root map (deps_count = 0): no dependencies, reads from run input.
103+
-- Extract the element at task_index from the run's input array.
104+
-- Note: If run input is not an array, this will return NULL
105+
-- and the flow will fail (validated in start_flow).
106+
jsonb_array_element(r.input, st.task_index)
107+
108+
-- DEPENDENT MAP: Gets array from its single dependency
109+
-- Example: dependency output = ["a", "b", "c"]
110+
-- task 0 gets: "a"
111+
-- task 1 gets: "b"
112+
-- task 2 gets: "c"
113+
ELSE
114+
-- Has dependencies (should be exactly 1 for map steps).
115+
-- Extract the element at task_index from the dependency's output array.
116+
--
117+
-- Why the subquery with jsonb_each?
118+
-- - The dependency outputs a raw array: [1, 2, 3]
119+
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
120+
-- - We need to unwrap and get just the array value
121+
-- - Map steps have exactly 1 dependency (enforced by add_step)
122+
-- - So jsonb_each will return exactly 1 row
123+
-- - We extract the 'value' which is the raw array [1, 2, 3]
124+
-- - Then get the element at task_index from that array
125+
(SELECT jsonb_array_element(value, st.task_index)
126+
FROM jsonb_each(dep_out.deps_output)
127+
LIMIT 1)
128+
END
129+
130+
-- -------------------- NON-MAP STEPS --------------------
131+
-- Regular (non-map) steps receive ALL inputs as a structured object.
132+
-- This includes the original run input plus all dependency outputs.
133+
ELSE
134+
-- Non-map steps get structured input with named keys
135+
-- Example output: {
136+
-- "run": {"original": "input"},
137+
-- "step1": {"output": "from_step1"},
138+
-- "step2": {"output": "from_step2"}
139+
-- }
140+
--
141+
-- Build object with 'run' key containing original input
142+
jsonb_build_object('run', r.input) ||
143+
-- Merge with deps_output which already has dependency outputs
144+
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
145+
-- If no dependencies, defaults to empty object
146+
coalesce(dep_out.deps_output, '{}'::jsonb)
147+
END as input,
148+
st.message_id as msg_id
149+
from tasks st
150+
join runs r on st.run_id = r.run_id
151+
join pgflow.steps step on
152+
step.flow_slug = st.flow_slug and
153+
step.step_slug = st.step_slug
154+
left join deps_outputs dep_out on
155+
dep_out.run_id = st.run_id and
156+
dep_out.step_slug = st.step_slug
157+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U=
1+
h1:5eNDpXz1Ru5E6c9G7Glyo398mstJYLNNhjqcTjOaGxI=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -13,3 +13,4 @@ h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U=
1313
20250912125339_pgflow_TEMP_task_spawning_optimization.sql h1:HTSShQweuTS1Sz5q/KLy5XW3J/6D/mA6jjVpCfvjBto=
1414
20250916093518_pgflow_temp_add_cascade_complete.sql h1:rQeqjEghqhGGUP+njrHFpPZxrxInjMHq5uSvYN1dTZc=
1515
20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y=
16+
20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4=

0 commit comments

Comments
 (0)