Skip to content

Commit 0a5205c

Browse files
committed
feat: enhance start_tasks with conditional input logic and add array handling migration
- Updated start_tasks function to build step input conditionally based on step type
- Implemented logic for root and dependent map steps to extract array elements
- Added a new migration script to handle array elements in start_tasks
- Included comprehensive tests for dependent map element extraction, large array processing, mixed JSON types, and nested arrays
- Improved handling of array-based tasks and input construction for various step configurations
1 parent 2b71eb5 commit 0a5205c

10 files changed

+915
-3
lines changed

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,34 @@ as $$
8282
st.flow_slug,
8383
st.run_id,
8484
st.step_slug,
85-
jsonb_build_object('run', r.input) ||
86-
coalesce(dep_out.deps_output, '{}'::jsonb) as input,
85+
-- Conditional input building based on step type
86+
CASE
87+
WHEN step.step_type = 'map' THEN
88+
CASE
89+
WHEN dep_out.deps_output IS NULL OR dep_out.deps_output = '{}'::jsonb THEN
90+
-- Root map: extract element from run input using task_index
91+
jsonb_build_object('run', jsonb_array_element(r.input, st.task_index))
92+
ELSE
93+
-- Dependent map: extract element from dependency output
94+
-- Map has exactly 1 dependency (validated elsewhere)
95+
(SELECT jsonb_build_object(
96+
key,
97+
jsonb_array_element(value, st.task_index)
98+
)
99+
FROM jsonb_each(dep_out.deps_output)
100+
LIMIT 1)
101+
END
102+
ELSE
103+
-- Non-map steps: current behavior
104+
jsonb_build_object('run', r.input) ||
105+
coalesce(dep_out.deps_output, '{}'::jsonb)
106+
END as input,
87107
st.message_id as msg_id
88108
from tasks st
89109
join runs r on st.run_id = r.run_id
110+
join pgflow.steps step on
111+
step.flow_slug = st.flow_slug and
112+
step.step_slug = st.step_slug
90113
left join deps_outputs dep_out on
91114
dep_out.run_id = st.run_id and
92115
dep_out.step_slug = st.step_slug
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
-- Modify "start_tasks" function
-- NOTE(review): any edit to this migration file requires refreshing atlas.sum
-- (e.g. `atlas migrate hash`), and editing an already-applied migration should
-- instead ship as a new migration -- confirm this file has not been deployed.
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
-- Claims the queued step_tasks behind the given queue messages for this worker:
-- marks them 'started', bumps attempts_count, extends the messages' visibility
-- timeouts, and returns one row per task carrying the input the handler runs on.
with tasks as (
  -- Queued tasks in this flow matching the polled message ids.
  select
    task.flow_slug,
    task.run_id,
    task.step_slug,
    task.task_index,
    task.message_id
  from pgflow.step_tasks as task
  where task.flow_slug = start_tasks.flow_slug
    and task.message_id = any(msg_ids)
    and task.status = 'queued'
),
start_tasks_update as (
  -- Mark the claimed tasks as started. Data-modifying CTEs are always
  -- executed exactly once even when unreferenced by the primary query.
  update pgflow.step_tasks
  set
    attempts_count = attempts_count + 1,
    status = 'started',
    started_at = now(),
    last_worker_id = worker_id
  from tasks
  where step_tasks.message_id = tasks.message_id
    and step_tasks.flow_slug = tasks.flow_slug
    and step_tasks.status = 'queued'
),
runs as (
  -- Run-level input for each claimed task's run.
  select
    r.run_id,
    r.input
  from pgflow.runs r
  where r.run_id in (select run_id from tasks)
),
deps as (
  -- Completed outputs of every dependency of each claimed task's step.
  select
    st.run_id,
    st.step_slug,
    dep.dep_slug,
    dep_task.output as dep_output
  from tasks st
  join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
  join pgflow.step_tasks dep_task on
    dep_task.run_id = st.run_id and
    dep_task.step_slug = dep.dep_slug and
    dep_task.status = 'completed'
),
deps_outputs as (
  -- Dependency outputs folded into one object per step: {dep_slug: output, ...}.
  select
    d.run_id,
    d.step_slug,
    jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output
  from deps d
  group by d.run_id, d.step_slug
),
timeouts as (
  -- Per-message visibility delay: step timeout overrides the flow default,
  -- plus 2 seconds of slack so a message does not reappear while a handler
  -- is still wrapping up.
  select
    task.message_id,
    task.flow_slug,
    coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
  from tasks task
  join pgflow.flows flow on flow.flow_slug = task.flow_slug
  join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages.
-- BUG FIX: this CTE was previously unreferenced by the primary query. Plain
-- SELECT CTEs are only evaluated as far as the outer query demands their
-- output, so the set_vt_batch call was never executed and visibility
-- timeouts were never applied. The cross join below forces evaluation; the
-- aggregate query always yields exactly one row, so cardinality is preserved.
set_vt_batch as (
  select pgflow.set_vt_batch(
    start_tasks.flow_slug,
    array_agg(t.message_id order by t.message_id),
    array_agg(t.vt_delay order by t.message_id)
  ) as result
  from timeouts t
)
select
  st.flow_slug,
  st.run_id,
  st.step_slug,
  -- Conditional input building based on step type
  case
    when step.step_type = 'map' then
      case
        when dep_out.deps_output is null or dep_out.deps_output = '{}'::jsonb then
          -- Root map: extract this task's element from the run input array
          -- using task_index (yields JSON null for out-of-range / non-array
          -- input -- presumably validated upstream; confirm).
          jsonb_build_object('run', jsonb_array_element(r.input, st.task_index))
        else
          -- Dependent map: extract this task's element from the single
          -- dependency's array output. A map step has exactly 1 dependency
          -- (validated elsewhere), hence the bare LIMIT 1 is deterministic.
          (select jsonb_build_object(
            key,
            jsonb_array_element(value, st.task_index)
          )
          from jsonb_each(dep_out.deps_output)
          limit 1)
      end
    else
      -- Non-map steps: run input merged with all dependency outputs.
      jsonb_build_object('run', r.input) ||
      coalesce(dep_out.deps_output, '{}'::jsonb)
  end as input,
  st.message_id as msg_id
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
  step.flow_slug = st.flow_slug and
  step.step_slug = st.step_slug
left join deps_outputs dep_out on
  dep_out.run_id = st.run_id and
  dep_out.step_slug = st.step_slug
-- Forces set_vt_batch to run (see BUG FIX note above). NOTE(review): when no
-- tasks match, set_vt_batch may be invoked with NULL arrays -- verify
-- pgflow.set_vt_batch tolerates that (there is nothing to delay in that case).
cross join set_vt_batch
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U=
1+
h1:Afano+yWWpJ0L1p6Xdb9ndh7ARUpCWBBvkOK4f7dAXI=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -13,3 +13,4 @@ h1:iTSgSZ3IR12NmZGRVc2bayttSSUytFA3+bximV7hb2U=
1313
20250912125339_pgflow_TEMP_task_spawning_optimization.sql h1:HTSShQweuTS1Sz5q/KLy5XW3J/6D/mA6jjVpCfvjBto=
1414
20250916093518_pgflow_temp_add_cascade_complete.sql h1:rQeqjEghqhGGUP+njrHFpPZxrxInjMHq5uSvYN1dTZc=
1515
20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y=
16+
20250916194715_pgflow_temp_handle_arrays_in_start_tasks.sql h1:yvdKz3HhQ28pkyK/RE+MVNuWYxkTtDV1xzGy57rjMg4=
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
-- pgTAP test: start_tasks must give each dependent-map task the single array
-- element at its own task_index, extracted from the predecessor step's output.
-- Statement order matters throughout: each step mutates database state that
-- the following assertions depend on. Runs inside a transaction and rolls back.
begin;
select plan(7);
select pgflow_tests.reset_db();

-- Test: Dependent map tasks receive individual array elements from predecessor
select diag('Testing dependent map tasks receive elements from predecessor output');

-- SETUP: Create flow with single step -> map step
select pgflow.create_flow('dep_map_flow');
select pgflow.add_step(
  flow_slug => 'dep_map_flow',
  step_slug => 'producer_step',
  deps_slugs => '{}',
  step_type => 'single'
);
select pgflow.add_step(
  flow_slug => 'dep_map_flow',
  step_slug => 'map_consumer',
  deps_slugs => ARRAY['producer_step'],
  step_type => 'map'
);

-- Start flow with some input; \gset captures run_id as a psql variable
-- used by every assertion below as :'run_id'.
select run_id from pgflow.start_flow('dep_map_flow', '{"initial": "data"}'::jsonb) \gset

-- Verify producer step was created and has a task
select is(
  (select count(*) from pgflow.step_tasks
   where run_id = :'run_id' and step_slug = 'producer_step'),
  1::bigint,
  'Producer step should have 1 task'
);

-- Ensure worker exists (start_tasks requires a registered worker)
select pgflow_tests.ensure_worker('dep_map_flow');

-- Start and complete the producer task with array output.
-- Completing it fans out the map_consumer step: one task per array element.
with producer_task as (
  select * from pgflow_tests.read_and_start('dep_map_flow', 1, 1) limit 1
)
select pgflow.complete_task(
  (select run_id from producer_task),
  'producer_step',
  0,
  '[10, 20, 30, 40]'::jsonb -- Array output from producer
)
from producer_task;

-- Verify producer step is completed
select is(
  (select status from pgflow.step_states
   where run_id = :'run_id' and step_slug = 'producer_step'),
  'completed',
  'Producer step should be completed'
);

-- Verify map_consumer initial_tasks was set to 4 (the producer array length)
select is(
  (select initial_tasks from pgflow.step_states
   where run_id = :'run_id' and step_slug = 'map_consumer'),
  4,
  'Map consumer should have initial_tasks = 4 (array length)'
);

-- Verify 4 tasks were created for map_consumer
select is(
  (select count(*) from pgflow.step_tasks
   where run_id = :'run_id' and step_slug = 'map_consumer'),
  4::bigint,
  'Should create 4 tasks for map step'
);

-- Capture the message id of each map task so each can be started individually
select message_id as msg_id_0 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'map_consumer' and task_index = 0 \gset

select message_id as msg_id_1 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'map_consumer' and task_index = 1 \gset

select message_id as msg_id_2 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'map_consumer' and task_index = 2 \gset

select message_id as msg_id_3 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'map_consumer' and task_index = 3 \gset

-- TEST: Each map task receives its specific element from producer output,
-- keyed by the producer's step slug in the task input object.
select is(
  (select input->'producer_step' from pgflow.start_tasks(
    'dep_map_flow',
    ARRAY[:'msg_id_0'::bigint],
    '11111111-1111-1111-1111-111111111111'::uuid
  )),
  '10'::jsonb,
  'Task 0 should receive first element (10) from producer_step'
);

select is(
  (select input->'producer_step' from pgflow.start_tasks(
    'dep_map_flow',
    ARRAY[:'msg_id_1'::bigint],
    '11111111-1111-1111-1111-111111111111'::uuid
  )),
  '20'::jsonb,
  'Task 1 should receive second element (20) from producer_step'
);

-- NOTE(review): msg_id_2 is captured above but never asserted on; index 3 is
-- checked instead to cover the last element.
select is(
  (select input->'producer_step' from pgflow.start_tasks(
    'dep_map_flow',
    ARRAY[:'msg_id_3'::bigint],
    '11111111-1111-1111-1111-111111111111'::uuid
  )),
  '40'::jsonb,
  'Task 3 should receive fourth element (40) from producer_step'
);

select finish();
rollback;
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
-- pgTAP test: a root map step fans out correctly for large (100+ element)
-- input arrays, and each task receives exactly the element at its task_index.
-- Statement order matters: each step mutates state used by later assertions.
-- Runs inside a transaction and rolls back.
begin;
select plan(6);
select pgflow_tests.reset_db();

-- Test: Map tasks correctly handle large arrays (100+ elements)
select diag('Testing map tasks with large arrays');

-- SETUP: Create flow with a single root map step (no dependencies), so its
-- fan-out is driven by the run input array.
select pgflow.create_flow('large_array_flow');
select pgflow.add_step(
  flow_slug => 'large_array_flow',
  step_slug => 'large_map',
  deps_slugs => '{}',
  step_type => 'map'
);

-- Start flow with an array of 150 elements: [1, 2, ..., 150].
-- \gset captures run_id as a psql variable used as :'run_id' below.
select run_id from pgflow.start_flow(
  'large_array_flow',
  (select jsonb_agg(i) from generate_series(1, 150) i)
) \gset

-- Verify 150 tasks were created (one per array element)
select is(
  (select count(*) from pgflow.step_tasks
   where run_id = :'run_id' and step_slug = 'large_map'),
  150::bigint,
  'Should create 150 tasks for array with 150 elements'
);

-- Ensure worker exists (start_tasks requires a registered worker)
select pgflow_tests.ensure_worker('large_array_flow');

-- Sample check: verify specific indices receive correct elements.
-- Elements are 1-based values at 0-based indices, so index i holds i + 1.

-- Check index 0 (first)
select message_id as msg_id_0 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'large_map' and task_index = 0 \gset

select is(
  (select input->'run' from pgflow.start_tasks(
    'large_array_flow',
    ARRAY[:'msg_id_0'::bigint],
    '11111111-1111-1111-1111-111111111111'::uuid
  )),
  '1'::jsonb,
  'Task at index 0 should receive element 1'
);

-- Check index 49 (middle)
select message_id as msg_id_49 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'large_map' and task_index = 49 \gset

select is(
  (select input->'run' from pgflow.start_tasks(
    'large_array_flow',
    ARRAY[:'msg_id_49'::bigint],
    '11111111-1111-1111-1111-111111111111'::uuid
  )),
  '50'::jsonb,
  'Task at index 49 should receive element 50'
);

-- Check index 99
select message_id as msg_id_99 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'large_map' and task_index = 99 \gset

select is(
  (select input->'run' from pgflow.start_tasks(
    'large_array_flow',
    ARRAY[:'msg_id_99'::bigint],
    '11111111-1111-1111-1111-111111111111'::uuid
  )),
  '100'::jsonb,
  'Task at index 99 should receive element 100'
);

-- Check index 149 (last)
select message_id as msg_id_149 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'large_map' and task_index = 149 \gset

select is(
  (select input->'run' from pgflow.start_tasks(
    'large_array_flow',
    ARRAY[:'msg_id_149'::bigint],
    '11111111-1111-1111-1111-111111111111'::uuid
  )),
  '150'::jsonb,
  'Task at index 149 should receive element 150'
);

-- Verify all task indices are sequential from 0 to 149.
-- FIX: the original compared two arrays built over an unfiltered cross join
-- of the 150 tasks with generate_series(0, 149) -- 22,500 rows aggregated on
-- each side. The multiset comparison happened to be equivalent, but this
-- direct ordered comparison asserts the same invariant on 150 rows.
select is(
  (select array_agg(task_index order by task_index)
   from pgflow.step_tasks
   where run_id = :'run_id' and step_slug = 'large_map'),
  (select array_agg(i order by i) from generate_series(0, 149) i),
  'All task indices should be sequential from 0 to 149'
);

select finish();
rollback;

0 commit comments

Comments
 (0)