| 
 -- Modify "start_tasks" function
 --
 -- Marks the queued step tasks identified by msg_ids (within flow_slug) as
 -- 'started' and returns, for each of them, the input it should run with.
 --
 -- Parameters:
 --   flow_slug  flow whose tasks are being started; also passed as the first
 --              argument to pgflow.set_vt_batch
 --   msg_ids    message ids of the tasks to start; only tasks whose status is
 --              'queued' are picked up
 --   worker_id  stored in last_worker_id of every task that is started
 --
 -- Returns one row per selected task:
 --   (flow_slug, run_id, step_slug, input, msg_id)
 --
 -- Side effects:
 --   * updates pgflow.step_tasks (attempts_count, status, started_at,
 --     last_worker_id)
 --   * calls pgflow.set_vt_batch once with the selected message ids and their
 --     per-message delays
 CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
   -- Function arguments are always referenced as start_tasks.<arg>: an
   -- unqualified name would silently resolve to a same-named column instead.
   with tasks as (
     -- Queued tasks of this flow that match the requested message ids
     select
       task.flow_slug,
       task.run_id,
       task.step_slug,
       task.task_index,
       task.message_id
     from pgflow.step_tasks as task
     where task.flow_slug = start_tasks.flow_slug
       and task.message_id = any(start_tasks.msg_ids)
       and task.status = 'queued'
   ),
   -- Data-modifying CTE: PostgreSQL runs it to completion even though the
   -- primary query never reads from it.
   start_tasks_update as (
     update pgflow.step_tasks
     set
       attempts_count = attempts_count + 1,
       status = 'started',
       started_at = now(),
       last_worker_id = start_tasks.worker_id
     from tasks
     where step_tasks.message_id = tasks.message_id
       and step_tasks.flow_slug = tasks.flow_slug
       and step_tasks.status = 'queued'
   ),
   -- Input of every run that owns at least one selected task
   runs as (
     select
       r.run_id,
       r.input
     from pgflow.runs r
     where r.run_id in (select run_id from tasks)
   ),
   -- One row per (selected task, completed dependency task) with its output
   deps as (
     select
       st.run_id,
       st.step_slug,
       dep.dep_slug,
       dep_task.output as dep_output
     from tasks st
     join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
     join pgflow.step_tasks dep_task on
       dep_task.run_id = st.run_id and
       dep_task.step_slug = dep.dep_slug and
       dep_task.status = 'completed'
   ),
   -- Dependency outputs folded into one object per step: {"dep_slug": output, ...}
   deps_outputs as (
     select
       d.run_id,
       d.step_slug,
       jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
       count(*) as dep_count
     from deps d
     group by d.run_id, d.step_slug
   ),
   -- Visibility-timeout delay per message: the step-level opt_timeout if set,
   -- otherwise the flow-level one, plus 2.
   -- NOTE(review): the +2 looks like slack (presumably seconds) on top of the
   -- configured timeout - confirm against pgflow.set_vt_batch.
   timeouts as (
     select
       task.message_id,
       task.flow_slug,
       coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
     from tasks task
     join pgflow.flows flow on flow.flow_slug = task.flow_slug
     join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
   ),
   -- Batch update visibility timeouts for all messages.
   -- This is a plain SELECT CTE, so PostgreSQL evaluates it only if the primary
   -- query reads from it; see the WHERE clause at the bottom.
   set_vt_batch as (
     select pgflow.set_vt_batch(
       start_tasks.flow_slug,
       array_agg(t.message_id order by t.message_id),
       array_agg(t.vt_delay order by t.message_id)
     ) as batch_result
     from timeouts t
   )
   select
     st.flow_slug,
     st.run_id,
     st.step_slug,
     -- ==========================================
     -- INPUT CONSTRUCTION LOGIC
     -- ==========================================
     -- This nested CASE statement determines how to construct the input
     -- for each task based on the step type (map vs non-map).
     --
     -- The fundamental difference:
     -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
     -- - Non-map steps: Receive structured objects with named keys
     --                  (e.g., {"run": {...}, "dependency1": {...}})
     -- ==========================================
     CASE
       -- -------------------- MAP STEPS --------------------
       -- Map steps process arrays element-by-element.
       -- Each task receives ONE element from the array at its task_index position.
       WHEN step.step_type = 'map' THEN
         -- Map steps get raw array elements without any wrapper object
         CASE
           -- ROOT MAP: Gets array from run input
           -- Example: run input = [1, 2, 3]
           --          task 0 gets: 1
           --          task 1 gets: 2
           --          task 2 gets: 3
           WHEN step.deps_count = 0 THEN
             -- Root map (deps_count = 0): no dependencies, reads from run input.
             -- Extract the element at task_index from the run's input array.
             -- Note: If run input is not an array, this will return NULL
             -- and the flow will fail (validated in start_flow).
             jsonb_array_element(r.input, st.task_index)

           -- DEPENDENT MAP: Gets array from its single dependency
           -- Example: dependency output = ["a", "b", "c"]
           --          task 0 gets: "a"
           --          task 1 gets: "b"
           --          task 2 gets: "c"
           ELSE
             -- Has dependencies (should be exactly 1 for map steps).
             -- Extract the element at task_index from the dependency's output array.
             --
             -- Why the subquery with jsonb_each?
             -- - The dependency outputs a raw array: [1, 2, 3]
             -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
             -- - We need to unwrap and get just the array value
             -- - Map steps have exactly 1 dependency (enforced by add_step)
             -- - So jsonb_each will return exactly 1 row
             -- - We extract the 'value' which is the raw array [1, 2, 3]
             -- - Then get the element at task_index from that array
             (SELECT jsonb_array_element(value, st.task_index)
             FROM jsonb_each(dep_out.deps_output)
             LIMIT 1)
         END

       -- -------------------- NON-MAP STEPS --------------------
       -- Regular (non-map) steps receive ALL inputs as a structured object.
       -- This includes the original run input plus all dependency outputs.
       ELSE
         -- Non-map steps get structured input with named keys
         -- Example output: {
         --   "run": {"original": "input"},
         --   "step1": {"output": "from_step1"},
         --   "step2": {"output": "from_step2"}
         -- }
         --
         -- Build object with 'run' key containing original input
         jsonb_build_object('run', r.input) ||
         -- Merge with deps_output which already has dependency outputs
         -- deps_output format: {"dep1": output1, "dep2": output2, ...}
         -- If no dependencies, defaults to empty object
         coalesce(dep_out.deps_output, '{}'::jsonb)
     END as input,
     st.message_id as msg_id
   from tasks st
   join runs r on st.run_id = r.run_id
   join pgflow.steps step on
     step.flow_slug = st.flow_slug and
     step.step_slug = st.step_slug
   left join deps_outputs dep_out on
     dep_out.run_id = st.run_id and
     dep_out.step_slug = st.step_slug
   -- Force evaluation of the set_vt_batch CTE. A SELECT inside WITH that the
   -- primary query never references is not executed at all, which would leave
   -- the visibility timeouts untouched. count() over its output column always
   -- yields exactly one non-negative value, so this predicate is always true
   -- and does not change which rows are returned.
   where (select count(vt.batch_result) from set_vt_batch vt) >= 0
 $$;
0 commit comments