3 changes: 2 additions & 1 deletion pkgs/core/schemas/0040_types.sql
@@ -4,5 +4,6 @@ create type pgflow.step_task_record as (
run_id uuid,
step_slug text,
input jsonb,
msg_id bigint
msg_id bigint,
task_index int
);
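
After applying the migration, the new attribute can be verified from the catalogs. A minimal sketch, assuming psql access to a database with the migration applied (composite types are backed by a pg_class entry of relkind 'c', so pg_attribute lists their fields):

select attname, format_type(atttypid, atttypmod) as attr_type
from pg_attribute
where attrelid = 'pgflow.step_task_record'::regclass
  and attnum > 0
order by attnum;
-- task_index | integer should appear last, since ALTER TYPE ... ADD ATTRIBUTE appends.
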
5 changes: 3 additions & 2 deletions pkgs/core/schemas/0120_function_start_tasks.sql
@@ -25,7 +25,7 @@ as $$
),
start_tasks_update as (
update pgflow.step_tasks
set
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
@@ -171,7 +171,8 @@ as $$
-- If no dependencies, defaults to empty object
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id
st.message_id as msg_id,
st.task_index as task_index
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
1 change: 1 addition & 0 deletions pkgs/core/src/database-types.ts
@@ -515,6 +515,7 @@ export type Database = {
step_slug: string | null
input: Json | null
msg_id: number | null
task_index: number | null
}
}
}
178 changes: 178 additions & 0 deletions pkgs/core/supabase/migrations/20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql
@@ -0,0 +1,178 @@
-- Modify "step_task_record" composite type
ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "task_index" integer;
-- Modify "start_tasks" function
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
join pgflow.runs r on r.run_id = task.run_id
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
-- MVP: Don't start tasks on failed runs
and r.status != 'failed'
),
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
-- Aggregate map outputs or use single output
CASE
WHEN dep_step.step_type = 'map' THEN
-- Aggregate all task outputs ordered by task_index
-- Use COALESCE to return empty array if no tasks
(SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
FROM pgflow.step_tasks dt
WHERE dt.run_id = st.run_id
AND dt.step_slug = dep.dep_slug
AND dt.status = 'completed')
ELSE
-- Single step: use the single task output
dep_task.output
END as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
left join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
and dep_step.step_type = 'single' -- Only join for single steps
),
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages
set_vt_batch as (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
)
select
st.flow_slug,
st.run_id,
st.step_slug,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
-- (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
-- task 0 gets: 1
-- task 1 gets: 2
-- task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)

-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
-- task 0 gets: "a"
-- task 1 gets: "b"
-- task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
END

-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive ALL inputs as a structured object.
-- This includes the original run input plus all dependency outputs.
ELSE
-- Non-map steps get structured input with named keys
-- Example output: {
-- "run": {"original": "input"},
-- "step1": {"output": "from_step1"},
-- "step2": {"output": "from_step2"}
-- }
--
-- Build object with 'run' key containing original input
jsonb_build_object('run', r.input) ||
-- Merge with deps_output which already has dependency outputs
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
-- If no dependencies, defaults to empty object
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id,
st.task_index as task_index
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$;
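
The input construction above relies on two jsonb behaviors that are easy to sanity-check in isolation: jsonb_array_element uses zero-based indexing and returns NULL for out-of-range indexes or non-array inputs (which is why start_flow validates array inputs up front), and jsonb_agg with ORDER BY task_index reassembles map outputs in task order. A standalone illustration, not part of the migration:

select jsonb_array_element('[10, 20, 30]'::jsonb, 1);   -- 20 (zero-based)
select jsonb_array_element('[10, 20, 30]'::jsonb, 99);  -- NULL (out of range)
select jsonb_array_element('"scalar"'::jsonb, 0);       -- NULL (not an array)

-- Aggregation preserves task order regardless of completion order:
select jsonb_agg(t.output order by t.task_index)
from (values (1, '"b"'::jsonb), (0, '"a"'::jsonb)) as t (task_index, output);
-- => ["a", "b"]
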
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg=
h1:46a22RkBGrdfb3veJG3ZlyUkS3us2qfEFGn5cjh2W+Q=
20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc=
@@ -16,3 +16,4 @@ h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg=
20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4=
20250918042753_pgflow_temp_handle_map_output_aggregation.sql h1:9aC4lyr6AEvpLTrv9Fza2Ur0QO87S0cdJDI+BPLAl60=
20250919101802_pgflow_temp_orphaned_messages_index.sql h1:GyfPfQz4AqB1/sTAC7B/m6j8FJrpkocinnzerNfM0f8=
20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql h1:DguPK41IfsMykzodXqZq0BmW1IXZW8ZTj6rkw4LaHFE=
@@ -75,21 +75,17 @@ select is(
do $$
declare
v_run_id uuid;
v_task pgflow.step_task_record;
v_start_time timestamp;
v_duration interval;
begin
select run_id into v_run_id from pgflow.runs limit 1;

-- Get consumer task
select * into v_task from pgflow.step_tasks
where run_id = v_run_id and step_slug = 'consumer' and status = 'started';

-- Complete it
-- The consumer task was already started in the previous test assertion (line 69)
-- We just need to complete it
perform pgflow.complete_task(
v_task.run_id,
v_task.step_slug,
0,
v_run_id,
'consumer',
0, -- consumer is a single task, so task_index is 0
jsonb_build_object('processed', 100)
);

24 changes: 17 additions & 7 deletions pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql
@@ -1,5 +1,5 @@
begin;
select plan(6);
select plan(7);
select pgflow_tests.reset_db();

select pgflow.create_flow('simple');
@@ -9,24 +9,34 @@ select pgflow.start_flow('simple', '"hello"'::jsonb);
-- Ensure worker exists
select pgflow_tests.ensure_worker('simple');

-- Read messages from queue
-- Read messages from queue and start task
with msgs as (
select * from pgflow.read_with_poll('simple', 10, 5, 1, 50) limit 1
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
)
-- TEST: start_tasks returns tasks for valid message IDs
select is(
(select count(*)::int from pgflow.start_tasks(
),
started_tasks as (
select * from pgflow.start_tasks(
'simple',
(select ids from msg_ids),
'11111111-1111-1111-1111-111111111111'::uuid
)),
)
)
-- TEST: start_tasks returns tasks for valid message IDs
select is(
(select count(*)::int from started_tasks),
1,
'start_tasks should return one task for valid message ID'
);

-- TEST: Task has task_index = 0 (checking from step_tasks table since already started)
select is(
(select task_index from pgflow.step_tasks where step_slug = 'task'),
0,
'Single task should have task_index = 0'
);

-- TEST: Task status should be 'started' after start_tasks
select is(
(select status from pgflow.step_tasks where step_slug = 'task'),
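
The test above mirrors the sequence a worker runs: poll the queue, then hand the message IDs to start_tasks and read task_index off each returned record. A condensed sketch using only the signatures exercised in this diff (the worker UUID is the test fixture's placeholder):

with msgs as (
  select msg_id from pgflow.read_with_poll('simple', 10, 5, 1, 50)
)
select flow_slug, run_id, step_slug, task_index, input, msg_id
from pgflow.start_tasks(
  'simple',
  (select array_agg(msg_id) from msgs),
  '11111111-1111-1111-1111-111111111111'::uuid
);
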
12 changes: 6 additions & 6 deletions pkgs/core/supabase/tests/start_tasks/map_large_array.test.sql
@@ -37,13 +37,13 @@ select message_id as msg_id_0 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'large_map' and task_index = 0 \gset

select is(
(select input from pgflow.start_tasks(
(select row(input, task_index) from pgflow.start_tasks(
'large_array_flow',
ARRAY[:'msg_id_0'::bigint],
'11111111-1111-1111-1111-111111111111'::uuid
)),
'1'::jsonb,
'Task at index 0 should receive element 1'
row('1'::jsonb, 0),
'Task at index 0 should receive element 1 with task_index = 0'
);

-- Check index 49 (middle)
@@ -79,13 +79,13 @@ select message_id as msg_id_149 from pgflow.step_tasks
where run_id = :'run_id' and step_slug = 'large_map' and task_index = 149 \gset

select is(
(select input from pgflow.start_tasks(
(select row(input, task_index) from pgflow.start_tasks(
'large_array_flow',
ARRAY[:'msg_id_149'::bigint],
'11111111-1111-1111-1111-111111111111'::uuid
)),
'150'::jsonb,
'Task at index 149 should receive element 150'
row('150'::jsonb, 149),
'Task at index 149 should receive element 150 with task_index = 149'
);

-- Verify all task indices are sequential from 0 to 149
@@ -57,6 +57,7 @@ select is(
'First map task 0 should receive element 1'
);


select is(
(select input from pgflow.start_tasks(
'map_chain_flow',
Expand All @@ -67,6 +68,7 @@ select is(
'First map task 1 should receive element 2'
);


-- NOTE: Can't complete first map tasks with non-array output because
-- complete_task validates that map steps must output arrays
-- This test will need to be updated once output aggregation is implemented
@@ -1,5 +1,5 @@
begin;
select plan(4);
select plan(5);
select pgflow_tests.reset_db();

select pgflow.create_flow('multi_flow');
Expand Down Expand Up @@ -52,5 +52,12 @@ select is(
'All tasks should have same worker ID'
);

-- TEST: All single tasks should have task_index = 0
select ok(
(select bool_and(task_index = 0) from pgflow.step_tasks
where flow_slug = 'multi_flow'),
'All single tasks should have task_index = 0'
);

select finish();
rollback;