Skip to content

Commit ce60c8e

Browse files
committed
feat: enhance start_tasks with conditional input logic and add array handling migration
- Updated start_tasks function to build step input conditionally based on step type - Implemented logic for root and dependent map steps to extract array elements - Added a new migration script to handle array elements in start_tasks - Included comprehensive tests for dependent map element extraction, large array processing, mixed JSON types, and nested arrays - Improved handling of array-based tasks and input construction for various step configurations
1 parent 2b71eb5 commit ce60c8e

11 files changed

+1396
-4
lines changed

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ as $$
5656
select
5757
d.run_id,
5858
d.step_slug,
59-
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
59+
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
60+
count(*) as dep_count
6061
from deps d
6162
group by d.run_id, d.step_slug
6263
),
@@ -82,11 +83,82 @@ as $$
8283
st.flow_slug,
8384
st.run_id,
8485
st.step_slug,
85-
jsonb_build_object('run', r.input) ||
86-
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
86+
-- ==========================================
87+
-- INPUT CONSTRUCTION LOGIC
88+
-- ==========================================
89+
-- This nested CASE statement determines how to construct the input
90+
-- for each task based on the step type (map vs non-map).
91+
--
92+
-- The fundamental difference:
93+
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
94+
-- - Non-map steps: Receive structured objects with named keys
95+
-- (e.g., {"run": {...}, "dependency1": {...}})
96+
-- ==========================================
97+
CASE
98+
-- -------------------- MAP STEPS --------------------
99+
-- Map steps process arrays element-by-element.
100+
-- Each task receives ONE element from the array at its task_index position.
101+
WHEN step.step_type = 'map' THEN
102+
-- Map steps get raw array elements without any wrapper object
103+
CASE
104+
-- ROOT MAP: Gets array from run input
105+
-- Example: run input = [1, 2, 3]
106+
-- task 0 gets: 1
107+
-- task 1 gets: 2
108+
-- task 2 gets: 3
109+
WHEN step.deps_count = 0 THEN
110+
-- Root map (deps_count = 0): no dependencies, reads from run input.
111+
-- Extract the element at task_index from the run's input array.
112+
-- Note: If run input is not an array, this will return NULL
113+
-- and the flow will fail (validated in start_flow).
114+
jsonb_array_element(r.input, st.task_index)
115+
116+
-- DEPENDENT MAP: Gets array from its single dependency
117+
-- Example: dependency output = ["a", "b", "c"]
118+
-- task 0 gets: "a"
119+
-- task 1 gets: "b"
120+
-- task 2 gets: "c"
121+
ELSE
122+
-- Has dependencies (should be exactly 1 for map steps).
123+
-- Extract the element at task_index from the dependency's output array.
124+
--
125+
-- Why the subquery with jsonb_each?
126+
-- - The dependency outputs a raw array: [1, 2, 3]
127+
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
128+
-- - We need to unwrap and get just the array value
129+
-- - Map steps have exactly 1 dependency (enforced by add_step)
130+
-- - So jsonb_each will return exactly 1 row
131+
-- - We extract the 'value' which is the raw array [1, 2, 3]
132+
-- - Then get the element at task_index from that array
133+
(SELECT jsonb_array_element(value, st.task_index)
134+
FROM jsonb_each(dep_out.deps_output)
135+
LIMIT 1)
136+
END
137+
138+
-- -------------------- NON-MAP STEPS --------------------
139+
-- Regular (non-map) steps receive ALL inputs as a structured object.
140+
-- This includes the original run input plus all dependency outputs.
141+
ELSE
142+
-- Non-map steps get structured input with named keys
143+
-- Example output: {
144+
-- "run": {"original": "input"},
145+
-- "step1": {"output": "from_step1"},
146+
-- "step2": {"output": "from_step2"}
147+
-- }
148+
--
149+
-- Build object with 'run' key containing original input
150+
jsonb_build_object('run', r.input) ||
151+
-- Merge with deps_output which already has dependency outputs
152+
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
153+
-- If no dependencies, defaults to empty object
154+
coalesce(dep_out.deps_output, '{}'::jsonb)
155+
END as input,
87156
st.message_id as msg_id
88157
from tasks st
89158
join runs r on st.run_id = r.run_id
159+
join pgflow.steps step on
160+
step.flow_slug = st.flow_slug and
161+
step.step_slug = st.step_slug
90162
left join deps_outputs dep_out on
91163
dep_out.run_id = st.run_id and
92164
dep_out.step_slug = st.step_slug
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
-- Modify "start_tasks" function
2+
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
3+
with tasks as (
4+
select
5+
task.flow_slug,
6+
task.run_id,
7+
task.step_slug,
8+
task.task_index,
9+
task.message_id
10+
from pgflow.step_tasks as task
11+
where task.flow_slug = start_tasks.flow_slug
12+
and task.message_id = any(msg_ids)
13+
and task.status = 'queued'
14+
),
15+
start_tasks_update as (
16+
update pgflow.step_tasks
17+
set
18+
attempts_count = attempts_count + 1,
19+
status = 'started',
20+
started_at = now(),
21+
last_worker_id = worker_id
22+
from tasks
23+
where step_tasks.message_id = tasks.message_id
24+
and step_tasks.flow_slug = tasks.flow_slug
25+
and step_tasks.status = 'queued'
26+
),
27+
runs as (
28+
select
29+
r.run_id,
30+
r.input
31+
from pgflow.runs r
32+
where r.run_id in (select run_id from tasks)
33+
),
34+
deps as (
35+
select
36+
st.run_id,
37+
st.step_slug,
38+
dep.dep_slug,
39+
dep_task.output as dep_output
40+
from tasks st
41+
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
42+
join pgflow.step_tasks dep_task on
43+
dep_task.run_id = st.run_id and
44+
dep_task.step_slug = dep.dep_slug and
45+
dep_task.status = 'completed'
46+
),
47+
deps_outputs as (
48+
select
49+
d.run_id,
50+
d.step_slug,
51+
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
52+
count(*) as dep_count
53+
from deps d
54+
group by d.run_id, d.step_slug
55+
),
56+
timeouts as (
57+
select
58+
task.message_id,
59+
task.flow_slug,
60+
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
61+
from tasks task
62+
join pgflow.flows flow on flow.flow_slug = task.flow_slug
63+
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
64+
),
65+
-- Batch update visibility timeouts for all messages
66+
set_vt_batch as (
67+
select pgflow.set_vt_batch(
68+
start_tasks.flow_slug,
69+
array_agg(t.message_id order by t.message_id),
70+
array_agg(t.vt_delay order by t.message_id)
71+
)
72+
from timeouts t
73+
)
74+
select
75+
st.flow_slug,
76+
st.run_id,
77+
st.step_slug,
78+
-- ==========================================
79+
-- INPUT CONSTRUCTION LOGIC
80+
-- ==========================================
81+
-- This nested CASE statement determines how to construct the input
82+
-- for each task based on the step type (map vs non-map).
83+
--
84+
-- The fundamental difference:
85+
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
86+
-- - Non-map steps: Receive structured objects with named keys
87+
-- (e.g., {"run": {...}, "dependency1": {...}})
88+
-- ==========================================
89+
CASE
90+
-- -------------------- MAP STEPS --------------------
91+
-- Map steps process arrays element-by-element.
92+
-- Each task receives ONE element from the array at its task_index position.
93+
WHEN step.step_type = 'map' THEN
94+
-- Map steps get raw array elements without any wrapper object
95+
CASE
96+
-- ROOT MAP: Gets array from run input
97+
-- Example: run input = [1, 2, 3]
98+
-- task 0 gets: 1
99+
-- task 1 gets: 2
100+
-- task 2 gets: 3
101+
WHEN step.deps_count = 0 THEN
102+
-- Root map (deps_count = 0): no dependencies, reads from run input.
103+
-- Extract the element at task_index from the run's input array.
104+
-- Note: If run input is not an array, this will return NULL
105+
-- and the flow will fail (validated in start_flow).
106+
jsonb_array_element(r.input, st.task_index)
107+
108+
-- DEPENDENT MAP: Gets array from its single dependency
109+
-- Example: dependency output = ["a", "b", "c"]
110+
-- task 0 gets: "a"
111+
-- task 1 gets: "b"
112+
-- task 2 gets: "c"
113+
ELSE
114+
-- Has dependencies (should be exactly 1 for map steps).
115+
-- Extract the element at task_index from the dependency's output array.
116+
--
117+
-- Why the subquery with jsonb_each?
118+
-- - The dependency outputs a raw array: [1, 2, 3]
119+
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
120+
-- - We need to unwrap and get just the array value
121+
-- - Map steps have exactly 1 dependency (enforced by add_step)
122+
-- - So jsonb_each will return exactly 1 row
123+
-- - We extract the 'value' which is the raw array [1, 2, 3]
124+
-- - Then get the element at task_index from that array
125+
(SELECT jsonb_array_element(value, st.task_index)
126+
FROM jsonb_each(dep_out.deps_output)
127+
LIMIT 1)
128+
END
129+
130+
-- -------------------- NON-MAP STEPS --------------------
131+
-- Regular (non-map) steps receive ALL inputs as a structured object.
132+
-- This includes the original run input plus all dependency outputs.
133+
ELSE
134+
-- Non-map steps get structured input with named keys
135+
-- Example output: {
136+
-- "run": {"original": "input"},
137+
-- "step1": {"output": "from_step1"},
138+
-- "step2": {"output": "from_step2"}
139+
-- }
140+
--
141+
-- Build object with 'run' key containing original input
142+
jsonb_build_object('run', r.input) ||
143+
-- Merge with deps_output which already has dependency outputs
144+
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
145+
-- If no dependencies, defaults to empty object
146+
coalesce(dep_out.deps_output, '{}'::jsonb)
147+
END as input,
148+
st.message_id as msg_id
149+
from tasks st
150+
join runs r on st.run_id = r.run_id
151+
join pgflow.steps step on
152+
step.flow_slug = st.flow_slug and
153+
step.step_slug = st.step_slug
154+
left join deps_outputs dep_out on
155+
dep_out.run_id = st.run_id and
156+
dep_out.step_slug = st.step_slug
157+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U=
1+
h1:5eNDpXz1Ru5E6c9G7Glyo398mstJYLNNhjqcTjOaGxI=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -13,3 +13,4 @@ h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U=
1313
20250912125339_pgflow_TEMP_task_spawning_optimization.sql h1:HTSShQweuTS1Sz5q/KLy5XW3J/6D/mA6jjVpCfvjBto=
1414
20250916093518_pgflow_temp_add_cascade_complete.sql h1:rQeqjEghqhGGUP+njrHFpPZxrxInjMHq5uSvYN1dTZc=
1515
20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y=
16+
20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4=
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
begin;
2+
select plan(7);
3+
select pgflow_tests.reset_db();
4+
5+
-- Test: Dependent map tasks receive individual array elements from predecessor
6+
select diag('Testing dependent map tasks receive elements from predecessor output');
7+
8+
-- SETUP: Create flow with single step -> map step
9+
select pgflow.create_flow('dep_map_flow');
10+
select pgflow.add_step(
11+
flow_slug => 'dep_map_flow',
12+
step_slug => 'producer_step',
13+
deps_slugs => '{}',
14+
step_type => 'single'
15+
);
16+
select pgflow.add_step(
17+
flow_slug => 'dep_map_flow',
18+
step_slug => 'map_consumer',
19+
deps_slugs => ARRAY['producer_step'],
20+
step_type => 'map'
21+
);
22+
23+
-- Start flow with some input
24+
select run_id from pgflow.start_flow('dep_map_flow', '{"initial": "data"}'::jsonb) \gset
25+
26+
-- Verify producer step was created and has a task
27+
select is(
28+
(select count(*) from pgflow.step_tasks
29+
where run_id = :'run_id' and step_slug = 'producer_step'),
30+
1::bigint,
31+
'Producer step should have 1 task'
32+
);
33+
34+
-- Ensure worker exists
35+
select pgflow_tests.ensure_worker('dep_map_flow');
36+
37+
-- Start and complete the producer task with array output
38+
with producer_task as (
39+
select * from pgflow_tests.read_and_start('dep_map_flow', 1, 1) limit 1
40+
)
41+
select pgflow.complete_task(
42+
(select run_id from producer_task),
43+
'producer_step',
44+
0,
45+
'[10, 20, 30, 40]'::jsonb -- Array output from producer
46+
)
47+
from producer_task;
48+
49+
-- Verify producer step is completed
50+
select is(
51+
(select status from pgflow.step_states
52+
where run_id = :'run_id' and step_slug = 'producer_step'),
53+
'completed',
54+
'Producer step should be completed'
55+
);
56+
57+
-- Verify map_consumer initial_tasks was set to 4
58+
select is(
59+
(select initial_tasks from pgflow.step_states
60+
where run_id = :'run_id' and step_slug = 'map_consumer'),
61+
4,
62+
'Map consumer should have initial_tasks = 4 (array length)'
63+
);
64+
65+
-- Verify 4 tasks were created for map_consumer
66+
select is(
67+
(select count(*) from pgflow.step_tasks
68+
where run_id = :'run_id' and step_slug = 'map_consumer'),
69+
4::bigint,
70+
'Should create 4 tasks for map step'
71+
);
72+
73+
-- Get message IDs for each map task
74+
select message_id as msg_id_0 from pgflow.step_tasks
75+
where run_id = :'run_id' and step_slug = 'map_consumer' and task_index = 0 \gset
76+
77+
select message_id as msg_id_1 from pgflow.step_tasks
78+
where run_id = :'run_id' and step_slug = 'map_consumer' and task_index = 1 \gset
79+
80+
select message_id as msg_id_2 from pgflow.step_tasks
81+
where run_id = :'run_id' and step_slug = 'map_consumer' and task_index = 2 \gset
82+
83+
select message_id as msg_id_3 from pgflow.step_tasks
84+
where run_id = :'run_id' and step_slug = 'map_consumer' and task_index = 3 \gset
85+
86+
-- TEST: Each map task receives its specific element from producer output
87+
select is(
88+
(select input from pgflow.start_tasks(
89+
'dep_map_flow',
90+
ARRAY[:'msg_id_0'::bigint],
91+
'11111111-1111-1111-1111-111111111111'::uuid
92+
)),
93+
'10'::jsonb,
94+
'Task 0 should receive first element (10) from producer_step'
95+
);
96+
97+
select is(
98+
(select input from pgflow.start_tasks(
99+
'dep_map_flow',
100+
ARRAY[:'msg_id_1'::bigint],
101+
'11111111-1111-1111-1111-111111111111'::uuid
102+
)),
103+
'20'::jsonb,
104+
'Task 1 should receive second element (20) from producer_step'
105+
);
106+
107+
select is(
108+
(select input from pgflow.start_tasks(
109+
'dep_map_flow',
110+
ARRAY[:'msg_id_3'::bigint],
111+
'11111111-1111-1111-1111-111111111111'::uuid
112+
)),
113+
'40'::jsonb,
114+
'Task 3 should receive fourth element (40) from producer_step'
115+
);
116+
117+
select finish();
118+
rollback;

0 commit comments

Comments
 (0)