Skip to content

Conversation

jumski
Copy link
Contributor

@jumski jumski commented Sep 10, 2025

Map Step Type Implementation (SQL Core)

Overview

This PR implements map step type functionality in pgflow's SQL Core, enabling parallel processing of array data through fanout/map operations. This foundational feature allows workflows to spawn multiple tasks from a single array input, process them in parallel, and aggregate results back into ordered arrays.

What We're Building

Core Concept: Map Steps

Map steps transform single array inputs into parallel task execution:

// Input array
const items = ['item1', 'item2', 'item3'];

// Map step spawns 3 parallel tasks
// Each task processes one array element
// Results aggregated back: ["result1", "result2", "result3"]

Two Types of Map Steps

1. Root Maps (0 dependencies)

  • Input: runs.input must be a JSONB array
  • Behavior: Spawns N tasks where N = array length
  • Example: start_flow('my_flow', ["a", "b", "c"]) → 3 parallel tasks

2. Dependent Maps (1 dependency)

  • Input: Output from a completed dependency step (must be array)
  • Behavior: Spawns N tasks based on dependency's output array length
  • Example: single_step → ["x", "y"]map_step spawns 2 tasks

Map→Map Chaining

Maps can depend on other maps:

  • Parent map completes all tasks first
  • Child map receives parent's task count as its planned count
  • Child tasks read from aggregated parent array using simple approach

Technical Implementation

Hybrid Architecture: "Fresh Data" vs "Dumb Spawning"

Key Innovation: Separate array parsing from task spawning for optimal performance.

Fresh Data Functions (parse arrays, set counts):

  • start_flow(): Validates runs.input arrays for root maps, sets initial_tasks
  • complete_task(): Validates dependency outputs for dependent maps, sets initial_tasks

Dumb Functions (use pre-computed counts):

  • start_ready_steps(): Copies initial_tasks → remaining_tasks and spawns N tasks (no JSON parsing)
  • start_tasks(): Extracts array elements using task_index
  • maybe_complete_run(): Aggregates results into ordered arrays

Schema Changes

  1. Enable Map Step Type

    -- steps table constraint updated
    check (step_type in ('single', 'map'))
  2. Remove Single Task Limit

    -- step_tasks table constraint removed
    -- constraint only_single_task_per_step check (task_index = 0)
  3. Add Task Planning Columns

    -- step_states table updates
    remaining_tasks int NULL,  -- NULL = not started, >0 = active countdown
    initial_tasks int DEFAULT 1 CHECK (initial_tasks >= 0),  -- Planned count
    CONSTRAINT remaining_tasks_state_consistency CHECK (
      remaining_tasks IS NULL OR status != 'created'
    )
  4. Enhanced Functions

    • add_step(): Accepts step_type parameter, validates map constraints
    • start_flow(): Root map validation and count setting
    • complete_task(): Dependent map count setting
    • start_ready_steps(): Efficient bulk task spawning
    • start_tasks(): Array element extraction
    • maybe_complete_run(): Result aggregation

Performance Optimizations

  • Minimal JSON Parsing: Arrays parsed exactly twice (once in each "fresh data" function)
  • Batch Operations: Efficient task creation using generate_series(0, N-1)
  • Atomic Updates: Count setting under existing locks

Edge Cases Handled

  • Empty Arrays: Auto-complete with [] output (not failures)
  • Array Validation: Fail fast with clear errors for non-array inputs
  • Mixed Workflows: Single + map steps in same flow
  • Task Indexing: 0-based indexing matching JSONB array indices
  • Ordered Results: Results aggregated in task_index order

Database Flow Examples

Root Map Flow

-- 1. Create flow with root map step
SELECT pgflow.create_flow('example_flow');
SELECT pgflow.add_step('example_flow', 'process_items', 'map');

-- 2. Start with array input
SELECT pgflow.start_flow('example_flow', '["item1", "item2", "item3"]');
-- → Validates input is array
-- → Sets initial_tasks = 3 for 'process_items' step

-- 3. start_ready_steps() spawns 3 tasks
-- → Creates step_tasks with task_index 0, 1, 2
-- → Sends 3 messages to pgmq queue

-- 4. Workers process tasks in parallel
-- → Each gets individual array element: "item1", "item2", "item3"
-- → Returns processed results

-- 5. Results aggregated in run output
-- → Final output: {"process_items": ["result1", "result2", "result3"]}

Dependent Map Flow

-- 1. Single step produces array output
SELECT pgflow.complete_task(run_id, 'prep_step', 0, '["a", "b"]');
-- → complete_task() detects map dependent
-- → Sets initial_tasks = 2 for dependent map step

-- 2. Map step becomes ready and spawns 2 tasks
-- → Each task gets individual element: "a", "b"
-- → Processes in parallel

-- 3. Results aggregated
-- → Map step output: ["processed_a", "processed_b"]

Handler Interface

Map task handlers receive individual array elements (TypeScript Array.map() style):

// Single step handler
const singleHandler = (input: {run: Json, dep1?: Json}) => { ... }

// Map step handler
const mapHandler = (item: Json) => {
  // Receives just the array element, not wrapped object
  return processItem(item);
}

Comprehensive Testing

33 test files covering:

  • Schema validation and constraints
  • Function-by-function unit tests
  • Integration workflows (root maps, dependent maps, mixed)
  • Edge cases (empty arrays, validation failures)
  • Error handling and failure propagation
  • Idempotency and double-execution safety

Benefits

  • Parallel Processing: Automatic fanout for array data
  • Type Safety: Strong validation ensures arrays where expected
  • Performance: Minimal JSON parsing, efficient batch operations
  • Simplicity: Familiar map semantics, clear error messages
  • Scalability: Handles large arrays efficiently
  • Composability: Map steps integrate seamlessly with existing single steps

Breaking Changes

None. This is a pure addition:

  • Existing single steps unchanged
  • New step_type parameter defaults to 'single'
  • Backward compatibility maintained
  • Database migrations are additive only

Implementation Status

  • Architecture Designed: Hybrid approach validated
  • Plan Documented: Complete implementation roadmap
  • 🔄 In Progress: Schema changes and function updates
  • Next: TDD implementation starting with add_step() function

Copy link

changeset-bot bot commented Sep 10, 2025

🦋 Changeset detected

Latest commit: 68d09d6

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 7 packages
Name Type
@pgflow/core Patch
pgflow Patch
@pgflow/client Patch
@pgflow/edge-worker Patch
@pgflow/example-flows Patch
@pgflow/dsl Patch
@pgflow/website Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Contributor

coderabbitai bot commented Sep 10, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

✨ Finishing touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch 09-10-feat_add_map_step_type_in_sql

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

nx-cloud bot commented Sep 10, 2025

View your CI Pipeline Execution ↗ for commit 68d09d6

Command Status Duration Result
nx run-many -t build --projects client,dsl --co... ✅ Succeeded 4s View ↗
nx affected -t build --configuration=production... ✅ Succeeded 3s View ↗
nx affected -t lint typecheck test --parallel -... ✅ Succeeded 5m 57s View ↗

☁️ Nx Cloud last updated this comment at 2025-09-15 18:02:01 UTC

@jumski jumski marked this pull request as ready for review September 10, 2025 19:49
PLAN.md Outdated
jsonb_build_object('run', r.input) || COALESCE(dep_out.deps_output, '{}'::jsonb)
WHEN s.step_type = 'map' AND s.deps_count = 0 THEN
-- Root map: extract item from run input array
(SELECT input -> st.task_index FROM pgflow.runs WHERE run_id = st.run_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential array bounds issue: The direct array indexing with task_index lacks bounds checking, which could cause runtime errors if the index exceeds array length. Consider adding validation:

CASE 
  WHEN st.task_index < jsonb_array_length(input) 
  THEN input -> st.task_index 
  ELSE (SELECT ERROR('Task index out of bounds')) 
END

This would provide a clear error message rather than a potentially confusing SQL error when an index is out of range.

Suggested change
(SELECT input -> st.task_index FROM pgflow.runs WHERE run_id = st.run_id)
CASE
WHEN st.task_index < jsonb_array_length((SELECT input FROM pgflow.runs WHERE run_id = st.run_id))
THEN (SELECT input -> st.task_index FROM pgflow.runs WHERE run_id = st.run_id)
ELSE (SELECT ERROR('Task index out of bounds'))
END

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

PLAN.md Outdated
Comment on lines 107 to 110
-- Set remaining_tasks = NULL for all 'created' steps (not started yet)
UPDATE pgflow.step_states
SET remaining_tasks = NULL
WHERE status = 'created';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data migration statement could be optimized to avoid unnecessary updates. Consider adding a condition to only update rows that need changing:

UPDATE pgflow.step_states 
SET remaining_tasks = NULL 
WHERE status = 'created' AND remaining_tasks IS NOT NULL;

This prevents updating rows where remaining_tasks is already NULL, reducing lock contention and improving migration performance, especially in production environments with existing data.

Suggested change
-- Set remaining_tasks = NULL for all 'created' steps (not started yet)
UPDATE pgflow.step_states
SET remaining_tasks = NULL
WHERE status = 'created';
-- Set remaining_tasks = NULL for all 'created' steps (not started yet)
UPDATE pgflow.step_states
SET remaining_tasks = NULL
WHERE status = 'created' AND remaining_tasks IS NOT NULL;

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

@jumski jumski force-pushed the 09-10-feat_add_map_step_type_in_sql branch from e5f7c0d to a3a6369 Compare September 11, 2025 05:30
@jumski jumski force-pushed the 09-10-feat_add_map_step_type_in_sql branch from a3a6369 to 6d7f91f Compare September 11, 2025 06:29
@jumski jumski force-pushed the 09-10-feat_add_map_step_type_in_sql branch from 6d7f91f to 2036d92 Compare September 11, 2025 06:47
SET status = 'started',
started_at = now()
started_at = now(),
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a null check when copying initial_tasks to remaining_tasks. While the schema defines initial_tasks with DEFAULT 1, existing data might contain null values. Using COALESCE would provide a safety net:

remaining_tasks = COALESCE(ready_steps.initial_tasks, 1)  -- Safely copy with fallback

This ensures the task counting logic remains robust even with unexpected data states.

Suggested change
remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
remaining_tasks = COALESCE(ready_steps.initial_tasks, 1) -- Safely copy with fallback

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

@jumski jumski force-pushed the 09-10-feat_add_map_step_type_in_sql branch 2 times, most recently from a330d14 to 91ff1a6 Compare September 11, 2025 09:30
@jumski jumski force-pushed the feature-map-and-array branch from 08a197e to a0af828 Compare September 11, 2025 16:04
Copy link
Contributor

🔍 Preview Deployment: Website

Deployment successful!

🔗 Preview URL: https://pr-208.pgflow.pages.dev

📝 Details:

  • Branch: 09-10-feat_add_map_step_type_in_sql
  • Commit: fa027d1e930dd0b3d2a148d8d2b09714dbac2b04
  • View Logs

_Last updated: _

Copy link
Contributor

🔍 Preview Deployment: Playground

Deployment successful!

🔗 Preview URL: https://pr-208--pgflow-demo.netlify.app

📝 Details:

  • Branch: 09-10-feat_add_map_step_type_in_sql
  • Commit: fa027d1e930dd0b3d2a148d8d2b09714dbac2b04
  • View Logs

_Last updated: _

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant