Skip to content

Commit c2a2e38

Browse files
committed
feat: add task_index attribute to step_task_record and return it from start_tasks function
- Updated the step_task_record type to include task_index
- Modified start_tasks function to return task_index alongside msg_id
- Adjusted database schema and TypeScript types to support task_index
- Enhanced input construction logic for map and non-map steps based on task_index
- Added migration to modify start_tasks function and type to handle task_index
- Updated test to complete a task with the new task_index attribute
1 parent bcbe933 commit c2a2e38

12 files changed

+311
-39
lines changed

pkgs/core/schemas/0040_types.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ create type pgflow.step_task_record as (
44
run_id uuid,
55
step_slug text,
66
input jsonb,
7-
msg_id bigint
7+
msg_id bigint,
8+
task_index int
89
);

pkgs/core/schemas/0120_function_start_tasks.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ as $$
2525
),
2626
start_tasks_update as (
2727
update pgflow.step_tasks
28-
set
28+
set
2929
attempts_count = attempts_count + 1,
3030
status = 'started',
3131
started_at = now(),
@@ -171,7 +171,8 @@ as $$
171171
-- If no dependencies, defaults to empty object
172172
coalesce(dep_out.deps_output, '{}'::jsonb)
173173
END as input,
174-
st.message_id as msg_id
174+
st.message_id as msg_id,
175+
st.task_index as task_index
175176
from tasks st
176177
join runs r on st.run_id = r.run_id
177178
join pgflow.steps step on

pkgs/core/src/database-types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ export type Database = {
515515
step_slug: string | null
516516
input: Json | null
517517
msg_id: number | null
518+
task_index: number | null
518519
}
519520
}
520521
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
-- Modify "step_task_record" composite type
2+
ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "task_index" integer;
3+
-- Modify "start_tasks" function
4+
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
5+
with tasks as (
6+
select
7+
task.flow_slug,
8+
task.run_id,
9+
task.step_slug,
10+
task.task_index,
11+
task.message_id
12+
from pgflow.step_tasks as task
13+
join pgflow.runs r on r.run_id = task.run_id
14+
where task.flow_slug = start_tasks.flow_slug
15+
and task.message_id = any(msg_ids)
16+
and task.status = 'queued'
17+
-- MVP: Don't start tasks on failed runs
18+
and r.status != 'failed'
19+
),
20+
start_tasks_update as (
21+
update pgflow.step_tasks
22+
set
23+
attempts_count = attempts_count + 1,
24+
status = 'started',
25+
started_at = now(),
26+
last_worker_id = worker_id
27+
from tasks
28+
where step_tasks.message_id = tasks.message_id
29+
and step_tasks.flow_slug = tasks.flow_slug
30+
and step_tasks.status = 'queued'
31+
),
32+
runs as (
33+
select
34+
r.run_id,
35+
r.input
36+
from pgflow.runs r
37+
where r.run_id in (select run_id from tasks)
38+
),
39+
deps as (
40+
select
41+
st.run_id,
42+
st.step_slug,
43+
dep.dep_slug,
44+
-- Aggregate map outputs or use single output
45+
CASE
46+
WHEN dep_step.step_type = 'map' THEN
47+
-- Aggregate all task outputs ordered by task_index
48+
-- Use COALESCE to return empty array if no tasks
49+
(SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
50+
FROM pgflow.step_tasks dt
51+
WHERE dt.run_id = st.run_id
52+
AND dt.step_slug = dep.dep_slug
53+
AND dt.status = 'completed')
54+
ELSE
55+
-- Single step: use the single task output
56+
dep_task.output
57+
END as dep_output
58+
from tasks st
59+
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
60+
join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
61+
left join pgflow.step_tasks dep_task on
62+
dep_task.run_id = st.run_id and
63+
dep_task.step_slug = dep.dep_slug and
64+
dep_task.status = 'completed'
65+
and dep_step.step_type = 'single' -- Only join for single steps
66+
),
67+
deps_outputs as (
68+
select
69+
d.run_id,
70+
d.step_slug,
71+
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
72+
count(*) as dep_count
73+
from deps d
74+
group by d.run_id, d.step_slug
75+
),
76+
timeouts as (
77+
select
78+
task.message_id,
79+
task.flow_slug,
80+
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
81+
from tasks task
82+
join pgflow.flows flow on flow.flow_slug = task.flow_slug
83+
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
84+
),
85+
-- Batch update visibility timeouts for all messages
86+
set_vt_batch as (
87+
select pgflow.set_vt_batch(
88+
start_tasks.flow_slug,
89+
array_agg(t.message_id order by t.message_id),
90+
array_agg(t.vt_delay order by t.message_id)
91+
)
92+
from timeouts t
93+
)
94+
select
95+
st.flow_slug,
96+
st.run_id,
97+
st.step_slug,
98+
-- ==========================================
99+
-- INPUT CONSTRUCTION LOGIC
100+
-- ==========================================
101+
-- This nested CASE statement determines how to construct the input
102+
-- for each task based on the step type (map vs non-map).
103+
--
104+
-- The fundamental difference:
105+
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
106+
-- - Non-map steps: Receive structured objects with named keys
107+
-- (e.g., {"run": {...}, "dependency1": {...}})
108+
-- ==========================================
109+
CASE
110+
-- -------------------- MAP STEPS --------------------
111+
-- Map steps process arrays element-by-element.
112+
-- Each task receives ONE element from the array at its task_index position.
113+
WHEN step.step_type = 'map' THEN
114+
-- Map steps get raw array elements without any wrapper object
115+
CASE
116+
-- ROOT MAP: Gets array from run input
117+
-- Example: run input = [1, 2, 3]
118+
-- task 0 gets: 1
119+
-- task 1 gets: 2
120+
-- task 2 gets: 3
121+
WHEN step.deps_count = 0 THEN
122+
-- Root map (deps_count = 0): no dependencies, reads from run input.
123+
-- Extract the element at task_index from the run's input array.
124+
-- Note: If run input is not an array, this will return NULL
125+
-- and the flow will fail (validated in start_flow).
126+
jsonb_array_element(r.input, st.task_index)
127+
128+
-- DEPENDENT MAP: Gets array from its single dependency
129+
-- Example: dependency output = ["a", "b", "c"]
130+
-- task 0 gets: "a"
131+
-- task 1 gets: "b"
132+
-- task 2 gets: "c"
133+
ELSE
134+
-- Has dependencies (should be exactly 1 for map steps).
135+
-- Extract the element at task_index from the dependency's output array.
136+
--
137+
-- Why the subquery with jsonb_each?
138+
-- - The dependency outputs a raw array: [1, 2, 3]
139+
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
140+
-- - We need to unwrap and get just the array value
141+
-- - Map steps have exactly 1 dependency (enforced by add_step)
142+
-- - So jsonb_each will return exactly 1 row
143+
-- - We extract the 'value' which is the raw array [1, 2, 3]
144+
-- - Then get the element at task_index from that array
145+
(SELECT jsonb_array_element(value, st.task_index)
146+
FROM jsonb_each(dep_out.deps_output)
147+
LIMIT 1)
148+
END
149+
150+
-- -------------------- NON-MAP STEPS --------------------
151+
-- Regular (non-map) steps receive ALL inputs as a structured object.
152+
-- This includes the original run input plus all dependency outputs.
153+
ELSE
154+
-- Non-map steps get structured input with named keys
155+
-- Example output: {
156+
-- "run": {"original": "input"},
157+
-- "step1": {"output": "from_step1"},
158+
-- "step2": {"output": "from_step2"}
159+
-- }
160+
--
161+
-- Build object with 'run' key containing original input
162+
jsonb_build_object('run', r.input) ||
163+
-- Merge with deps_output which already has dependency outputs
164+
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
165+
-- If no dependencies, defaults to empty object
166+
coalesce(dep_out.deps_output, '{}'::jsonb)
167+
END as input,
168+
st.message_id as msg_id,
169+
st.task_index as task_index
170+
from tasks st
171+
join runs r on st.run_id = r.run_id
172+
join pgflow.steps step on
173+
step.flow_slug = st.flow_slug and
174+
step.step_slug = st.step_slug
175+
left join deps_outputs dep_out on
176+
dep_out.run_id = st.run_id and
177+
dep_out.step_slug = st.step_slug
178+
$$;

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg=
1+
h1:46a22RkBGrdfb3veJG3ZlyUkS3us2qfEFGn5cjh2W+Q=
22
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -16,3 +16,4 @@ h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg=
1616
20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4=
1717
20250918042753_pgflow_temp_handle_map_output_aggregation.sql h1:9aC4lyr6AEvpLTrv9Fza2Ur0QO87S0cdJDI+BPLAl60=
1818
20250919101802_pgflow_temp_orphaned_messages_index.sql h1:GyfPfQz4AqB1/sTAC7B/m6j8FJrpkocinnzerNfM0f8=
19+
20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql h1:DguPK41IfsMykzodXqZq0BmW1IXZW8ZTj6rkw4LaHFE=

pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,17 @@ select is(
7575
do $$
7676
declare
7777
v_run_id uuid;
78-
v_task pgflow.step_task_record;
7978
v_start_time timestamp;
8079
v_duration interval;
8180
begin
8281
select run_id into v_run_id from pgflow.runs limit 1;
8382

84-
-- Get consumer task
85-
select * into v_task from pgflow.step_tasks
86-
where run_id = v_run_id and step_slug = 'consumer' and status = 'started';
87-
88-
-- Complete it
83+
-- The consumer task was already started in the previous test assertion (line 69)
84+
-- We just need to complete it
8985
perform pgflow.complete_task(
90-
v_task.run_id,
91-
v_task.step_slug,
92-
0,
86+
v_run_id,
87+
'consumer',
88+
0, -- consumer is a single task, so task_index is 0
9389
jsonb_build_object('processed', 100)
9490
);
9591

pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
begin;
2-
select plan(6);
2+
select plan(7);
33
select pgflow_tests.reset_db();
44

55
select pgflow.create_flow('simple');
@@ -9,24 +9,34 @@ select pgflow.start_flow('simple', '"hello"'::jsonb);
99
-- Ensure worker exists
1010
select pgflow_tests.ensure_worker('simple');
1111

12-
-- Read messages from queue
12+
-- Read messages from queue and start task
1313
with msgs as (
1414
select * from pgflow.read_with_poll('simple', 10, 5, 1, 50) limit 1
1515
),
1616
msg_ids as (
1717
select array_agg(msg_id) as ids from msgs
18-
)
19-
-- TEST: start_tasks returns tasks for valid message IDs
20-
select is(
21-
(select count(*)::int from pgflow.start_tasks(
18+
),
19+
started_tasks as (
20+
select * from pgflow.start_tasks(
2221
'simple',
2322
(select ids from msg_ids),
2423
'11111111-1111-1111-1111-111111111111'::uuid
25-
)),
24+
)
25+
)
26+
-- TEST: start_tasks returns tasks for valid message IDs
27+
select is(
28+
(select count(*)::int from started_tasks),
2629
1,
2730
'start_tasks should return one task for valid message ID'
2831
);
2932

33+
-- TEST: Task has task_index = 0 (read from the step_tasks table because the task has already been started)
34+
select is(
35+
(select task_index from pgflow.step_tasks where step_slug = 'task'),
36+
0,
37+
'Single task should have task_index = 0'
38+
);
39+
3040
-- TEST: Task status should be 'started' after start_tasks
3141
select is(
3242
(select status from pgflow.step_tasks where step_slug = 'task'),

pkgs/core/supabase/tests/start_tasks/map_large_array.test.sql

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ select message_id as msg_id_0 from pgflow.step_tasks
3737
where run_id = :'run_id' and step_slug = 'large_map' and task_index = 0 \gset
3838

3939
select is(
40-
(select input from pgflow.start_tasks(
40+
(select row(input, task_index) from pgflow.start_tasks(
4141
'large_array_flow',
4242
ARRAY[:'msg_id_0'::bigint],
4343
'11111111-1111-1111-1111-111111111111'::uuid
4444
)),
45-
'1'::jsonb,
46-
'Task at index 0 should receive element 1'
45+
row('1'::jsonb, 0),
46+
'Task at index 0 should receive element 1 with task_index = 0'
4747
);
4848

4949
-- Check index 49 (middle)
@@ -79,13 +79,13 @@ select message_id as msg_id_149 from pgflow.step_tasks
7979
where run_id = :'run_id' and step_slug = 'large_map' and task_index = 149 \gset
8080

8181
select is(
82-
(select input from pgflow.start_tasks(
82+
(select row(input, task_index) from pgflow.start_tasks(
8383
'large_array_flow',
8484
ARRAY[:'msg_id_149'::bigint],
8585
'11111111-1111-1111-1111-111111111111'::uuid
8686
)),
87-
'150'::jsonb,
88-
'Task at index 149 should receive element 150'
87+
row('150'::jsonb, 149),
88+
'Task at index 149 should receive element 150 with task_index = 149'
8989
);
9090

9191
-- Verify all task indices are sequential from 0 to 149

pkgs/core/supabase/tests/start_tasks/map_to_map_chain.test.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ select is(
5757
'First map task 0 should receive element 1'
5858
);
5959

60+
6061
select is(
6162
(select input from pgflow.start_tasks(
6263
'map_chain_flow',
@@ -67,6 +68,7 @@ select is(
6768
'First map task 1 should receive element 2'
6869
);
6970

71+
7072
-- NOTE: Can't complete first map tasks with non-array output because
7173
-- complete_task validates that map steps must output arrays
7274
-- This test will need to be updated once output aggregation is implemented

pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
begin;
2-
select plan(4);
2+
select plan(5);
33
select pgflow_tests.reset_db();
44

55
select pgflow.create_flow('multi_flow');
@@ -52,5 +52,12 @@ select is(
5252
'All tasks should have same worker ID'
5353
);
5454

55+
-- TEST: All single tasks should have task_index = 0
56+
select ok(
57+
(select bool_and(task_index = 0) from pgflow.step_tasks
58+
where flow_slug = 'multi_flow'),
59+
'All single tasks should have task_index = 0'
60+
);
61+
5562
select finish();
5663
rollback;

0 commit comments

Comments
 (0)