From 85630ea13749256b57ffbe531fdd99ed18ad27b3 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Fri, 19 Sep 2025 16:06:49 +0200 Subject: [PATCH] feat: add task_index attribute to step_task_record and start_tasks function - Updated the step_task_record type to include task_index - Modified start_tasks function to return task_index alongside msg_id - Adjusted database schema and TypeScript types to support task_index - Enhanced input construction logic for map and non-map steps based on task_index - Added migration to modify start_tasks function and type to handle task_index - Updated test to complete a task with the new task_index attribute --- pkgs/core/schemas/0040_types.sql | 3 +- .../schemas/0120_function_start_tasks.sql | 5 +- pkgs/core/src/database-types.ts | 1 + ..._temp_return_task_index_in_start_tasks.sql | 178 ++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../large_array_performance.test.sql | 14 +- .../start_tasks/basic_start_tasks.test.sql | 24 ++- .../start_tasks/map_large_array.test.sql | 12 +- .../start_tasks/map_to_map_chain.test.sql | 2 + .../multiple_task_processing.test.sql | 9 +- .../root_map_element_extraction.test.sql | 24 +-- .../task_index_returned_correctly.test.sql | 75 ++++++++ 12 files changed, 311 insertions(+), 39 deletions(-) create mode 100644 pkgs/core/supabase/migrations/20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql create mode 100644 pkgs/core/supabase/tests/start_tasks/task_index_returned_correctly.test.sql diff --git a/pkgs/core/schemas/0040_types.sql b/pkgs/core/schemas/0040_types.sql index 05ffc7a8a..1bd799fc4 100644 --- a/pkgs/core/schemas/0040_types.sql +++ b/pkgs/core/schemas/0040_types.sql @@ -4,5 +4,6 @@ create type pgflow.step_task_record as ( run_id uuid, step_slug text, input jsonb, - msg_id bigint + msg_id bigint, + task_index int ); diff --git a/pkgs/core/schemas/0120_function_start_tasks.sql b/pkgs/core/schemas/0120_function_start_tasks.sql index cd65d2844..3824bad78 100644 
--- a/pkgs/core/schemas/0120_function_start_tasks.sql +++ b/pkgs/core/schemas/0120_function_start_tasks.sql @@ -25,7 +25,7 @@ as $$ ), start_tasks_update as ( update pgflow.step_tasks - set + set attempts_count = attempts_count + 1, status = 'started', started_at = now(), @@ -171,7 +171,8 @@ as $$ -- If no dependencies, defaults to empty object coalesce(dep_out.deps_output, '{}'::jsonb) END as input, - st.message_id as msg_id + st.message_id as msg_id, + st.task_index as task_index from tasks st join runs r on st.run_id = r.run_id join pgflow.steps step on diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 27429b7bd..d5a7c13ef 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -515,6 +515,7 @@ export type Database = { step_slug: string | null input: Json | null msg_id: number | null + task_index: number | null } } } diff --git a/pkgs/core/supabase/migrations/20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql b/pkgs/core/supabase/migrations/20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql new file mode 100644 index 000000000..128cede04 --- /dev/null +++ b/pkgs/core/supabase/migrations/20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql @@ -0,0 +1,178 @@ +-- Modify "step_task_record" composite type +ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "task_index" integer; +-- Modify "start_tasks" function +CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$ +with tasks as ( + select + task.flow_slug, + task.run_id, + task.step_slug, + task.task_index, + task.message_id + from pgflow.step_tasks as task + join pgflow.runs r on r.run_id = task.run_id + where task.flow_slug = start_tasks.flow_slug + and task.message_id = any(msg_ids) + and task.status = 'queued' + -- MVP: Don't start tasks on failed runs + and r.status != 
'failed' + ), + start_tasks_update as ( + update pgflow.step_tasks + set + attempts_count = attempts_count + 1, + status = 'started', + started_at = now(), + last_worker_id = worker_id + from tasks + where step_tasks.message_id = tasks.message_id + and step_tasks.flow_slug = tasks.flow_slug + and step_tasks.status = 'queued' + ), + runs as ( + select + r.run_id, + r.input + from pgflow.runs r + where r.run_id in (select run_id from tasks) + ), + deps as ( + select + st.run_id, + st.step_slug, + dep.dep_slug, + -- Aggregate map outputs or use single output + CASE + WHEN dep_step.step_type = 'map' THEN + -- Aggregate all task outputs ordered by task_index + -- Use COALESCE to return empty array if no tasks + (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb) + FROM pgflow.step_tasks dt + WHERE dt.run_id = st.run_id + AND dt.step_slug = dep.dep_slug + AND dt.status = 'completed') + ELSE + -- Single step: use the single task output + dep_task.output + END as dep_output + from tasks st + join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug + join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug + left join pgflow.step_tasks dep_task on + dep_task.run_id = st.run_id and + dep_task.step_slug = dep.dep_slug and + dep_task.status = 'completed' + and dep_step.step_type = 'single' -- Only join for single steps + ), + deps_outputs as ( + select + d.run_id, + d.step_slug, + jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output, + count(*) as dep_count + from deps d + group by d.run_id, d.step_slug + ), + timeouts as ( + select + task.message_id, + task.flow_slug, + coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay + from tasks task + join pgflow.flows flow on flow.flow_slug = task.flow_slug + join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug + ), + -- Batch update visibility timeouts for all messages + set_vt_batch 
as ( + select pgflow.set_vt_batch( + start_tasks.flow_slug, + array_agg(t.message_id order by t.message_id), + array_agg(t.vt_delay order by t.message_id) + ) + from timeouts t + ) + select + st.flow_slug, + st.run_id, + st.step_slug, + -- ========================================== + -- INPUT CONSTRUCTION LOGIC + -- ========================================== + -- This nested CASE statement determines how to construct the input + -- for each task based on the step type (map vs non-map). + -- + -- The fundamental difference: + -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello") + -- - Non-map steps: Receive structured objects with named keys + -- (e.g., {"run": {...}, "dependency1": {...}}) + -- ========================================== + CASE + -- -------------------- MAP STEPS -------------------- + -- Map steps process arrays element-by-element. + -- Each task receives ONE element from the array at its task_index position. + WHEN step.step_type = 'map' THEN + -- Map steps get raw array elements without any wrapper object + CASE + -- ROOT MAP: Gets array from run input + -- Example: run input = [1, 2, 3] + -- task 0 gets: 1 + -- task 1 gets: 2 + -- task 2 gets: 3 + WHEN step.deps_count = 0 THEN + -- Root map (deps_count = 0): no dependencies, reads from run input. + -- Extract the element at task_index from the run's input array. + -- Note: If run input is not an array, this will return NULL + -- and the flow will fail (validated in start_flow). + jsonb_array_element(r.input, st.task_index) + + -- DEPENDENT MAP: Gets array from its single dependency + -- Example: dependency output = ["a", "b", "c"] + -- task 0 gets: "a" + -- task 1 gets: "b" + -- task 2 gets: "c" + ELSE + -- Has dependencies (should be exactly 1 for map steps). + -- Extract the element at task_index from the dependency's output array. + -- + -- Why the subquery with jsonb_each? 
+ -- - The dependency outputs a raw array: [1, 2, 3] + -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]} + -- - We need to unwrap and get just the array value + -- - Map steps have exactly 1 dependency (enforced by add_step) + -- - So jsonb_each will return exactly 1 row + -- - We extract the 'value' which is the raw array [1, 2, 3] + -- - Then get the element at task_index from that array + (SELECT jsonb_array_element(value, st.task_index) + FROM jsonb_each(dep_out.deps_output) + LIMIT 1) + END + + -- -------------------- NON-MAP STEPS -------------------- + -- Regular (non-map) steps receive ALL inputs as a structured object. + -- This includes the original run input plus all dependency outputs. + ELSE + -- Non-map steps get structured input with named keys + -- Example output: { + -- "run": {"original": "input"}, + -- "step1": {"output": "from_step1"}, + -- "step2": {"output": "from_step2"} + -- } + -- + -- Build object with 'run' key containing original input + jsonb_build_object('run', r.input) || + -- Merge with deps_output which already has dependency outputs + -- deps_output format: {"dep1": output1, "dep2": output2, ...} + -- If no dependencies, defaults to empty object + coalesce(dep_out.deps_output, '{}'::jsonb) + END as input, + st.message_id as msg_id, + st.task_index as task_index + from tasks st + join runs r on st.run_id = r.run_id + join pgflow.steps step on + step.flow_slug = st.flow_slug and + step.step_slug = st.step_slug + left join deps_outputs dep_out on + dep_out.run_id = st.run_id and + dep_out.step_slug = st.step_slug +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 229f7ae81..ea50bbf85 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg= +h1:46a22RkBGrdfb3veJG3ZlyUkS3us2qfEFGn5cjh2W+Q= 20250429164909_pgflow_initial.sql 
h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc= @@ -16,3 +16,4 @@ h1:BLyxfG244req3FS+uKAgPCkWU4PQxQvDHckN/qLK6mg= 20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4= 20250918042753_pgflow_temp_handle_map_output_aggregation.sql h1:9aC4lyr6AEvpLTrv9Fza2Ur0QO87S0cdJDI+BPLAl60= 20250919101802_pgflow_temp_orphaned_messages_index.sql h1:GyfPfQz4AqB1/sTAC7B/m6j8FJrpkocinnzerNfM0f8= +20250919135211_pgflow_temp_return_task_index_in_start_tasks.sql h1:DguPK41IfsMykzodXqZq0BmW1IXZW8ZTj6rkw4LaHFE= diff --git a/pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql index e060dae95..d902476df 100644 --- a/pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql +++ b/pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql @@ -75,21 +75,17 @@ select is( do $$ declare v_run_id uuid; - v_task pgflow.step_task_record; v_start_time timestamp; v_duration interval; begin select run_id into v_run_id from pgflow.runs limit 1; - -- Get consumer task - select * into v_task from pgflow.step_tasks - where run_id = v_run_id and step_slug = 'consumer' and status = 'started'; - - -- Complete it + -- The consumer task was already started by the previous test assertion above + -- We just need to complete it perform pgflow.complete_task( - v_task.run_id, - v_task.step_slug, - 0, + v_run_id, + 'consumer', + 0, -- consumer is a single task, so task_index is 0 jsonb_build_object('processed', 100) ); diff --git a/pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql b/pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql index
aff3966b9..dfa02d5df 100644 --- a/pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/basic_start_tasks.test.sql @@ -1,5 +1,5 @@ begin; -select plan(6); +select plan(7); select pgflow_tests.reset_db(); select pgflow.create_flow('simple'); @@ -9,24 +9,34 @@ select pgflow.start_flow('simple', '"hello"'::jsonb); -- Ensure worker exists select pgflow_tests.ensure_worker('simple'); --- Read messages from queue +-- Read messages from queue and start task with msgs as ( select * from pgflow.read_with_poll('simple', 10, 5, 1, 50) limit 1 ), msg_ids as ( select array_agg(msg_id) as ids from msgs -) --- TEST: start_tasks returns tasks for valid message IDs -select is( - (select count(*)::int from pgflow.start_tasks( +), +started_tasks as ( + select * from pgflow.start_tasks( 'simple', (select ids from msg_ids), '11111111-1111-1111-1111-111111111111'::uuid - )), + ) +) +-- TEST: start_tasks returns tasks for valid message IDs +select is( + (select count(*)::int from started_tasks), 1, 'start_tasks should return one task for valid message ID' ); +-- TEST: Task has task_index = 0 (checking from step_tasks table since already started) +select is( + (select task_index from pgflow.step_tasks where step_slug = 'task'), + 0, + 'Single task should have task_index = 0' +); + -- TEST: Task status should be 'started' after start_tasks select is( (select status from pgflow.step_tasks where step_slug = 'task'), diff --git a/pkgs/core/supabase/tests/start_tasks/map_large_array.test.sql b/pkgs/core/supabase/tests/start_tasks/map_large_array.test.sql index b24045d53..4e59af75c 100644 --- a/pkgs/core/supabase/tests/start_tasks/map_large_array.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/map_large_array.test.sql @@ -37,13 +37,13 @@ select message_id as msg_id_0 from pgflow.step_tasks where run_id = :'run_id' and step_slug = 'large_map' and task_index = 0 \gset select is( - (select input from pgflow.start_tasks( + (select row(input, 
task_index) from pgflow.start_tasks( 'large_array_flow', ARRAY[:'msg_id_0'::bigint], '11111111-1111-1111-1111-111111111111'::uuid )), - '1'::jsonb, - 'Task at index 0 should receive element 1' + row('1'::jsonb, 0), + 'Task at index 0 should receive element 1 with task_index = 0' ); -- Check index 49 (middle) @@ -79,13 +79,13 @@ select message_id as msg_id_149 from pgflow.step_tasks where run_id = :'run_id' and step_slug = 'large_map' and task_index = 149 \gset select is( - (select input from pgflow.start_tasks( + (select row(input, task_index) from pgflow.start_tasks( 'large_array_flow', ARRAY[:'msg_id_149'::bigint], '11111111-1111-1111-1111-111111111111'::uuid )), - '150'::jsonb, - 'Task at index 149 should receive element 150' + row('150'::jsonb, 149), + 'Task at index 149 should receive element 150 with task_index = 149' ); -- Verify all task indices are sequential from 0 to 149 diff --git a/pkgs/core/supabase/tests/start_tasks/map_to_map_chain.test.sql b/pkgs/core/supabase/tests/start_tasks/map_to_map_chain.test.sql index d4cf2d9e9..4b397dad5 100644 --- a/pkgs/core/supabase/tests/start_tasks/map_to_map_chain.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/map_to_map_chain.test.sql @@ -57,6 +57,7 @@ select is( 'First map task 0 should receive element 1' ); + select is( (select input from pgflow.start_tasks( 'map_chain_flow', @@ -67,6 +68,7 @@ select is( 'First map task 1 should receive element 2' ); + -- NOTE: Can't complete first map tasks with non-array output because -- complete_task validates that map steps must output arrays -- This test will need to be updated once output aggregation is implemented diff --git a/pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql b/pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql index 9ece1f91a..cd2ff6fde 100644 --- a/pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/multiple_task_processing.test.sql @@ -1,5 +1,5 @@ 
begin; -select plan(4); +select plan(5); select pgflow_tests.reset_db(); select pgflow.create_flow('multi_flow'); @@ -52,5 +52,12 @@ select is( 'All tasks should have same worker ID' ); +-- TEST: All single tasks should have task_index = 0 +select ok( + (select bool_and(task_index = 0) from pgflow.step_tasks + where flow_slug = 'multi_flow'), + 'All single tasks should have task_index = 0' +); + select finish(); rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/start_tasks/root_map_element_extraction.test.sql b/pkgs/core/supabase/tests/start_tasks/root_map_element_extraction.test.sql index a551dd0b8..31461a9e7 100644 --- a/pkgs/core/supabase/tests/start_tasks/root_map_element_extraction.test.sql +++ b/pkgs/core/supabase/tests/start_tasks/root_map_element_extraction.test.sql @@ -43,37 +43,37 @@ where run_id = :'run_id' and step_slug = 'root_map' and task_index = 1 \gset select message_id as msg_id_2 from pgflow.step_tasks where run_id = :'run_id' and step_slug = 'root_map' and task_index = 2 \gset --- TEST: Call start_tasks for task 0 and verify it receives "apple" +-- TEST: Call start_tasks for task 0 and verify input and task_index select is( - (select input from pgflow.start_tasks( + (select row(input, task_index) from pgflow.start_tasks( 'root_map_flow', ARRAY[:'msg_id_0'::bigint], '11111111-1111-1111-1111-111111111111'::uuid )), - '"apple"'::jsonb, - 'Task 0 should receive first array element (apple)' + row('"apple"'::jsonb, 0), + 'Task 0 should receive first array element (apple) with task_index = 0' ); --- TEST: Call start_tasks for task 1 and verify it receives "banana" +-- TEST: Call start_tasks for task 1 and verify input and task_index select is( - (select input from pgflow.start_tasks( + (select row(input, task_index) from pgflow.start_tasks( 'root_map_flow', ARRAY[:'msg_id_1'::bigint], '11111111-1111-1111-1111-111111111111'::uuid )), - '"banana"'::jsonb, - 'Task 1 should receive second array element (banana)' + 
row('"banana"'::jsonb, 1), + 'Task 1 should receive second array element (banana) with task_index = 1' ); --- TEST: Call start_tasks for task 2 and verify it receives "cherry" +-- TEST: Call start_tasks for task 2 and verify input and task_index select is( - (select input from pgflow.start_tasks( + (select row(input, task_index) from pgflow.start_tasks( 'root_map_flow', ARRAY[:'msg_id_2'::bigint], '11111111-1111-1111-1111-111111111111'::uuid )), - '"cherry"'::jsonb, - 'Task 2 should receive third array element (cherry)' + row('"cherry"'::jsonb, 2), + 'Task 2 should receive third array element (cherry) with task_index = 2' ); -- Verify all tasks got started diff --git a/pkgs/core/supabase/tests/start_tasks/task_index_returned_correctly.test.sql b/pkgs/core/supabase/tests/start_tasks/task_index_returned_correctly.test.sql new file mode 100644 index 000000000..acdcfcccd --- /dev/null +++ b/pkgs/core/supabase/tests/start_tasks/task_index_returned_correctly.test.sql @@ -0,0 +1,75 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Test: start_tasks returns correct task_index field for all task types +select diag('Testing that start_tasks returns task_index correctly'); + +-- SETUP: Create flow with map and single steps +select pgflow.create_flow('test_task_index'); + +-- Add a map step that will create multiple tasks +select pgflow.add_step( + flow_slug => 'test_task_index', + step_slug => 'map_step', + deps_slugs => '{}', + step_type => 'map' +); + +-- Add a single step that depends on the map +select pgflow.add_step( + flow_slug => 'test_task_index', + step_slug => 'single_step', + deps_slugs => array['map_step'], + step_type => 'single' +); + +-- Start flow with array input to create multiple tasks for map step +select run_id from pgflow.start_flow('test_task_index', '[10, 20, 30, 40, 50]'::jsonb) \gset + +-- Verify 5 tasks were created for map step +select is( + (select count(*) from pgflow.step_tasks + where run_id = :'run_id' and step_slug = 
'map_step'), + 5::bigint, + 'Should create 5 tasks for map step with 5 element array' +); + +-- Verify task_index values are correct in step_tasks table +select is( + (select array_agg(task_index order by task_index) from pgflow.step_tasks + where run_id = :'run_id' and step_slug = 'map_step'), + ARRAY[0, 1, 2, 3, 4], + 'Map step tasks should have task_index from 0 to 4' +); + +-- Ensure workers exist (only 11111111-1111-1111-1111-111111111111 is used by start_tasks below) +select pgflow_tests.ensure_worker('test_task_index', '11111111-1111-1111-1111-111111111111'::uuid); +select pgflow_tests.ensure_worker('test_task_index', '22222222-2222-2222-2222-222222222222'::uuid); +select pgflow_tests.ensure_worker('test_task_index', '33333333-3333-3333-3333-333333333333'::uuid); +select pgflow_tests.ensure_worker('test_task_index', '44444444-4444-4444-4444-444444444444'::uuid); +select pgflow_tests.ensure_worker('test_task_index', '55555555-5555-5555-5555-555555555555'::uuid); + +-- Read messages from queue and start tasks +with msgs as ( + select * from pgflow.read_with_poll('test_task_index', 10, 10, 1, 50) +), +msg_ids as ( + select array_agg(msg_id) as ids from msgs +), +started_tasks as ( + select * from pgflow.start_tasks( + 'test_task_index', + (select ids from msg_ids), + '11111111-1111-1111-1111-111111111111'::uuid + ) +) +-- TEST: All returned task_index values match the expected indices +select is( + (select array_agg(task_index order by task_index) from started_tasks), + ARRAY[0, 1, 2, 3, 4], + 'start_tasks should return correct task_index values for all map tasks' +); + +select finish(); +rollback;