diff --git a/.claude/commands/help-review.md b/.claude/commands/help-review.md
new file mode 100644
index 000000000..bbde96602
--- /dev/null
+++ b/.claude/commands/help-review.md
@@ -0,0 +1,9 @@
+Your job is to help me understand the changes made to $ARGUMENTS, broken down by changed line/section.
+**Current branch:** !`git branch --show-current`
+If there is a `PLAN.md` or `PLAN_*.md`, read it before starting.
+
+Here is the diff of the changes:
+
+
+!`git show -p --no-ext-diff -- $ARGUMENTS`
+
diff --git a/.claude/settings.json b/.claude/settings.json
index eea61cc00..a80127af1 100644
--- a/.claude/settings.json
+++ b/.claude/settings.json
@@ -5,6 +5,7 @@
     "Bash(./scripts/atlas-migrate-hash:*)",
     "Bash(./scripts/run-test-with-colors:*)",
     "Bash(PGPASSWORD=postgres psql -h 127.0.0.1 -p 50422 -U postgres -d postgres -c:*)",
+    "Bash(PGPASSWORD=postgres psql -h 127.0.0.1 -p 50422 -U postgres -d postgres -f:*)",
     "Bash(bin/run-test-with-colors:*)",
     "Bash(cat:*)",
     "Bash(cd:*)",
@@ -17,6 +18,8 @@
     "Bash(gh run list:*)",
     "Bash(gh run view:*)",
     "Bash(git rm:*)",
+    "Bash(git show -p --no-ext-diff --:*)",
+    "Bash(git whatchanged:*)",
     "Bash(grep:*)",
     "Bash(ls:*)",
     "Bash(mkdir:*)",
diff --git a/.claude/sql_style.md b/.claude/sql_style.md
index 402e79d60..752aea5a5 100644
--- a/.claude/sql_style.md
+++ b/.claude/sql_style.md
@@ -18,4 +18,18 @@ Always qualify columns and arguments:
 - `start_flow.run_id` not just `run_id` in functions
 
 ## Keyword Arguments
-Use `param => "value"` NOT `param := "value"`
\ No newline at end of file
+Use `param => "value"` NOT `param := "value"`
+- Note on aliasing tables: when writing SQL functions that work with dependencies/dependents, steps, and states, build your aliases with parent/child prefixes and a `_step` (for `pgflow.steps`) or `_state` (for `pgflow.step_states`) suffix. `dep` should mean a row in `pgflow.deps`, not a parent dependency; never use `dep` for a row from `steps` or `step_states`.
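+
+For example, a minimal sketch of the intended naming (an illustrative query, not code from this repo):
+
+```sql
+select child_state.step_slug
+from pgflow.deps dep
+join pgflow.steps parent_step
+  on parent_step.flow_slug = dep.flow_slug
+  and parent_step.step_slug = dep.dep_slug
+join pgflow.step_states child_state
+  on child_state.flow_slug = dep.flow_slug
+  and child_state.step_slug = dep.step_slug;
+```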
\ No newline at end of file
diff --git a/PLAN.md b/PLAN.md
index 936377334..42856992b 100644
--- a/PLAN.md
+++ b/PLAN.md
@@ -2,13 +2,21 @@
 
 **NOTE: This PLAN.md file should be removed in the final PR once all map infrastructure is complete.**
 
-### Current State
+### Features
 
-- ✅ **WORKING**: Empty array maps (taskless) cascade and complete correctly
-- ✅ **WORKING**: Task spawning creates N tasks with correct indices
-- ✅ **WORKING**: Dependency count propagation for map steps
-- ✅ **WORKING**: Array element extraction - tasks get full array instead of individual items
-- ❌ **MISSING**: Output aggregation - no way to combine map task outputs for dependents
+- ✅ **DONE**: Empty array maps (taskless) cascade and complete correctly
+- ✅ **DONE**: Task spawning creates N tasks with correct indices
+- ✅ **DONE**: Dependency count propagation for map steps
+- ✅ **DONE**: Array element extraction - tasks receive individual array elements
+- ✅ **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents
+- ⏳ **NEXT**: DSL support for `.map()` to define map steps
+
+### Chores
+
+- ⏳ **WAITING**: Integration tests for map steps
+- ⏳ **WAITING**: Consolidated migration for map steps
+- ⏳ **WAITING**: Documentation for map steps
+- ⏳ **WAITING**: Graphite stack merge for map steps
 
 ## Implementation Status
 
@@ -67,16 +75,15 @@
   - Handles both root maps (from run input) and dependent maps (from step outputs)
   - Tests with actual array data processing
 
-#### ❌ Remaining Work
+- [x] **PR #217: Output Aggregation** - `09-17-add-map-step-output-aggregation` (THIS PR)
 
-- [ ] **Output Aggregation** (CRITICAL - BLOCKS MAP OUTPUT CONSUMPTION)
+  - Inline aggregation implementation in complete_task, start_tasks, maybe_complete_run
+  - Full test coverage (17 tests) for all aggregation scenarios
+  - Handles NULL preservation, empty arrays, order preservation
+  - Validates that non-array outputs to map steps fail correctly
+  - Fixed broadcast aggregation to send the full array, not an individual task output
 
-  - Aggregate map task outputs when step completes
-  - Store aggregated output for dependent steps to consume
-  - Maintain task_index ordering in aggregated arrays
-  - Tests for aggregation with actual map task outputs
-  - **IMPORTANT**: Must add test for map->map NULL propagation when this is implemented
-  - **IMPORTANT**: Must handle non-array outputs to map steps (should fail the run)
+#### ❌ Remaining Work
 
 - [ ] **DSL Support for .map() Step Type**
 
@@ -93,6 +100,30 @@
   - Type safety for input/output types
   - Compile-time enforcement of single dependency rule
 
+- [ ] **Fix Orphaned Messages on Run Failure**
+
+  - Archive all pending messages when run fails
+  - Handle map sibling tasks specially
+  - Fix type constraint violations to fail immediately without retries
+  - See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md)
+  - Critical for production: prevents queue performance degradation
+  - Tests already written (stashed) that document the problem
+
+- [ ] **Performance Optimization: step_states.output Column**
+
+  - Migrate from inline aggregation to storing outputs in step_states
+  - See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md)
+  - Benefits:
+    - Eliminate redundant aggregation queries
+    - 30-70% performance improvement for map chains
+    - Cleaner architecture with single source of truth
+  - Implementation:
+    - Add output column to step_states table
+    - Update complete_task to populate output on completion
+    - Simplify consumers (start_tasks, 
maybe_complete_run, broadcasts)
+    - Update all aggregation tests (~17 files)
+  - **Note**: This is an optimization that should be done after core functionality is stable
+
 - [ ] **Integration Tests**
 
   - End-to-end workflows with real array data
diff --git a/PLAN_orphaned_messages.md b/PLAN_orphaned_messages.md
new file mode 100644
index 000000000..9b8e074b0
--- /dev/null
+++ b/PLAN_orphaned_messages.md
@@ -0,0 +1,184 @@
+# Plan: Fix Orphaned Messages on Run Failure
+
+## Problem Statement
+
+When a run fails, messages for pending tasks remain in the queue indefinitely, causing:
+1. **Resource waste**: Workers continuously poll orphaned messages
+2. **Performance degradation**: Queue operations slow down over time
+3. **Map step issues**: Failing one map task leaves N-1 sibling messages orphaned
+4. **Type violations**: Deterministic errors retry unnecessarily
+
+## Current Behavior
+
+### When fail_task is called
+```sql
+-- Only archives the single failing task's message
+SELECT pgmq.archive('pgflow_tasks_queue', fail_task.msg_id);
+-- Leaves all other queued messages orphaned
+```
+
+### When type constraint violation occurs
+```sql
+-- Raises exception, causes retries
+RAISE EXCEPTION 'Map step % expects array input...';
+-- Transaction rolls back, but retries will hit same error
+```
+
+## Implementation Plan
+
+### 1. Update fail_task Function
+**File**: `pkgs/core/schemas/0100_function_fail_task.sql`
+
+Add after marking run as failed (around line 47):
+```sql
+-- Archive all pending messages for this run
+-- (PERFORM, not a bare SELECT: this runs inside a plpgsql function body)
+PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)
+FROM pgflow.step_tasks t
+WHERE t.run_id = fail_task.run_id
+  AND t.status = 'pending'
+  AND t.msg_id IS NOT NULL;
+```
+
+### 2. Update complete_task for Type Violations
+**File**: `pkgs/core/schemas/0100_function_complete_task.sql`
+
+Replace the current RAISE EXCEPTION block (lines 115-120) with:
+```sql
+IF v_dependent_map_slug IS NOT NULL THEN
+  -- Mark run as failed immediately (no retries for type violations)
+  UPDATE pgflow.runs
+  SET status = 'failed',
+      failed_at = now(),
+      error = format('Type contract violation: Map step %s expects array input but dependency %s produced %s (output: %s)',
+        v_dependent_map_slug,
+        complete_task.step_slug,
+        CASE WHEN complete_task.output IS NULL THEN 'null'
+             ELSE jsonb_typeof(complete_task.output) END,
+        complete_task.output)
+  WHERE pgflow.runs.run_id = complete_task.run_id;
+
+  -- Archive ALL pending messages for this run
+  PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)
+  FROM pgflow.step_tasks t
+  WHERE t.run_id = complete_task.run_id
+    AND t.status = 'pending'
+    AND t.msg_id IS NOT NULL;
+
+  -- Mark the current task as failed (not completed)
+  -- (columns qualified to avoid ambiguity with same-named parameters)
+  UPDATE pgflow.step_tasks
+  SET status = 'failed',
+      failed_at = now(),
+      error_message = format('Type contract violation: produced %s instead of array',
+        CASE WHEN complete_task.output IS NULL THEN 'null'
+             ELSE jsonb_typeof(complete_task.output) END)
+  WHERE pgflow.step_tasks.run_id = complete_task.run_id
+    AND pgflow.step_tasks.step_slug = complete_task.step_slug
+    AND pgflow.step_tasks.task_index = complete_task.task_index;
+
+  -- Return empty result set (task not completed)
+  RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
+  RETURN;
+END IF;
+```
+
+### 3. 
Add Supporting Index +**File**: New migration or add to existing + +```sql +-- Speed up the archiving query +CREATE INDEX IF NOT EXISTS idx_step_tasks_pending_with_msg +ON pgflow.step_tasks(run_id, status) +WHERE status = 'pending' AND msg_id IS NOT NULL; +``` + +## Testing + +### Tests Already Written (Stashed) + +1. **`supabase/tests/fail_task/archive_sibling_map_tasks.test.sql`** + - Verifies all map task messages are archived when one fails + - Tests: 8 assertions about message archiving and status + +2. **`supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql`** + - Verifies type violations archive all pending messages + - Tests: 8 assertions about queue cleanup and run status + +### How to Run Tests +```bash +# After unstashing and implementing the fixes: +pnpm nx test:pgtap core -- supabase/tests/fail_task/archive_sibling_map_tasks.test.sql +pnpm nx test:pgtap core -- supabase/tests/initial_tasks_null/archive_messages_on_type_constraint_failure.test.sql +``` + +## Migration Considerations + +### Backward Compatibility +- New behavior only affects failed runs (safe) +- Archiving preserves messages (can be recovered if needed) +- No schema changes to existing tables + +### Performance Impact +- One-time cost during failure (acceptable) +- Prevents ongoing performance degradation (improvement) +- Index ensures archiving query is efficient + +### Rollback Plan +If issues arise: +1. Remove the archiving logic +2. Messages remain in queue (old behavior) +3. No data loss since we archive, not delete + +## Edge Cases to Consider + +### 1. Concurrent Task Completion +If multiple tasks complete/fail simultaneously: +- PostgreSQL row locks ensure consistency +- Each failure archives all pending messages +- Idempotent: archiving already-archived messages is safe + +### 2. Very Large Map Steps +For maps with 1000+ tasks: +- Archiving might take several seconds +- Consider batching if performance issues arise +- Current approach should handle up to ~10k tasks reasonably + +### 3. Mixed Step Types +When run has both map and single steps: +- Archive logic handles all pending tasks regardless of type +- Correctly archives both map siblings and unrelated pending tasks + +## Future Enhancements (Not for this PR) + +1. **Selective Archiving**: Only archive tasks that can't proceed +2. **Batch Operations**: Archive in chunks for very large runs +3. **Recovery Mechanism**: Function to unarchive and retry +4. 
**Monitoring**: Track archived message counts for alerting + +## Success Criteria + +- [ ] All tests pass (both new test files) +- [ ] No orphaned messages after run failure +- [ ] Type violations don't retry +- [ ] Performance acceptable for maps with 100+ tasks +- [ ] No impact on successful run performance + +## Implementation Checklist + +- [ ] Update `fail_task` function +- [ ] Update `complete_task` function +- [ ] Add database index +- [ ] Unstash and run tests +- [ ] Test with large map steps (100+ tasks) +- [ ] Update migration file +- [ ] Document behavior change in function comments + +## Notes + +- This fix is **critical for production** - without it, queue performance will degrade over time +- Type violations are **deterministic** - retrying them is always wasteful +- Archiving (vs deleting) preserves debugging capability +- The fix is relatively simple (~30 lines of SQL) but high impact \ No newline at end of file diff --git a/PLAN_partial_completion.md b/PLAN_partial_completion.md new file mode 100644 index 000000000..b75be5a62 --- /dev/null +++ b/PLAN_partial_completion.md @@ -0,0 +1,319 @@ +# Partial Completion & Recovery Strategy for pgflow + +## Problem Statement + +When workflows fail mid-execution, especially after irreversible operations (API calls, database writes, external system mutations), we need a way to: +1. Preserve completed work +2. Allow recovery/continuation from failure point +3. Clean up orphaned resources (queued messages) +4. Provide clear failure semantics + +Currently, pgflow fails entire runs on any task failure, which: +- Wastes successfully completed work +- Leaves orphaned messages in queues +- Provides no recovery path +- Forces users to restart from scratch + +## How Other Systems Handle This + +### Temporal +**Approach**: Continue-As-New & Compensation +- **State Preservation**: Workflow state survives failures, can be resumed +- **Saga Pattern**: Explicit compensation activities to undo work +- **Activity Replay**: Deterministic replay skips completed activities +- **Non-Retryable Errors**: `ApplicationFailure` with `non_retryable=true` +- **Key Insight**: Distinguish between deterministic workflow errors (never retry) and transient activity errors (retry) + +### Inngest +**Approach**: Step-Level Isolation with Try-Catch +- **Step Independence**: Each step can fail/retry independently +- **Failure Handling**: `try/catch` blocks around steps +- **NonRetriableError**: Permanent failures that stop execution +- **Failure Handlers**: `onFailure` callbacks for cleanup/rollback +- **Key Insight**: Partial success is normal - handle errors with standard language primitives + +### Trigger.dev +**Approach**: Flexible Error Callbacks +- **handleError Callbacks**: Decide retry behavior per error type +- **Conditional Retries**: Skip retries based on error details +- **Response-Based Timing**: Retry after time from response headers +- **Key Insight**: Error handling logic lives with workflow definition + +### Apache Airflow +**Approach**: Clear & Resume +- **Task Clearing**: "Clear" failed tasks to retry from that point +- **Selective Retry**: Only retry failed tasks, not successful ones +- **Manual Intervention**: Operators can intervene to fix and resume +- **Backfill Operations**: Re-run specific date ranges or task sets +- **Key Insight**: Manual intervention is acceptable for complex failures + +### AWS Step Functions +**Approach**: Hybrid Retry + Redrive +- **Automatic Retries**: For transient errors +- **Manual Redrive**: Resume from specific 
states after fixing root cause
+- **Selective Resume**: Choose which nodes to resume from
+- **Key Insight**: Combine automatic and manual recovery strategies
+
+## Current pgflow Behavior
+
+### Type Contract Violations
+```sql
+-- In complete_task: RAISES EXCEPTION, causes retry attempts
+IF v_dependent_map_slug IS NOT NULL THEN
+  RAISE EXCEPTION 'Map step % expects array input...';
+END IF;
+```
+**Problems**:
+- Retries a deterministic error (won't fix itself)
+- Wastes resources on pointless retries
+- No cleanup of sibling/parallel tasks
+
+### Task Failures (fail_task)
+```sql
+-- Archives only the failing task's message
+SELECT pgmq.archive('pgflow_tasks_queue', fail_task.msg_id);
+-- Leaves all other queued messages orphaned!
+```
+**Problems**:
+- Orphaned messages waste worker resources
+- Queue performance degrades over time
+- No cleanup of sibling map tasks
+
+## Proposed Solution (MVP)
+
+### Phase 1: Immediate Message Cleanup (Current PR)
+
+#### 1. Enhanced fail_task - Archive All Pending Messages
+```sql
+-- After marking run as failed, archive ALL pending messages
+-- (PERFORM, not a bare SELECT: this runs inside a plpgsql function body)
+PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)
+FROM pgflow.step_tasks t
+WHERE t.run_id = fail_task.run_id
+  AND t.status = 'pending'
+  AND t.msg_id IS NOT NULL;
+```
+
+#### 2. Type Contract Violation - Fail Fast Without Retries
+```sql
+-- In complete_task validation block
+IF v_dependent_map_slug IS NOT NULL THEN
+  -- Mark run as failed immediately (no retries)
+  UPDATE pgflow.runs
+  SET status = 'failed',
+      failed_at = now(),
+      error = format('Type contract violation: ...')
+  WHERE pgflow.runs.run_id = complete_task.run_id;
+
+  -- Archive ALL pending messages immediately
+  PERFORM pgmq.archive('pgflow_tasks_queue', t.msg_id)
+  FROM pgflow.step_tasks t
+  WHERE t.run_id = complete_task.run_id
+    AND t.status = 'pending'
+    AND t.msg_id IS NOT NULL;
+
+  -- NOTE: do not RAISE here; an exception would roll back the two
+  -- statements above. Mark the task failed and return instead
+  -- (see PLAN_orphaned_messages.md for the full version).
+END IF;
+```
+
+### Phase 2: Partial Success Status (Post-MVP)
+
+#### New Run Status
+```sql
+ALTER TYPE pgflow.run_status ADD VALUE 'partially_completed';
+```
+
+#### Track Permanent vs Transient Failures
+```sql
+ALTER TYPE pgflow.task_status ADD VALUE 'failed_permanent';
+
+-- In complete_task for type violations
+UPDATE pgflow.step_tasks
+SET status = 'failed_permanent',
+    error = 'Type contract violation...'
+WHERE ...;
+```
+
+#### Update Run Completion Logic
+```sql
+-- In maybe_complete_run
+UPDATE pgflow.runs
+SET status = CASE
+  WHEN EXISTS (SELECT 1 FROM pgflow.step_tasks
+               WHERE run_id = ...
+                 AND status IN ('failed', 'failed_permanent'))
+  THEN 'partially_completed'
+  ELSE 'completed'
+END
+```
+
+### Phase 3: Clear & Resume Capability (Future)
+
+#### Clear Failed Tasks
+```sql
+CREATE FUNCTION pgflow.clear_task(
+  run_id uuid,
+  step_slug text,
+  task_index integer DEFAULT NULL  -- NULL = clear all tasks for step
+) RETURNS void AS $$
+BEGIN
+  -- Reset task(s) to pending
+  UPDATE pgflow.step_tasks
+  SET status = 'pending',
+      attempts_count = 0,
+      error_message = NULL,
+      started_at = NULL,
+      failed_at = NULL
+  WHERE pgflow.step_tasks.run_id = clear_task.run_id
+    AND pgflow.step_tasks.step_slug = clear_task.step_slug
+    AND (clear_task.task_index IS NULL OR pgflow.step_tasks.task_index = clear_task.task_index);
+
+  -- Re-enqueue to pgmq
+  -- ... re-queue logic ...
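+
+  -- A possible sketch of the re-queue step (hypothetical; assumes the
+  -- worker message only needs run_id/step_slug/task_index and reuses the
+  -- 'pgflow_tasks_queue' name from the snippets above):
+  --
+  --   UPDATE pgflow.step_tasks t
+  --   SET msg_id = (SELECT pgmq.send('pgflow_tasks_queue',
+  --                   jsonb_build_object('run_id', t.run_id,
+  --                                      'step_slug', t.step_slug,
+  --                                      'task_index', t.task_index)))
+  --   WHERE t.run_id = clear_task.run_id
+  --     AND t.status = 'pending';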
+END;
+$$;
+```
+
+#### Resume Failed Run
+```sql
+CREATE FUNCTION pgflow.resume_run(
+  run_id uuid,
+  from_step text DEFAULT NULL  -- Resume from specific step or all failed
+) RETURNS void AS $$
+BEGIN
+  -- Reset run status
+  UPDATE pgflow.runs
+  SET status = 'started',
+      failed_at = NULL
+  WHERE pgflow.runs.run_id = resume_run.run_id;
+
+  -- Clear failed steps
+  -- ... clear logic ...
+
+  -- Restart flow processing
+  PERFORM pgflow.start_ready_steps(resume_run.run_id);
+END;
+$$;
+```
+
+## Implementation Priority
+
+### Must Have (Current PR)
+1. ✅ Archive all messages when run fails
+2. ✅ Handle map sibling tasks specially
+3. ✅ Type violations fail immediately without retries
+
+### Should Have (Next PR)
+1. Partial completion status
+2. Distinguish permanent vs transient failures
+3. Better error messages with recovery hints
+
+### Nice to Have (Future)
+1. Clear/resume individual tasks
+2. Compensation/rollback hooks
+3. Checkpoint/savepoint system
+4. Workflow versioning for hot-patches
+
+## Trade-offs & Decisions
+
+### Why Archive vs Delete Messages?
+- **Archive**: Preserves audit trail, can analyze failures
+- **Delete**: Saves space but loses debugging info
+- **Decision**: Archive for now, add cleanup job later
+
+### Why Fail Entire Run vs Continue Parallel Branches?
+- **Fail Run**: Simple, predictable, matches user expectations
+- **Continue**: Complex state management, confusing semantics
+- **Decision**: Fail run for MVP, consider partial continuation later
+
+### Why No Automatic Compensation?
+- **Compensation**: Requires user-defined rollback logic
+- **No Compensation**: Simpler, lets users handle externally
+- **Decision**: No built-in compensation for MVP, document patterns
+
+## Testing Requirements
+
+### Failing Tests (Already Created)
+1. `archive_sibling_map_tasks.test.sql` - Verify fail_task archives all map task messages
+2. `archive_messages_on_type_constraint_failure.test.sql` - Verify type violations archive all pending messages
+
+### Additional Tests Needed
+1. Performance impact of archiving many messages
+2. Race conditions during failure/archive
+3. Recovery after clear/resume
+4. Partial completion status transitions
+
+## Migration Path
+
+### For Existing Users
+1. Failure behavior changes are backward compatible
+2. New statuses are additive (won't break existing code)
+3. Clear/resume are opt-in features
+
+### Database Changes
+```sql
+-- Add to migration
+CREATE INDEX ON pgflow.step_tasks(run_id, status)
+WHERE msg_id IS NOT NULL; -- Speed up message archiving
+
+-- Future: Add partially_completed status
+-- Future: Add failed_permanent task status
+```
+
+## Documentation Requirements
+
+### User-Facing Docs
+1. Explain failure modes and recovery options
+2. Document clear/resume functions
+3. Best practices for idempotent handlers
+4. Patterns for compensation/rollback
+
+### Developer Notes
+1. Why we archive vs delete messages
+2. Performance implications of batch archiving
+3. 
How to extend for custom failure handling
+
+## Success Metrics
+
+### Immediate (Current PR)
+- [ ] No orphaned messages after failures
+- [ ] Type violations don't waste retries
+- [ ] Tests pass for message archiving
+
+### Long-term
+- [ ] Users can recover from partial failures
+- [ ] Queue performance doesn't degrade over time
+- [ ] Clear error messages guide recovery
+- [ ] Support for compensation patterns
+
+## References
+
+- Temporal Saga Pattern: https://docs.temporal.io/application-development/application-design-patterns#saga
+- Inngest Error Handling: https://www.inngest.com/docs/guides/error-handling
+- Airflow Task Clearing: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html
+- AWS Step Functions Redrive: https://aws.amazon.com/blogs/compute/introducing-aws-step-functions-redrive-a-new-way-to-restart-workflows/
+
+## Open Questions
+
+1. **Should we add a `retry_on_clear` flag to steps?** Some steps might not be safe to retry even manually.
+
+2. **How to handle time-sensitive operations?** If a step sends an "order shipped" email, clearing and retrying days later might be inappropriate.
+
+3. **Should partially_completed runs be reusable?** Or should resume create a new run linked to the original?
+
+4. **Message archiving performance?** At what scale does archiving 1000s of messages become problematic?
+
+5. **Compensation pattern?** Should pgflow provide first-class support for compensation/rollback handlers?
+
+## Decision Log
+
+| Date | Decision | Rationale |
+|------|----------|-----------|
+| 2024-01-18 | Archive messages, don't delete | Preserves debugging info, can add cleanup later |
+| 2024-01-18 | Type violations fail immediately | Deterministic errors shouldn't retry |
+| 2024-01-18 | No built-in compensation for MVP | Keep it simple, document patterns |
+| 2024-01-18 | Fail entire run on task failure | Predictable behavior, simpler state management |
\ No newline at end of file
diff --git a/PLAN_step_output.md b/PLAN_step_output.md
new file mode 100644
index 000000000..b88848a9b
--- /dev/null
+++ b/PLAN_step_output.md
@@ -0,0 +1,358 @@
+# Plan: Add `output` Column to `step_states` Table
+
+## Context & Timing
+
+**Status**: READY FOR FUTURE PR (not for current map aggregation PR)
+
+This plan was developed during the map output aggregation PR but should be implemented as a follow-up optimization after the map infrastructure stack is complete. The current PR uses inline aggregation, which is sufficient for the MVP.
+
+**Prerequisites**:
+- Map infrastructure fully merged to main
+- All map-related PRs complete (DSL, integration tests, migration consolidation)
+- Current inline aggregation approach proven stable
+
+## Executive Summary
+
+This plan outlines the migration from inline output aggregation to storing step outputs directly in `pgflow.step_states`. This architectural change will improve performance, simplify code, and provide a cleaner data model.
+
+## Current State Analysis
+
+### Where Output Aggregation Happens Today
+
+1. **`pgflow.start_tasks`** (schemas/0120_function_start_tasks.sql:52-62)
+   - Aggregates map outputs when starting dependent steps
+   - Each dependent step re-aggregates outputs from scratch
+
+2. **`pgflow.maybe_complete_run`** (schemas/0100_function_maybe_complete_run.sql:36-45)
+   - Aggregates outputs for leaf steps when building run output
+   - Only runs at flow completion
+
+3. 
**`pgflow.complete_task`** (schemas/0100_function_complete_task.sql:184-197) + - Aggregates outputs for broadcast events + - Runs on every map step completion + +### Current Problems + +- **Performance**: Same aggregation query runs multiple times +- **Complexity**: Aggregation logic duplicated in 3 places +- **Map-to-Map Inefficiency**: Aggregate array just to immediately decompose it +- **Testing Burden**: Need to test aggregation in multiple contexts + +## Proposed Architecture + +### Schema Change + +```sql +ALTER TABLE pgflow.step_states +ADD COLUMN output jsonb; + +-- Constraint: output only set when step is completed +ALTER TABLE pgflow.step_states +ADD CONSTRAINT output_only_when_completed CHECK ( + output IS NULL OR status = 'completed' +); +``` + +### Key Design Decisions + +1. **Single steps**: Store task output directly (no aggregation needed) +2. **Map steps**: Store aggregated array ordered by task_index +3. **Taskless steps**: Store empty array `[]` for map steps, NULL for single steps +4. **Storage location**: Step output belongs at step level, not task level + +## Implementation Changes + +### 1. Schema Updates + +**File**: New migration or update `0060_tables_runtime.sql` +- Add `output jsonb` column to `step_states` +- Add constraint `output_only_when_completed` + +### 2. Function Updates + +#### `pgflow.complete_task` - PRIMARY CHANGE +**File**: `schemas/0100_function_complete_task.sql` + +**Current Lines 86-104**: Update step state completion +```sql +-- NEW: Set output when step completes +UPDATE pgflow.step_states +SET + status = 'completed', + completed_at = now(), + remaining_tasks = 0, + -- NEW: Store output at step level + output = CASE + WHEN (SELECT step_type FROM pgflow.steps + WHERE flow_slug = step_state.flow_slug + AND step_slug = complete_task.step_slug) = 'map' THEN + -- Aggregate all task outputs for map steps + (SELECT COALESCE(jsonb_agg(output ORDER BY task_index), '[]'::jsonb) + FROM pgflow.step_tasks + WHERE run_id = complete_task.run_id + AND step_slug = complete_task.step_slug + AND status = 'completed') + ELSE + -- Single step: use the task output directly + complete_task.output + END +WHERE ... +``` + +**Current Lines 184-197**: Simplify broadcast event +```sql +-- SIMPLIFIED: Read from step_states.output +'output', v_step_state.output, +``` + +#### `pgflow.start_tasks` - SIMPLIFY +**File**: `schemas/0120_function_start_tasks.sql` + +**Current Lines 50-62**: Replace aggregation with simple read +```sql +-- SIMPLIFIED: Read from step_states.output +dep_state.output as dep_output +... 
+FROM pgflow.step_states dep_state +WHERE dep_state.run_id = st.run_id + AND dep_state.step_slug = dep.dep_slug + AND dep_state.status = 'completed' +``` + +#### `pgflow.maybe_complete_run` - SIMPLIFY +**File**: `schemas/0100_function_maybe_complete_run.sql` + +**Current Lines 24-45**: Replace aggregation with simple read +```sql +-- SIMPLIFIED: Read from step_states.output +SELECT jsonb_object_agg(step_slug, output) +FROM pgflow.step_states ss +WHERE ss.run_id = maybe_complete_run.run_id + AND NOT EXISTS ( + SELECT 1 FROM pgflow.deps d + WHERE d.flow_slug = ss.flow_slug + AND d.dep_slug = ss.step_slug + ) +``` + +#### `pgflow.cascade_complete_taskless_steps` - UPDATE +**File**: `schemas/0100_function_cascade_complete_taskless_steps.sql` + +**Current Lines 27-32**: Set output for taskless steps +```sql +UPDATE pgflow.step_states ss +SET status = 'completed', + started_at = now(), + completed_at = now(), + remaining_tasks = 0, + -- NEW: Set output for taskless steps + output = CASE + WHEN s.step_type = 'map' THEN '[]'::jsonb -- Empty array for map + ELSE NULL -- NULL for single steps + END +``` + +#### `pgflow.get_run_with_states` - ALREADY COMPATIBLE +**File**: `schemas/0105_function_get_run_with_states.sql` +- No changes needed - will automatically include output in response + +### 3. Test Updates + +#### Tests to Update (Add Assertions for `step_states.output`) + +1. **`tests/map_output_aggregation/basic_aggregation.test.sql`** + - Add: Verify `step_states.output` contains `[{output1}, {output2}, {output3}]` + +2. **`tests/map_output_aggregation/empty_map.test.sql`** + - Add: Verify `step_states.output = '[]'::jsonb` + +3. **`tests/map_output_aggregation/order_preservation.test.sql`** + - Add: Verify output array order matches task_index order + +4. **`tests/map_output_aggregation/map_to_single.test.sql`** + - Modify: Check dependent gets input from `step_states.output` + +5. **`tests/map_output_aggregation/map_to_map.test.sql`** + - Modify: Verify second map receives array from first map's `step_states.output` + +6. **`tests/map_output_aggregation/run_completion_leaf_map.test.sql`** + - Modify: Verify run output uses `step_states.output` + +7. **`tests/map_output_aggregation/null_outputs.test.sql`** + - Add: Verify NULL values preserved in `step_states.output` array + +8. **`tests/map_output_aggregation/mixed_dependencies.test.sql`** + - Add: Verify both map and single steps populate output correctly + +9. **`tests/map_output_aggregation/partial_completion_prevention.test.sql`** + - Add: Verify output remains NULL until all tasks complete + +10. **`tests/map_output_aggregation/failed_task_handling.test.sql`** + - Add: Verify output remains NULL when step fails + +11. **`tests/map_output_aggregation/map_initial_tasks_timing.test.sql`** + - No changes needed (focuses on timing) + +12. **`tests/map_output_aggregation/deep_map_chain.test.sql`** + - Modify: Verify each map in chain reads from previous `step_states.output` + +13. **`tests/map_output_aggregation/broadcast_aggregation.test.sql`** + - Modify: Verify broadcast uses `step_states.output` + +14. **`tests/map_output_aggregation/concurrent_completion.test.sql`** + - Add: Verify final `step_states.output` correct despite concurrency + +15. **`tests/map_output_aggregation/multiple_maps_to_single.test.sql`** + - Modify: Verify single step gets inputs from multiple `step_states.output` + +16. **`tests/complete_task/saves_output_when_completing_run.test.sql`** + - Modify: Verify run output built from `step_states.output` + +17. 
**`tests/completing_taskless_steps/*.sql` (7 files)**
+    - Add: Verify taskless maps have `output = '[]'`
+    - Add: Verify taskless single steps have `output = NULL`
+
+#### New Tests to Create
+
+1. **`tests/step_output/single_step_output.test.sql`**
+   ```sql
+   -- Verify single step stores task output directly in step_states.output
+   -- Complete a single task, check step_states.output = task.output
+   ```
+
+2. **`tests/step_output/map_step_aggregation.test.sql`**
+   ```sql
+   -- Verify map step aggregates all task outputs in order
+   -- Complete N tasks, check step_states.output = array of N outputs
+   ```
+
+3. **`tests/step_output/output_only_when_completed.test.sql`**
+   ```sql
+   -- Verify output is NULL for non-completed steps
+   -- Check constraint prevents setting output on non-completed steps
+   ```
+
+4. **`tests/step_output/taskless_step_outputs.test.sql`**
+   ```sql
+   -- Verify taskless map steps get '[]' as output
+   -- Verify taskless single steps get NULL as output
+   ```
+
+5. **`tests/step_output/null_task_outputs.test.sql`**
+   ```sql
+   -- Verify NULL task outputs are preserved in aggregation
+   -- Map with [null, {data}, null] -> step_states.output has all three
+   ```
+
+#### Tests to Remove/Update
+
+1. **`tests/map_output_aggregation/broadcast_output_verification.test.sql`**
+   - **Action**: REMOVE - This test demonstrates a bug that becomes architecturally impossible
+   - **Reason**: With `step_states.output`, there's no confusion between task and aggregated output
+
+2. **`tests/map_output_aggregation/broadcast_event_bug.test.sql`**
+   - **Action**: KEEP AS-IS - Becomes regression test
+   - **Reason**: Will pass without modification, ensures broadcasts use aggregated output
+   - **Note**: No longer needs complex realtime.send mocking - just query realtime.messages
+
+### 4. Performance Tests
+
+**File**: `tests/performance/map_aggregation_performance.test.sql`
+
+Measure the performance improvement:
+1. Create a flow with a map -> map -> map chain
+2. Complete tasks and measure query times
+3. Compare with a baseline from the current implementation
+
+Expected improvements:
+- `start_tasks`: 30-50% faster (no aggregation needed)
+- `complete_task` broadcast: 20-30% faster
+- Map-to-map chains: 50-70% faster (no aggregate/decompose cycle)
+
+## Migration Strategy
+
+### Option 1: Single Migration (Recommended)
+1. Add column with DEFAULT NULL
+2. Update all functions in the same migration
+3. Backfill existing data if needed
+
+### Option 2: Phased Migration
+1. Add column (NULL allowed)
+2. Update `complete_task` to write to both locations
+3. Update consumers to read from the new location
+4. Remove old aggregation logic
+
+## Rollback Plan
+
+If issues arise:
+1. Functions can temporarily aggregate on-the-fly if the output column is NULL
+2. Add fallback logic: `COALESCE(step_states.output, <on-the-fly aggregation>)`
+3. Remove the column only after confirming no dependencies
+
+## Benefits Summary
+
+1. **Performance**: Eliminate redundant aggregation queries
+2. **Simplicity**: Single source of truth for step outputs
+3. **Consistency**: Uniform output handling for all step types
+4. **Maintainability**: Aggregation logic in one place
+5. 
**Future-proof**: Enables features like result caching, partial re-runs + +## Risks & Mitigations + +| Risk | Mitigation | +|------|------------| +| Storage overhead | Minimal - same data, different location | +| Migration complexity | Test thoroughly, use transaction | +| Backward compatibility | Add column as nullable initially | +| Performance regression | Benchmark before/after | + +## Implementation Order + +1. Create performance baseline tests +2. Add schema migration +3. Update `complete_task` (primary change) +4. Update `cascade_complete_taskless_steps` +5. Simplify `start_tasks` +6. Simplify `maybe_complete_run` +7. Simplify broadcast in `complete_task` +8. Update/create tests +9. Run performance comparison +10. Document changes + +## Implementation Tasks + +### Phase 1: Schema & Core Function Updates +- [ ] Create migration adding `output` column to `step_states` +- [ ] Update `pgflow.complete_task` to populate `output` on step completion +- [ ] Update `pgflow.cascade_complete_taskless_steps` to set output for taskless steps + +### Phase 2: Consumer Function Updates +- [ ] Simplify `pgflow.start_tasks` to read from `step_states.output` +- [ ] Simplify `pgflow.maybe_complete_run` to read from `step_states.output` +- [ ] Simplify broadcast in `pgflow.complete_task` to use `v_step_state.output` + +### Phase 3: Test Updates +- [ ] Update map output aggregation test assertions (18 tests) +- [ ] Remove `broadcast_output_verification.test.sql` +- [ ] Verify `broadcast_event_bug.test.sql` passes without modification +- [ ] Create 5 new tests for output column behavior +- [ ] Run and verify all tests pass + +### Phase 4: Performance Validation +- [ ] Create baseline performance measurements +- [ ] Run performance tests after implementation +- [ ] Document performance improvements + +### Phase 5: Cleanup +- [ ] Remove temporary migration `20250917161744_pgflow_temp_handle_map_output_aggregation.sql` +- [ ] Consolidate into single clean migration +- [ ] Update documentation + +## Success Criteria + +- [ ] All existing tests pass with modifications +- [ ] New tests verify output column behavior +- [ ] Performance tests show improvement or no regression +- [ ] Code complexity reduced (3 aggregation sites -> 1) +- [ ] Migration runs cleanly on existing data +- [ ] Broadcast bug architecturally prevented \ No newline at end of file diff --git a/pkgs/client/project.json b/pkgs/client/project.json index d7b3ea818..32e27ec55 100644 --- a/pkgs/client/project.json +++ b/pkgs/client/project.json @@ -21,6 +21,7 @@ "build:lib": { "executor": "@nx/vite:build", "outputs": ["{options.outputPath}"], + "dependsOn": ["^build"], "options": { "outputPath": "pkgs/client/dist", "configFile": "pkgs/client/vite.config.ts" @@ -29,6 +30,7 @@ "build:browser": { "executor": "@nx/vite:build", "outputs": ["{options.outputPath}"], + "dependsOn": ["^build"], "options": { "outputPath": "pkgs/client/dist", "configFile": "pkgs/client/vite.browser.config.ts" @@ -36,6 +38,7 @@ }, "typecheck": { "executor": "@nx/js:tsc", + "dependsOn": ["^build"], "options": { "noEmit": true, "tsConfig": "pkgs/client/tsconfig.lib.json", diff --git a/pkgs/core/schemas/0100_function_complete_task.sql b/pkgs/core/schemas/0100_function_complete_task.sql index 25a075897..fc41fdd18 100644 --- a/pkgs/core/schemas/0100_function_complete_task.sql +++ b/pkgs/core/schemas/0100_function_complete_task.sql @@ -18,15 +18,21 @@ begin -- VALIDATION: Array output for dependent maps -- ========================================== -- Must happen BEFORE acquiring 
locks to fail fast without holding resources -SELECT ds.step_slug INTO v_dependent_map_slug -FROM pgflow.deps d -JOIN pgflow.steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.step_slug -JOIN pgflow.step_states ss ON ss.flow_slug = ds.flow_slug AND ss.step_slug = ds.step_slug -WHERE d.dep_slug = complete_task.step_slug - AND d.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) - AND ds.step_type = 'map' - AND ss.run_id = complete_task.run_id - AND ss.initial_tasks IS NULL +-- Only validate for single steps - map steps produce scalars that get aggregated +SELECT child_step.step_slug INTO v_dependent_map_slug +FROM pgflow.deps dependency +JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug + AND child_step.step_slug = dependency.step_slug +JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug + AND parent_step.step_slug = dependency.dep_slug +JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug + AND child_state.step_slug = child_step.step_slug +WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step + AND dependency.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND parent_step.step_type = 'single' -- Only validate single steps + AND child_step.step_type = 'map' + AND child_state.run_id = complete_task.run_id + AND child_state.initial_tasks IS NULL AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') LIMIT 1; @@ -89,41 +95,63 @@ step_state AS ( RETURNING pgflow.step_states.* ), -- ---------- Dependency resolution ---------- --- Find all steps that depend on the completed step (only if step completed) -dependent_steps AS ( - SELECT d.step_slug AS dependent_step_slug - FROM pgflow.deps d - JOIN step_state s ON s.status = 'completed' AND d.flow_slug = s.flow_slug - WHERE d.dep_slug = complete_task.step_slug - ORDER BY d.step_slug -- Ensure consistent ordering +-- Find all child steps that depend on the completed parent step (only if parent completed) +child_steps AS ( + SELECT deps.step_slug AS child_step_slug + FROM pgflow.deps deps + JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug + WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child + ORDER BY deps.step_slug -- Ensure consistent ordering ), --- ---------- Lock dependent steps ---------- --- Acquire locks on all dependent steps before updating them -dependent_steps_lock AS ( +-- ---------- Lock child steps ---------- +-- Acquire locks on all child steps before updating them +child_steps_lock AS ( SELECT * FROM pgflow.step_states WHERE pgflow.step_states.run_id = complete_task.run_id - AND pgflow.step_states.step_slug IN (SELECT dependent_step_slug FROM dependent_steps) + AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps) FOR UPDATE ), --- ---------- Update dependent steps ---------- +-- ---------- Update child steps ---------- -- Decrement remaining_deps and resolve NULL initial_tasks for map steps -dependent_steps_update AS ( - UPDATE pgflow.step_states ss - SET remaining_deps = ss.remaining_deps - 1, - -- Resolve NULL initial_tasks for dependent map steps - -- This is where dependent maps learn their array size from upstream +child_steps_update AS ( + UPDATE pgflow.step_states child_state + SET remaining_deps = child_state.remaining_deps - 1, + -- Resolve NULL initial_tasks for 
child map steps + -- This is where child maps learn their array size from the parent + -- This CTE only runs when the parent step is complete (see child_steps JOIN) initial_tasks = CASE - WHEN s.step_type = 'map' AND ss.initial_tasks IS NULL - AND complete_task.output IS NOT NULL - AND jsonb_typeof(complete_task.output) = 'array' THEN - jsonb_array_length(complete_task.output) - ELSE ss.initial_tasks -- Keep existing value (including NULL) + WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN + CASE + WHEN parent_step.step_type = 'map' THEN + -- Map->map: Count all completed tasks from parent map + -- We add 1 because the current task is being completed in this transaction + -- but isn't yet visible as 'completed' in the step_tasks table + -- TODO: Refactor to use future column step_states.total_tasks + -- Would eliminate the COUNT query and just use parent_state.total_tasks + (SELECT COUNT(*)::int + 1 + FROM pgflow.step_tasks parent_tasks + WHERE parent_tasks.run_id = complete_task.run_id + AND parent_tasks.step_slug = complete_task.step_slug + AND parent_tasks.status = 'completed' + AND parent_tasks.task_index != complete_task.task_index) + ELSE + -- Single->map: Use output array length (single steps complete immediately) + CASE + WHEN complete_task.output IS NOT NULL + AND jsonb_typeof(complete_task.output) = 'array' THEN + jsonb_array_length(complete_task.output) + ELSE NULL -- Keep NULL if not an array + END + END + ELSE child_state.initial_tasks -- Keep existing value (including NULL) END - FROM dependent_steps ds, pgflow.steps s - WHERE ss.run_id = complete_task.run_id - AND ss.step_slug = ds.dependent_step_slug - AND s.flow_slug = ss.flow_slug - AND s.step_slug = ss.step_slug + FROM child_steps children + JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND child_step.step_slug = children.child_step_slug + JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND parent_step.step_slug = complete_task.step_slug + WHERE child_state.run_id = complete_task.run_id + AND child_state.step_slug = children.child_step_slug ) -- ---------- Update run remaining_steps ---------- -- Decrement the run's remaining_steps counter if step completed @@ -147,13 +175,27 @@ IF v_step_state.status = 'completed' THEN PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); -- Broadcast step:completed event + -- For map steps, aggregate all task outputs; for single steps, use the task output PERFORM realtime.send( jsonb_build_object( 'event_type', 'step:completed', 'run_id', complete_task.run_id, 'step_slug', complete_task.step_slug, 'status', 'completed', - 'output', complete_task.output, + 'output', CASE + WHEN (SELECT s.step_type FROM pgflow.steps s + WHERE s.flow_slug = v_step_state.flow_slug + AND s.step_slug = complete_task.step_slug) = 'map' THEN + -- Aggregate all task outputs for map steps + (SELECT COALESCE(jsonb_agg(st.output ORDER BY st.task_index), '[]'::jsonb) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.status = 'completed') + ELSE + -- Single step: use the individual task output + complete_task.output + END, 'completed_at', v_step_state.completed_at ), concat('step:', complete_task.step_slug, ':completed'), diff --git a/pkgs/core/schemas/0100_function_maybe_complete_run.sql 
b/pkgs/core/schemas/0100_function_maybe_complete_run.sql index deb74e3bd..5ad9a2e8c 100644 --- a/pkgs/core/schemas/0100_function_maybe_complete_run.sql +++ b/pkgs/core/schemas/0100_function_maybe_complete_run.sql @@ -10,28 +10,57 @@ begin -- ========================================== -- CHECK AND COMPLETE RUN IF FINISHED -- ========================================== - WITH run_output AS ( - -- ---------- Gather outputs from leaf steps ---------- - -- Leaf steps = steps with no dependents - SELECT jsonb_object_agg(st.step_slug, st.output) as final_output - FROM pgflow.step_tasks st - JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug - JOIN pgflow.runs r ON r.run_id = ss.run_id AND r.flow_slug = ss.flow_slug - WHERE st.run_id = maybe_complete_run.run_id - AND st.status = 'completed' - AND NOT EXISTS ( - SELECT 1 - FROM pgflow.deps d - WHERE d.flow_slug = ss.flow_slug - AND d.dep_slug = ss.step_slug - ) - ) -- ---------- Complete run if all steps done ---------- UPDATE pgflow.runs SET status = 'completed', completed_at = now(), - output = (SELECT final_output FROM run_output) + -- Only compute expensive aggregation when actually completing the run + output = ( + -- ---------- Gather outputs from leaf steps ---------- + -- Leaf steps = steps with no dependents + -- For map steps: aggregate all task outputs into array + -- For single steps: use the single task output + SELECT jsonb_object_agg( + step_slug, + CASE + WHEN step_type = 'map' THEN aggregated_output + ELSE single_output + END + ) + FROM ( + SELECT DISTINCT + leaf_state.step_slug, + leaf_step.step_type, + -- For map steps: aggregate all task outputs + CASE WHEN leaf_step.step_type = 'map' THEN + (SELECT COALESCE(jsonb_agg(leaf_task.output ORDER BY leaf_task.task_index), '[]'::jsonb) + FROM pgflow.step_tasks leaf_task + WHERE leaf_task.run_id = leaf_state.run_id + AND leaf_task.step_slug = leaf_state.step_slug + AND leaf_task.status = 'completed') + END as aggregated_output, + -- For single steps: get the single output + CASE WHEN leaf_step.step_type = 'single' THEN + (SELECT leaf_task.output + FROM pgflow.step_tasks leaf_task + WHERE leaf_task.run_id = leaf_state.run_id + AND leaf_task.step_slug = leaf_state.step_slug + AND leaf_task.status = 'completed' + LIMIT 1) + END as single_output + FROM pgflow.step_states leaf_state + JOIN pgflow.steps leaf_step ON leaf_step.flow_slug = leaf_state.flow_slug AND leaf_step.step_slug = leaf_state.step_slug + WHERE leaf_state.run_id = maybe_complete_run.run_id + AND leaf_state.status = 'completed' + AND NOT EXISTS ( + SELECT 1 + FROM pgflow.deps dep + WHERE dep.flow_slug = leaf_state.flow_slug + AND dep.dep_slug = leaf_state.step_slug + ) + ) leaf_outputs + ) WHERE pgflow.runs.run_id = maybe_complete_run.run_id AND pgflow.runs.remaining_steps = 0 AND pgflow.runs.status != 'completed' diff --git a/pkgs/core/schemas/0120_function_start_tasks.sql b/pkgs/core/schemas/0120_function_start_tasks.sql index 6c112bd28..cd65d2844 100644 --- a/pkgs/core/schemas/0120_function_start_tasks.sql +++ b/pkgs/core/schemas/0120_function_start_tasks.sql @@ -16,9 +16,12 @@ as $$ task.task_index, task.message_id from pgflow.step_tasks as task + join pgflow.runs r on r.run_id = task.run_id where task.flow_slug = start_tasks.flow_slug and task.message_id = any(msg_ids) and task.status = 'queued' + -- MVP: Don't start tasks on failed runs + and r.status != 'failed' ), start_tasks_update as ( update pgflow.step_tasks @@ -44,13 +47,28 @@ as $$ st.run_id, st.step_slug, dep.dep_slug, - 
dep_task.output as dep_output + -- Aggregate map outputs or use single output + CASE + WHEN dep_step.step_type = 'map' THEN + -- Aggregate all task outputs ordered by task_index + -- Use COALESCE to return empty array if no tasks + (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb) + FROM pgflow.step_tasks dt + WHERE dt.run_id = st.run_id + AND dt.step_slug = dep.dep_slug + AND dt.status = 'completed') + ELSE + -- Single step: use the single task output + dep_task.output + END as dep_output from tasks st join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug - join pgflow.step_tasks dep_task on + join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug + left join pgflow.step_tasks dep_task on dep_task.run_id = st.run_id and dep_task.step_slug = dep.dep_slug and dep_task.status = 'completed' + and dep_step.step_type = 'single' -- Only join for single steps ), deps_outputs as ( select diff --git a/pkgs/core/scripts/regenerate-temp-migration b/pkgs/core/scripts/regenerate-temp-migration new file mode 100755 index 000000000..7511f64da --- /dev/null +++ b/pkgs/core/scripts/regenerate-temp-migration @@ -0,0 +1,180 @@ +#!/bin/bash +set -euo pipefail + +# Colors for pretty output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +CYAN='\033[0;36m' +BOLD='\033[1m' +NC='\033[0m' # No Color + +# Check for --yes flag +if [ "${1:-}" == "--yes" ]; then + SKIP_CONFIRM=true +else + SKIP_CONFIRM=false +fi + +# Trap Ctrl-C (SIGINT) and exit gracefully +trap 'echo -e "\n${YELLOW}⚠${NC} Operation cancelled by user (Ctrl-C)"; exit 130' INT + +# Function to print colored headers +print_header() { + echo -e "\n${CYAN}${BOLD}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${NC}" + echo -e "${CYAN}${BOLD} $1${NC}" + echo -e "${CYAN}${BOLD}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${NC}" +} + +# Function to print info messages +print_info() { + echo -e "${BLUE}ℹ${NC} $1" +} + +# Function to print success messages +print_success() { + echo -e "${GREEN}✓${NC} $1" +} + +# Function to print error messages +print_error() { + echo -e "${RED}✗${NC} $1" +} + +# Function to print warning messages +print_warning() { + echo -e "${YELLOW}⚠${NC} $1" +} + +# Function to ask for confirmation +confirm() { + local prompt="$1" + local response + echo -e -n "${YELLOW}${prompt} [y/N]: ${NC}" + read -r response + case "$response" in + [yY][eE][sS]|[yY]) + return 0 + ;; + *) + return 1 + ;; + esac +} + +# Get the script directory +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +MIGRATIONS_DIR="${SCRIPT_DIR}/../supabase/migrations" + +print_header "pgflow Temporary Migration Regenerator" + +# Step 1: Find the newest migration containing "pgflow_temp_" +print_info "Searching for newest temporary migration..." 
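+
+# Note: migration filenames begin with a fixed-width UTC timestamp
+# (YYYYMMDDHHMMSS), so a reverse lexicographic sort puts the newest first.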
+ +# Find all migrations with pgflow_temp_ and get the newest one +NEWEST_MIGRATION=$(ls -1 "${MIGRATIONS_DIR}"/*pgflow_temp_*.sql 2>/dev/null | sort -r | head -n1 || true) + +if [[ -z "$NEWEST_MIGRATION" ]]; then + print_error "No migration file containing 'pgflow_temp_' found in ${MIGRATIONS_DIR}" + exit 1 +fi + +MIGRATION_BASENAME=$(basename "$NEWEST_MIGRATION") +print_success "Found migration: ${BOLD}${MIGRATION_BASENAME}${NC}" + +# Extract the name part after pgflow_temp_ and before .sql +# Example: 20250917115352_pgflow_temp_handle_map_output_aggregation.sql +# Should give us: temp_handle_map_output_aggregation +TEMP_NAME="" +if [[ "$MIGRATION_BASENAME" =~ pgflow_temp_(.+)\.sql$ ]]; then + TEMP_NAME="temp_${BASH_REMATCH[1]}" +else + print_error "Could not extract name from migration file: $MIGRATION_BASENAME" + exit 1 +fi + +print_info "Extracted migration name: ${BOLD}${TEMP_NAME}${NC}" + +# Show what will be done +echo +print_header "This script will perform the following actions:" +echo -e " 1. ${RED}Remove${NC} migration: ${MIGRATION_BASENAME}" +echo -e " 2. ${BLUE}Rehash${NC} migrations using atlas-migrate-hash" +echo -e " 3. ${GREEN}Generate${NC} new migration: ${TEMP_NAME}" +echo -e " 4. ${CYAN}Verify${NC} the migration by running:" +echo -e " • pnpm nx verify-migrations core" +echo -e " • pnpm nx gen-types core" +echo -e " • pnpm nx test:pgtap core" + +echo +if [ "$SKIP_CONFIRM" != "true" ]; then + if ! confirm "Do you want to proceed?"; then + print_warning "Operation cancelled by user" + exit 0 + fi +else + print_info "Skipping confirmation (--yes flag provided)" +fi + +# Step 2: Remove the migration +print_header "Step 1: Removing temporary migration" +print_info "Removing: ${MIGRATION_BASENAME}" +rm "$NEWEST_MIGRATION" +print_success "Migration removed" + +# Step 3: Rehash migrations +print_header "Step 2: Rehashing migrations" +print_info "Running atlas-migrate-hash..." +cd "${SCRIPT_DIR}/.." +./scripts/atlas-migrate-hash --yes +print_success "Migrations rehashed" + +# Step 4: Generate new migration +print_header "Step 3: Generating new migration" +print_info "Running atlas-migrate-diff with name: ${BOLD}${TEMP_NAME}${NC}" +./scripts/atlas-migrate-diff "$TEMP_NAME" +print_success "New migration generated" + +# Step 5: Verify the migration +print_header "Step 4: Verifying migration" + +# 5a: Verify migrations +echo +print_info "Running migration verification..." +if pnpm nx verify-migrations core; then + print_success "Migration verification passed" +else + print_error "Migration verification failed" + exit 1 +fi + +# 5b: Generate types +echo +print_info "Generating types..." +if pnpm nx gen-types core; then + print_success "Type generation completed" +else + print_error "Type generation failed" + exit 1 +fi + +# 5c: Run tests +echo +print_info "Running pgTAP tests..." +if pnpm nx test:pgtap core; then + print_success "All tests passed" +else + print_error "Tests failed" + exit 1 +fi + +print_header "Migration Regeneration Complete! 
🎉" +print_success "Successfully regenerated migration: ${BOLD}${TEMP_NAME}${NC}" +echo + +# Show the new migration file +NEW_MIGRATION=$(ls -1 "${MIGRATIONS_DIR}"/*"${TEMP_NAME}".sql 2>/dev/null | head -n1 || true) +if [[ -n "$NEW_MIGRATION" ]]; then + print_info "New migration file: ${BOLD}$(basename "$NEW_MIGRATION")${NC}" +fi \ No newline at end of file diff --git a/pkgs/core/supabase/migrations/20250918042753_pgflow_temp_handle_map_output_aggregation.sql b/pkgs/core/supabase/migrations/20250918042753_pgflow_temp_handle_map_output_aggregation.sql new file mode 100644 index 000000000..474e25115 --- /dev/null +++ b/pkgs/core/supabase/migrations/20250918042753_pgflow_temp_handle_map_output_aggregation.sql @@ -0,0 +1,489 @@ +-- Modify "maybe_complete_run" function +CREATE OR REPLACE FUNCTION "pgflow"."maybe_complete_run" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_completed_run pgflow.runs%ROWTYPE; +begin + -- ========================================== + -- CHECK AND COMPLETE RUN IF FINISHED + -- ========================================== + -- ---------- Complete run if all steps done ---------- + UPDATE pgflow.runs + SET + status = 'completed', + completed_at = now(), + -- Only compute expensive aggregation when actually completing the run + output = ( + -- ---------- Gather outputs from leaf steps ---------- + -- Leaf steps = steps with no dependents + -- For map steps: aggregate all task outputs into array + -- For single steps: use the single task output + SELECT jsonb_object_agg( + step_slug, + CASE + WHEN step_type = 'map' THEN aggregated_output + ELSE single_output + END + ) + FROM ( + SELECT DISTINCT + leaf_state.step_slug, + leaf_step.step_type, + -- For map steps: aggregate all task outputs + CASE WHEN leaf_step.step_type = 'map' THEN + (SELECT COALESCE(jsonb_agg(leaf_task.output ORDER BY leaf_task.task_index), '[]'::jsonb) + FROM pgflow.step_tasks leaf_task + WHERE leaf_task.run_id = leaf_state.run_id + AND leaf_task.step_slug = leaf_state.step_slug + AND leaf_task.status = 'completed') + END as aggregated_output, + -- For single steps: get the single output + CASE WHEN leaf_step.step_type = 'single' THEN + (SELECT leaf_task.output + FROM pgflow.step_tasks leaf_task + WHERE leaf_task.run_id = leaf_state.run_id + AND leaf_task.step_slug = leaf_state.step_slug + AND leaf_task.status = 'completed' + LIMIT 1) + END as single_output + FROM pgflow.step_states leaf_state + JOIN pgflow.steps leaf_step ON leaf_step.flow_slug = leaf_state.flow_slug AND leaf_step.step_slug = leaf_state.step_slug + WHERE leaf_state.run_id = maybe_complete_run.run_id + AND leaf_state.status = 'completed' + AND NOT EXISTS ( + SELECT 1 + FROM pgflow.deps dep + WHERE dep.flow_slug = leaf_state.flow_slug + AND dep.dep_slug = leaf_state.step_slug + ) + ) leaf_outputs + ) + WHERE pgflow.runs.run_id = maybe_complete_run.run_id + AND pgflow.runs.remaining_steps = 0 + AND pgflow.runs.status != 'completed' + RETURNING * INTO v_completed_run; + + -- ========================================== + -- BROADCAST COMPLETION EVENT + -- ========================================== + IF v_completed_run.run_id IS NOT NULL THEN + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:completed', + 'run_id', v_completed_run.run_id, + 'flow_slug', v_completed_run.flow_slug, + 'status', 'completed', + 'output', v_completed_run.output, + 'completed_at', v_completed_run.completed_at + ), + 'run:completed', + concat('pgflow:run:', v_completed_run.run_id), + false + ); + END IF; +end; 
+$$; +-- Modify "complete_task" function +CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_step_state pgflow.step_states%ROWTYPE; + v_dependent_map_slug text; +begin + +-- ========================================== +-- VALIDATION: Array output for dependent maps +-- ========================================== +-- Must happen BEFORE acquiring locks to fail fast without holding resources +-- Only validate for single steps - map steps produce scalars that get aggregated +SELECT child_step.step_slug INTO v_dependent_map_slug +FROM pgflow.deps dependency +JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug + AND child_step.step_slug = dependency.step_slug +JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug + AND parent_step.step_slug = dependency.dep_slug +JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug + AND child_state.step_slug = child_step.step_slug +WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step + AND dependency.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND parent_step.step_type = 'single' -- Only validate single steps + AND child_step.step_type = 'map' + AND child_state.run_id = complete_task.run_id + AND child_state.initial_tasks IS NULL + AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') +LIMIT 1; + +IF v_dependent_map_slug IS NOT NULL THEN + RAISE EXCEPTION 'Map step % expects array input but dependency % produced % (output: %)', + v_dependent_map_slug, + complete_task.step_slug, + CASE WHEN complete_task.output IS NULL THEN 'null' ELSE jsonb_typeof(complete_task.output) END, + complete_task.output; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Update task and propagate changes +-- ========================================== +WITH +-- ---------- Lock acquisition ---------- +-- Acquire locks in consistent order (run -> step) to prevent deadlocks +run_lock AS ( + SELECT * FROM pgflow.runs + WHERE pgflow.runs.run_id = complete_task.run_id + FOR UPDATE +), +step_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + FOR UPDATE +), +-- ---------- Task completion ---------- +-- Update the task record with completion status and output +task AS ( + UPDATE pgflow.step_tasks + SET + status = 'completed', + completed_at = now(), + output = complete_task.output + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index + AND pgflow.step_tasks.status = 'started' + RETURNING * +), +-- ---------- Step state update ---------- +-- Decrement remaining_tasks and potentially mark step as completed +step_state AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement + ELSE 'started' + END, + completed_at = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement + ELSE NULL + END, + remaining_tasks = pgflow.step_states.remaining_tasks - 1 + FROM task + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = 
complete_task.step_slug + RETURNING pgflow.step_states.* +), +-- ---------- Dependency resolution ---------- +-- Find all child steps that depend on the completed parent step (only if parent completed) +child_steps AS ( + SELECT deps.step_slug AS child_step_slug + FROM pgflow.deps deps + JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug + WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child + ORDER BY deps.step_slug -- Ensure consistent ordering +), +-- ---------- Lock child steps ---------- +-- Acquire locks on all child steps before updating them +child_steps_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps) + FOR UPDATE +), +-- ---------- Update child steps ---------- +-- Decrement remaining_deps and resolve NULL initial_tasks for map steps +child_steps_update AS ( + UPDATE pgflow.step_states child_state + SET remaining_deps = child_state.remaining_deps - 1, + -- Resolve NULL initial_tasks for child map steps + -- This is where child maps learn their array size from the parent + -- This CTE only runs when the parent step is complete (see child_steps JOIN) + initial_tasks = CASE + WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN + CASE + WHEN parent_step.step_type = 'map' THEN + -- Map->map: Count all completed tasks from parent map + -- We add 1 because the current task is being completed in this transaction + -- but isn't yet visible as 'completed' in the step_tasks table + -- TODO: Refactor to use future column step_states.total_tasks + -- Would eliminate the COUNT query and just use parent_state.total_tasks + (SELECT COUNT(*)::int + 1 + FROM pgflow.step_tasks parent_tasks + WHERE parent_tasks.run_id = complete_task.run_id + AND parent_tasks.step_slug = complete_task.step_slug + AND parent_tasks.status = 'completed' + AND parent_tasks.task_index != complete_task.task_index) + ELSE + -- Single->map: Use output array length (single steps complete immediately) + CASE + WHEN complete_task.output IS NOT NULL + AND jsonb_typeof(complete_task.output) = 'array' THEN + jsonb_array_length(complete_task.output) + ELSE NULL -- Keep NULL if not an array + END + END + ELSE child_state.initial_tasks -- Keep existing value (including NULL) + END + FROM child_steps children + JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND child_step.step_slug = children.child_step_slug + JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND parent_step.step_slug = complete_task.step_slug + WHERE child_state.run_id = complete_task.run_id + AND child_state.step_slug = children.child_step_slug +) +-- ---------- Update run remaining_steps ---------- +-- Decrement the run's remaining_steps counter if step completed +UPDATE pgflow.runs +SET remaining_steps = pgflow.runs.remaining_steps - 1 +FROM step_state +WHERE pgflow.runs.run_id = complete_task.run_id + AND step_state.status = 'completed'; + +-- ========================================== +-- POST-COMPLETION ACTIONS +-- ========================================== + +-- ---------- Get updated state for broadcasting ---------- +SELECT * INTO v_step_state FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id AND 
pgflow.step_states.step_slug = complete_task.step_slug; + +-- ---------- Handle step completion ---------- +IF v_step_state.status = 'completed' THEN + -- Cascade complete any taskless steps that are now ready + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); + + -- Broadcast step:completed event + -- For map steps, aggregate all task outputs; for single steps, use the task output + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'completed', + 'output', CASE + WHEN (SELECT s.step_type FROM pgflow.steps s + WHERE s.flow_slug = v_step_state.flow_slug + AND s.step_slug = complete_task.step_slug) = 'map' THEN + -- Aggregate all task outputs for map steps + (SELECT COALESCE(jsonb_agg(st.output ORDER BY st.task_index), '[]'::jsonb) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.status = 'completed') + ELSE + -- Single step: use the individual task output + complete_task.output + END, + 'completed_at', v_step_state.completed_at + ), + concat('step:', complete_task.step_slug, ':completed'), + concat('pgflow:run:', complete_task.run_id), + false + ); +END IF; + +-- ---------- Archive completed task message ---------- +-- Move message from active queue to archive table +PERFORM ( + WITH completed_tasks AS ( + SELECT r.flow_slug, st.message_id + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.status = 'completed' + ) + SELECT pgmq.archive(ct.flow_slug, ct.message_id) + FROM completed_tasks ct + WHERE EXISTS (SELECT 1 FROM completed_tasks) +); + +-- ---------- Trigger next steps ---------- +-- Start any steps that are now ready (deps satisfied) +PERFORM pgflow.start_ready_steps(complete_task.run_id); + +-- Check if the entire run is complete +PERFORM pgflow.maybe_complete_run(complete_task.run_id); + +-- ---------- Return completed task ---------- +RETURN QUERY SELECT * +FROM pgflow.step_tasks AS step_task +WHERE step_task.run_id = complete_task.run_id + AND step_task.step_slug = complete_task.step_slug + AND step_task.task_index = complete_task.task_index; + +end; +$$; +-- Modify "start_tasks" function +CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$ +with tasks as ( + select + task.flow_slug, + task.run_id, + task.step_slug, + task.task_index, + task.message_id + from pgflow.step_tasks as task + join pgflow.runs r on r.run_id = task.run_id + where task.flow_slug = start_tasks.flow_slug + and task.message_id = any(msg_ids) + and task.status = 'queued' + -- MVP: Don't start tasks on failed runs + and r.status != 'failed' + ), + start_tasks_update as ( + update pgflow.step_tasks + set + attempts_count = attempts_count + 1, + status = 'started', + started_at = now(), + last_worker_id = worker_id + from tasks + where step_tasks.message_id = tasks.message_id + and step_tasks.flow_slug = tasks.flow_slug + and step_tasks.status = 'queued' + ), + runs as ( + select + r.run_id, + r.input + from pgflow.runs r + where r.run_id in (select run_id from tasks) + ), + deps as ( + select + st.run_id, + st.step_slug, + dep.dep_slug, + -- Aggregate map outputs or use single output + CASE + WHEN 
dep_step.step_type = 'map' THEN + -- Aggregate all task outputs ordered by task_index + -- Use COALESCE to return empty array if no tasks + (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb) + FROM pgflow.step_tasks dt + WHERE dt.run_id = st.run_id + AND dt.step_slug = dep.dep_slug + AND dt.status = 'completed') + ELSE + -- Single step: use the single task output + dep_task.output + END as dep_output + from tasks st + join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug + join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug + left join pgflow.step_tasks dep_task on + dep_task.run_id = st.run_id and + dep_task.step_slug = dep.dep_slug and + dep_task.status = 'completed' + and dep_step.step_type = 'single' -- Only join for single steps + ), + deps_outputs as ( + select + d.run_id, + d.step_slug, + jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output, + count(*) as dep_count + from deps d + group by d.run_id, d.step_slug + ), + timeouts as ( + select + task.message_id, + task.flow_slug, + coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay + from tasks task + join pgflow.flows flow on flow.flow_slug = task.flow_slug + join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug + ), + -- Batch update visibility timeouts for all messages + set_vt_batch as ( + select pgflow.set_vt_batch( + start_tasks.flow_slug, + array_agg(t.message_id order by t.message_id), + array_agg(t.vt_delay order by t.message_id) + ) + from timeouts t + ) + select + st.flow_slug, + st.run_id, + st.step_slug, + -- ========================================== + -- INPUT CONSTRUCTION LOGIC + -- ========================================== + -- This nested CASE statement determines how to construct the input + -- for each task based on the step type (map vs non-map). + -- + -- The fundamental difference: + -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello") + -- - Non-map steps: Receive structured objects with named keys + -- (e.g., {"run": {...}, "dependency1": {...}}) + -- ========================================== + CASE + -- -------------------- MAP STEPS -------------------- + -- Map steps process arrays element-by-element. + -- Each task receives ONE element from the array at its task_index position. + WHEN step.step_type = 'map' THEN + -- Map steps get raw array elements without any wrapper object + CASE + -- ROOT MAP: Gets array from run input + -- Example: run input = [1, 2, 3] + -- task 0 gets: 1 + -- task 1 gets: 2 + -- task 2 gets: 3 + WHEN step.deps_count = 0 THEN + -- Root map (deps_count = 0): no dependencies, reads from run input. + -- Extract the element at task_index from the run's input array. + -- Note: If run input is not an array, this will return NULL + -- and the flow will fail (validated in start_flow). + jsonb_array_element(r.input, st.task_index) + + -- DEPENDENT MAP: Gets array from its single dependency + -- Example: dependency output = ["a", "b", "c"] + -- task 0 gets: "a" + -- task 1 gets: "b" + -- task 2 gets: "c" + ELSE + -- Has dependencies (should be exactly 1 for map steps). + -- Extract the element at task_index from the dependency's output array. + -- + -- Why the subquery with jsonb_each? 
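+          -- A concrete walk-through (illustrative values, not taken from any test):
+          --   deps_output = {"items": [1, 2, 3]}, task_index = 1
+          --   jsonb_each(deps_output) yields one row: ("items", [1, 2, 3])
+          --   value = [1, 2, 3]; jsonb_array_element(value, 1) = 2
+          -- The mechanics in general: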
+ -- - The dependency outputs a raw array: [1, 2, 3] + -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]} + -- - We need to unwrap and get just the array value + -- - Map steps have exactly 1 dependency (enforced by add_step) + -- - So jsonb_each will return exactly 1 row + -- - We extract the 'value' which is the raw array [1, 2, 3] + -- - Then get the element at task_index from that array + (SELECT jsonb_array_element(value, st.task_index) + FROM jsonb_each(dep_out.deps_output) + LIMIT 1) + END + + -- -------------------- NON-MAP STEPS -------------------- + -- Regular (non-map) steps receive ALL inputs as a structured object. + -- This includes the original run input plus all dependency outputs. + ELSE + -- Non-map steps get structured input with named keys + -- Example output: { + -- "run": {"original": "input"}, + -- "step1": {"output": "from_step1"}, + -- "step2": {"output": "from_step2"} + -- } + -- + -- Build object with 'run' key containing original input + jsonb_build_object('run', r.input) || + -- Merge with deps_output which already has dependency outputs + -- deps_output format: {"dep1": output1, "dep2": output2, ...} + -- If no dependencies, defaults to empty object + coalesce(dep_out.deps_output, '{}'::jsonb) + END as input, + st.message_id as msg_id + from tasks st + join runs r on st.run_id = r.run_id + join pgflow.steps step on + step.flow_slug = st.flow_slug and + step.step_slug = st.step_slug + left join deps_outputs dep_out on + dep_out.run_id = st.run_id and + dep_out.step_slug = st.step_slug +$$; diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 29beee266..7f21a5295 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:5eNDpXz1Ru5E6c9G7Glyo398mstJYLNNhjqcTjOaGxI= +h1:L0B5nrLTUufGJ0mMcVcnPjHjuvhPMDMHvOv4rgEnycI= 20250429164909_pgflow_initial.sql h1:5K7OqB/vj73TWJTQquUzn+i6H2wWduaW+Ir1an3QYmQ= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:gnT6hYn43p5oIfr0HqoGlqX/4Si+uxMsCBtBa0/Z2Cg= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:9Yv/elMz9Nht9rCJOybx62eNrUyghsEMbMKeOJPUMVc= @@ -14,3 +14,4 @@ h1:5eNDpXz1Ru5E6c9G7Glyo398mstJYLNNhjqcTjOaGxI= 20250916093518_pgflow_temp_add_cascade_complete.sql h1:rQeqjEghqhGGUP+njrHFpPZxrxInjMHq5uSvYN1dTZc= 20250916142327_pgflow_temp_make_initial_tasks_nullable.sql h1:YXBqH6MkLFm8+eadVLh/Pc3TwewCgmVyQZBFDCqYf+Y= 20250916203905_pgflow_temp_handle_arrays_in_start_tasks.sql h1:hsesHyW890Z31WLJsXQIp9+LqnlOEE9tLIsLNCKRj+4= +20250918042753_pgflow_temp_handle_map_output_aggregation.sql h1:9aC4lyr6AEvpLTrv9Fza2Ur0QO87S0cdJDI+BPLAl60= diff --git a/pkgs/core/supabase/tests/map_output_aggregation/README.md b/pkgs/core/supabase/tests/map_output_aggregation/README.md new file mode 100644 index 000000000..fcc603ed3 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/README.md @@ -0,0 +1,220 @@ +# Map Output Aggregation Invariants & Test Scenarios + +## Core Invariants + +These properties must ALWAYS hold true for the map output aggregation feature to be correct: + +### 1. Order Preservation ✅ +- **Invariant**: Aggregated outputs MUST maintain task_index order +- **Test**: `output[i]` must correspond to `task_index = i` +- **Rationale**: Array index semantics must be preserved through aggregation +- **Coverage**: Well tested in 8+ files (basic_aggregation, order_preservation, map_to_map, run_completion_leaf_map, etc.) + +### 2. 
Completeness ✅ +- **Invariant**: Aggregation occurs IFF all tasks are completed +- **Test**: No partial aggregation when any task is pending/failed +- **Rationale**: Map step output represents the complete transformation +- **Coverage**: Well tested in 6+ files (partial_completion_prevention, failed_task_handling, etc.) + +### 3. Empty Array Handling ✅ +- **Invariant**: Map steps with 0 tasks produce `[]` not `NULL` +- **Test**: Empty maps must output `[]` for downstream compatibility +- **Rationale**: Consistent array type for dependent steps +- **Coverage**: Well tested (empty_map.test.sql, completing_taskless_steps tests) + +### 4. NULL Preservation ✅ +- **Invariant**: NULL task outputs are preserved in aggregated array +- **Test**: `[null, {data}, null]` remains exactly as is +- **Rationale**: NULL is semantically different from missing +- **Coverage**: Tested in null_outputs.test.sql, initial_tasks_null tests + +### 5. Aggregation Points Consistency ⚠️ +- **Invariant**: All aggregation points produce identical results +- **Test**: `start_tasks`, `maybe_complete_run`, and broadcasts must match +- **Rationale**: Single source of truth principle +- **Coverage**: Partially tested - KNOWN BUG: broadcasts send individual task output instead of aggregated array + +### 6. Type Safety for Map Dependencies ✅ +- **Invariant**: Map steps ONLY accept array inputs (or NULL for initial_tasks) +- **Test**: Non-array output to map step must fail the run +- **Rationale**: Type safety prevents runtime errors +- **Coverage**: Well tested with both positive and negative cases (non_array_to_map_should_fail, null_output_to_map_should_fail) + +### 7. Single Aggregation Per Step Completion ✅ +- **Invariant**: Aggregation happens exactly once when step completes +- **Test**: No re-aggregation on subsequent operations +- **Rationale**: Performance and consistency +- **Coverage**: Tested in run_completion_leaf_map, failed_task_handling + +## Critical Test Scenarios + +### Basic Aggregation +1. **3-task map completion** ✅ + - Complete tasks 0, 1, 2 with distinct outputs + - Verify aggregated array = `[output0, output1, output2]` + - **Coverage**: basic_aggregation.test.sql + +2. **Empty map (0 tasks)** ✅ + - Map step with initial_tasks = 0 + - Verify output = `[]` + - **Coverage**: empty_map.test.sql + +3. **Single task map** ❌ + - Map with 1 task + - Verify output = `[task_output]` (array with one element) + - **Coverage**: NOT TESTED + +### Order Preservation +4. **Out-of-order task completion** ✅ + - Complete tasks 2, 0, 1 (not in index order) + - Verify final array still ordered by task_index + - **Coverage**: order_preservation.test.sql + +5. **Concurrent task completion** ✅ + - Multiple workers completing tasks simultaneously + - Verify order preserved despite race conditions + - **Coverage**: concurrent_completion.test.sql + +### Map-to-Map Chains +6. **Map feeding into map** ✅ + - First map outputs `[1, 2, 3]` + - Second map spawns 3 tasks + - Each task receives individual element + - **Coverage**: map_to_map.test.sql + +7. **Deep map chain (3+ levels)** ✅ + - Map → Map → Map + - Verify aggregation at each level + - **Coverage**: deep_map_chain.test.sql + +8. **Empty array propagation** ✅ + - Map with 0 tasks → dependent map + - Dependent map should also have 0 tasks + - **Coverage**: normal_to_map_empty.test.sql, cascade tests + +### Map-to-Single Patterns +9. 
**Map feeding single step** ✅ + - Map outputs `[{a}, {b}, {c}]` + - Single step receives full array as input + - **Coverage**: map_to_single.test.sql + +10. **Multiple maps to single** ✅ + - Two map steps → single step + - Single step gets both arrays as dependencies + - **Coverage**: multiple_maps_to_single.test.sql + +### Error Cases +11. **NULL output to map** ✅ + - Single step returns NULL + - Dependent map should fail with clear error + - **Coverage**: null_output_to_map_should_fail.test.sql + +12. **Non-array output to map** ✅ + - Single step returns `{object}` or `"string"` + - Dependent map should fail with type error + - **Coverage**: non_array_to_map_should_fail.test.sql + +13. **Failed task in map** ✅ + - One task fails in 3-task map + - Map step should be marked failed + - No aggregation should occur + - **Coverage**: failed_task_handling.test.sql + +14. **Partial completion prevention** ✅ + - 2 of 3 tasks complete + - Aggregation must NOT occur + - Output remains undefined/NULL + - **Coverage**: partial_completion_prevention.test.sql + +### Edge Cases +15. **NULL values in output array** ✅ + - Tasks return `[{data}, null, {more}]` + - Aggregated array preserves NULLs exactly + - **Coverage**: null_outputs.test.sql + +16. **Large arrays (100+ tasks)** ✅ + - Performance must remain linear + - Order preservation at scale + - **Coverage**: large_array_performance.test.sql, map_large_array.test.sql + +17. **Mixed step types** ✅ + - Single → Map → Single → Map + - Each transition handles types correctly + - **Coverage**: mixed_dependencies.test.sql + +### Broadcast Events +18. **Step completion broadcast** ⚠️ + - Map step completes + - Broadcast contains aggregated array, not last task output + - **Coverage**: KNOWN BUG - broadcast_aggregation.test.sql, broadcast_output_verification.test.sql + +19. **Run completion with leaf map** ✅ + - Map step as terminal node + - Run output contains aggregated array + - **Coverage**: run_completion_leaf_map.test.sql + +### Timing & State +20. **Initial_tasks resolution timing** ✅ + - Dependent map's initial_tasks set when dependency completes + - Not before, not after + - **Coverage**: map_initial_tasks_timing.test.sql + +21. **Taskless cascade with maps** ✅ + - Map with 0 tasks triggers cascade + - Dependent steps complete in topological order + - **Coverage**: taskless_sequence.test.sql, cascade_performance.test.sql + +## Performance Invariants + +### Aggregation Efficiency +- **Invariant**: Aggregation query runs O(n) where n = number of tasks +- **Test**: Measure query time scales linearly with task count + +### No Redundant Aggregation +- **Invariant**: Same data never aggregated twice for same purpose +- **Test**: Trace queries, ensure no duplicate aggregations + +## Implementation Requirements + +To satisfy these invariants, the implementation MUST: + +1. Use `ORDER BY task_index` in all aggregation queries +2. Use `COALESCE(..., '[]'::jsonb)` for empty array default +3. Check task completion status before aggregating +4. Validate input types for map steps +5. Aggregate exactly once at step completion +6. 
Store or compute aggregated output consistently + +## Verification Checklist + +- [x] All basic aggregation tests pass +- [x] Order preserved across all scenarios +- [x] Empty arrays handled correctly +- [x] NULL values preserved +- [x] Type errors caught and reported +- [ ] Broadcasts contain aggregated output (KNOWN BUG - sends individual task output) +- [x] Performance scales linearly +- [x] No redundant aggregations (single aggregation per completion) +- [x] Map-to-map chains work correctly +- [x] Error cases fail gracefully + +## Test Coverage Summary + +### Well Covered Invariants (✅) +- Order Preservation (8+ test files) +- Completeness (6+ test files) +- Empty Array Handling (multiple test files) +- NULL Preservation (explicit tests) +- Type Safety for Map Dependencies (positive & negative cases) +- Single Aggregation Per Step Completion + +### Known Issues (⚠️) +- **Aggregation Points Consistency**: Broadcast events contain individual task outputs instead of aggregated arrays (documented bug in broadcast_aggregation.test.sql and broadcast_output_verification.test.sql) + +### Missing Test Coverage (❌) +- **Single task map**: No test for map with exactly 1 task producing `[output]` + +### Recommendations +1. Fix the broadcast aggregation bug (currently sends last task output instead of aggregated array) +2. Add test for single-task map scenario +3. Consider removing broadcast_output_verification.test.sql once bug is fixed (it demonstrates the bug) \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/basic_aggregation.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/basic_aggregation.test.sql new file mode 100644 index 000000000..7688f26ba --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/basic_aggregation.test.sql @@ -0,0 +1,69 @@ +begin; +select plan(3); + +-- Test: Basic map output aggregation +-- Map tasks outputs should be aggregated into an array for dependent steps + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_agg', 10, 60, 3); +select pgflow.add_step('test_agg', 'map_step', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_agg', 'single_step', array['map_step'], null, null, null, null, 'single'); + +-- Start flow with 3-item array +select is( + (select count(*) from pgflow.start_flow('test_agg', '[10, 20, 30]'::jsonb)), + 1::bigint, + 'Flow should start successfully' +); + +-- Complete all 3 map tasks with different outputs +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + i int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete map tasks with outputs that include the index + for i in 1..3 loop + select * into v_task from pgflow_tests.read_and_start('test_agg', 1, 1); + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + i - 1, -- task_index (0-based) + jsonb_build_object('result', i * 100) -- outputs: {result:100}, {result:200}, {result:300} + ); + end loop; + + -- Trigger dependent step + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Check that single_step receives aggregated array as input +select is( + (select input from pgflow_tests.read_and_start('test_agg', 1, 1)), + jsonb_build_object( + 'run', '[10, 20, 30]'::jsonb, + 'map_step', jsonb_build_array( + jsonb_build_object('result', 100), + jsonb_build_object('result', 200), + jsonb_build_object('result', 300) + ) + ), + 'Single step should receive aggregated map outputs as array' +); + +-- Verify the aggregated output is stored 
correctly +select is( + (select count(*) + from pgflow.step_states + where step_slug = 'map_step' + and status = 'completed'), + 1::bigint, + 'Map step should be completed' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql new file mode 100644 index 000000000..6efa6f11c --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/broadcast_event_fixed.test.sql @@ -0,0 +1,115 @@ +begin; +select plan(4); + +-- Test: Map step completion broadcast contains aggregated array +-- This test verifies the broadcast fix is working correctly + +-- Setup +select pgflow_tests.reset_db(); + +-- Ensure partition exists for realtime.messages +select pgflow_tests.create_realtime_partition(); + +select pgflow.create_flow('test_broadcast_fix', 10, 60, 3); +select pgflow.add_step('test_broadcast_fix', 'map_step', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_broadcast_fix', 'consumer', array['map_step'], null, null, null, null, 'single'); + +-- Start flow with 3-element array +select is( + (select count(*) from pgflow.start_flow('test_broadcast_fix', '[100, 200, 300]'::jsonb)), + 1::bigint, + 'Flow should start with 3-element array' +); + +-- Complete all map tasks with distinct outputs +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete task 0 + select * into v_task from pgflow_tests.read_and_start('test_broadcast_fix', 1, 1); + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + jsonb_build_object('value', 'first_result') + ); + + -- Complete task 1 + select * into v_task from pgflow_tests.read_and_start('test_broadcast_fix', 1, 1); + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + jsonb_build_object('value', 'second_result') + ); + + -- Complete task 2 (final task - triggers step completion) + select * into v_task from pgflow_tests.read_and_start('test_broadcast_fix', 1, 1); + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + jsonb_build_object('value', 'third_result') + ); +end $$; + +-- Verify step:completed event was sent +select is( + pgflow_tests.count_realtime_events('step:completed', + (select run_id from pgflow.runs limit 1), + 'map_step'), + 1, + 'Should have exactly one step:completed broadcast for map_step' +); + +-- CRITICAL TEST: Broadcast should contain AGGREGATED array, not last task output +select is( + (select payload->'output' from realtime.messages + where payload->>'event_type' = 'step:completed' + and payload->>'step_slug' = 'map_step' + limit 1), + jsonb_build_array( + jsonb_build_object('value', 'first_result'), + jsonb_build_object('value', 'second_result'), + jsonb_build_object('value', 'third_result') + ), + 'Broadcast output should be aggregated array of all task 
outputs' +); + +-- Also verify the event contains correct metadata +select is( + (select + (payload->>'event_type' = 'step:completed') and + (payload->>'step_slug' = 'map_step') and + (payload->>'status' = 'completed') and + (payload->'completed_at' is not null) + from realtime.messages + where payload->>'event_type' = 'step:completed' + and payload->>'step_slug' = 'map_step' + limit 1), + true, + 'Broadcast should contain correct metadata' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/concurrent_completion.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/concurrent_completion.test.sql new file mode 100644 index 000000000..c46c8409a --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/concurrent_completion.test.sql @@ -0,0 +1,67 @@ +begin; +select plan(3); + +-- Test: Concurrent completion of map tasks +-- Verify aggregation is correct when multiple tasks complete simultaneously + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_concurrent', 10, 60, 3); +select pgflow.add_step('test_concurrent', 'map_step', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_concurrent', 'consumer', array['map_step'], null, null, null, null, 'single'); + +-- Start flow with 10-element array (larger to increase chance of race conditions) +select is( + (select count(*) from pgflow.start_flow('test_concurrent', '[1,2,3,4,5,6,7,8,9,10]'::jsonb)), + 1::bigint, + 'Flow should start with 10-element array' +); + +-- Start all tasks first (simulating multiple workers grabbing tasks) +do $$ +declare + v_tasks pgflow.step_task_record[]; + v_task pgflow.step_task_record; + v_order int[] := array[5,1,8,3,9,2,7,4,10,6]; + v_idx int; + i int; +begin + -- Read and start all 10 tasks (simulating 10 concurrent workers) + for i in 1..10 loop + select * into v_task from pgflow_tests.read_and_start('test_concurrent', 1, 1); + v_tasks := array_append(v_tasks, v_task); + end loop; + + -- Now complete them all in rapid succession (simulating concurrent completions) + -- Complete in a mixed order to test ordering preservation + foreach v_idx in array v_order loop + perform pgflow.complete_task( + v_tasks[v_idx].run_id, + v_tasks[v_idx].step_slug, + v_idx - 1, -- task_index is 0-based + to_jsonb(v_idx * 100) -- outputs: 100, 200, 300, etc. 
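+        -- NOTE: v_idx - 1 is valid as task_index here only because this test
+        -- assumes read_and_start hands tasks out in task_index order, so
+        -- v_tasks[i] holds the task with task_index i - 1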
+ ); + end loop; + + -- Trigger dependent step + perform pgflow.start_ready_steps(v_tasks[1].run_id); +end $$; + +-- Verify the aggregation is correct despite concurrent completions +select is( + (select input->'map_step' from pgflow_tests.read_and_start('test_concurrent', 1, 1)), + jsonb_build_array(100, 200, 300, 400, 500, 600, 700, 800, 900, 1000), + 'Aggregated array should be in correct order despite mixed completion order' +); + +-- Verify step completed successfully with all tasks +select is( + (select count(*) from pgflow.step_tasks + where step_slug = 'map_step' + and status = 'completed'), + 10::bigint, + 'All 10 map tasks should be completed' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/deep_map_chain.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/deep_map_chain.test.sql new file mode 100644 index 000000000..ae8eeaa0a --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/deep_map_chain.test.sql @@ -0,0 +1,111 @@ +begin; +select plan(4); + +-- Test: Deep chain of 10 map steps +-- Verify aggregation/decomposition works correctly through deep chains + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('deep_chain', 10, 60, 3); + +-- Create chain of 10 map steps +select pgflow.add_step('deep_chain', 'map1', '{}', null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map2', array['map1'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map3', array['map2'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map4', array['map3'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map5', array['map4'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map6', array['map5'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map7', array['map6'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map8', array['map7'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map9', array['map8'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'map10', array['map9'], null, null, null, null, 'map'); +select pgflow.add_step('deep_chain', 'final_collector', array['map10'], null, null, null, null, 'single'); + +-- Start flow with 3-element array +select is( + (select count(*) from pgflow.start_flow('deep_chain', '[1, 2, 3]'::jsonb)), + 1::bigint, + 'Flow should start with 3-element array' +); + +-- Process all 10 map steps +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; + v_expected_step text; + v_map_num int; + v_total_tasks_completed int := 0; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Process each map level (10 maps * 3 tasks each = 30 tasks total) + for v_map_num in 1..10 loop + v_expected_step := 'map' || v_map_num; + + -- Complete 3 tasks for current map + for i in 1..3 loop + select * into v_task from pgflow_tests.read_and_start('deep_chain', 1, 1); + + -- Verify we're processing the expected map step + if v_task.step_slug != v_expected_step then + raise exception 'Expected %, got %', v_expected_step, v_task.step_slug; + end if; + + -- Get task_index + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Transform: each map adds 10 to the value + -- Input at map1: 1, 2, 3 + -- Output at map1: 11, 12, 13 + 
-- Output at map2: 21, 22, 23 + -- ... + -- Output at map10: 101, 102, 103 + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb((v_task.input::int) + 10) + ); + + v_total_tasks_completed := v_total_tasks_completed + 1; + end loop; + + -- Trigger next map in chain + perform pgflow.start_ready_steps(v_run_id); + end loop; + + raise notice 'Completed % tasks across 10 maps', v_total_tasks_completed; +end $$; + +-- Verify all 30 tasks were created and completed +select is( + (select count(*) from pgflow.step_tasks where status = 'completed'), + 30::bigint, + 'Should have completed exactly 30 tasks (10 maps * 3 tasks each)' +); + +-- Verify the final collector receives correctly transformed array +select is( + (select input->'map10' from pgflow_tests.read_and_start('deep_chain', 1, 1)), + jsonb_build_array(101, 102, 103), + 'Final collector should receive array transformed 10 times (+10 each time)' +); + +-- Verify chain propagation: check a few intermediate steps had correct initial_tasks +select is( + (select array_agg(initial_tasks order by step_slug) + from pgflow.step_states + where step_slug in ('map5', 'map7', 'map10') + and run_id = (select run_id from pgflow.runs limit 1)), + array[3, 3, 3], + 'All dependent maps should have initial_tasks = 3' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/empty_map.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/empty_map.test.sql new file mode 100644 index 000000000..aa2fcec82 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/empty_map.test.sql @@ -0,0 +1,46 @@ +begin; +select plan(3); + +-- Test: Empty map output aggregation +-- Map with 0 tasks should produce empty array [] + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_empty', 10, 60, 3); +select pgflow.add_step('test_empty', 'empty_map', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_empty', 'consumer', array['empty_map'], null, null, null, null, 'single'); + +-- Start flow with empty array +select is( + (select count(*) from pgflow.start_flow('test_empty', '[]'::jsonb)), + 1::bigint, + 'Flow should start with empty array' +); + +-- Verify map step completed immediately (taskless) +select is( + (select status from pgflow.step_states + where step_slug = 'empty_map'), + 'completed', + 'Empty map should auto-complete' +); + +-- Trigger dependent step and check input +do $$ +declare + v_run_id uuid; +begin + select run_id into v_run_id from pgflow.runs limit 1; + -- Trigger dependent step (should already be triggered but just in case) + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Check that consumer receives empty array +select is( + (select input->'empty_map' from pgflow_tests.read_and_start('test_empty', 1, 1)), + '[]'::jsonb, + 'Consumer should receive empty array from empty map' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/failed_task_handling.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/failed_task_handling.test.sql new file mode 100644 index 000000000..d58d82e42 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/failed_task_handling.test.sql @@ -0,0 +1,98 @@ +begin; +select plan(4); + +-- Test: Failed task in map step should fail run and prevent other tasks from starting +-- MVP approach: fail entire run when any task fails + +-- Setup +select 
pgflow_tests.reset_db(); +-- Create flow with max_attempts=1 so tasks fail immediately +select pgflow.create_flow('test_fail', 1, 5, 60); +select pgflow.add_step('test_fail', 'map_step', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_fail', 'dependent', array['map_step'], null, null, null, null, 'single'); + +-- Start flow with 5-element array +select is( + (select count(*) from pgflow.start_flow('test_fail', '[1, 2, 3, 4, 5]'::jsonb)), + 1::bigint, + 'Flow should start with 5-element array' +); + +-- Complete 2 tasks successfully, then fail one +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete 2 tasks successfully + for i in 1..2 loop + select * into v_task from pgflow_tests.read_and_start('test_fail', 1, 1); + + -- Get task_index + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Complete task successfully + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb(v_task_index * 10) + ); + end loop; + + -- Now fail the third task + select * into v_task from pgflow_tests.read_and_start('test_fail', 1, 1); + + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Fail the task + perform pgflow.fail_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + 'Test failure: Simulated error' + ); +end $$; + +-- Verify run is marked as failed +select is( + (select status from pgflow.runs limit 1), + 'failed', + 'Run should be marked as failed when any task fails' +); + +-- Try to start another task (should be prevented) +do $$ +declare + v_task pgflow.step_task_record; +begin + select * into v_task from pgflow_tests.read_and_start('test_fail', 1, 1); + + if v_task.step_slug is not null then + raise exception 'Should not be able to start tasks on failed run, but got task for %', v_task.step_slug; + end if; +end $$; + +-- This test passes if we reach here +select pass('Tasks cannot be started on failed run'); + +-- Verify dependent step never starts +select is( + (select count(*) from pgflow.step_tasks where step_slug = 'dependent'), + 0::bigint, + 'Dependent step should not start when run is failed' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql new file mode 100644 index 000000000..e060dae95 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/large_array_performance.test.sql @@ -0,0 +1,116 @@ +begin; +select plan(3); + +-- Test: Performance with large arrays (same scale as other perf tests) +-- Uses 100 tasks to match baseline measurements + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_large', 10, 60, 3); +select pgflow.add_step('test_large', 'large_map', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_large', 'consumer', array['large_map'], null, null, null, null, 'single'); + +-- Start flow with 100-element array +select is( + (select count(*) from pgflow.start_flow('test_large', + (select jsonb_agg(n) from generate_series(1, 100) n) + )), + 1::bigint, + 'Flow should start with 100-element array' +); + +-- Complete all 100 map 
tasks and measure aggregation +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; + v_start_time timestamp; + v_duration interval; + i int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete all 100 tasks + for i in 1..100 loop + select * into v_task from pgflow_tests.read_and_start('test_large', 1, 1); + + -- Get task_index + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Complete with simple output + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb(v_task_index * 10) + ); + end loop; + + -- Measure time to trigger dependent step (with aggregation) + v_start_time := clock_timestamp(); + perform pgflow.start_ready_steps(v_run_id); + v_duration := clock_timestamp() - v_start_time; + + -- Check if performance is reasonable (< 10ms for 100 tasks) + if extract(milliseconds from v_duration) + extract(seconds from v_duration) * 1000 > 10 then + raise notice 'Aggregation took % ms for 100 tasks', + round(extract(milliseconds from v_duration) + extract(seconds from v_duration) * 1000, 2); + end if; +end $$; + +-- Verify consumer receives complete aggregated array +select is( + (select jsonb_array_length(input->'large_map') + from pgflow_tests.read_and_start('test_large', 1, 1)), + 100, + 'Consumer should receive all 100 elements in aggregated array' +); + +-- Complete consumer and check run completion with aggregation +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_start_time timestamp; + v_duration interval; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Get consumer task + select * into v_task from pgflow.step_tasks + where run_id = v_run_id and step_slug = 'consumer' and status = 'started'; + + -- Complete it + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + 0, + jsonb_build_object('processed', 100) + ); + + -- Measure run completion (shouldn't need to aggregate since consumer is leaf) + v_start_time := clock_timestamp(); + perform pgflow.maybe_complete_run(v_run_id); + v_duration := clock_timestamp() - v_start_time; + + -- Check if performance is reasonable (< 5ms) + if extract(milliseconds from v_duration) + extract(seconds from v_duration) * 1000 > 5 then + raise notice 'Run completion took % ms', + round(extract(milliseconds from v_duration) + extract(seconds from v_duration) * 1000, 2); + end if; +end $$; + +-- Verify run completed successfully +select is( + (select status from pgflow.runs limit 1), + 'completed', + 'Run should complete successfully with large array' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/map_initial_tasks_timing.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/map_initial_tasks_timing.test.sql new file mode 100644 index 000000000..a6605ae43 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/map_initial_tasks_timing.test.sql @@ -0,0 +1,155 @@ +begin; +select plan(6); + +-- Test: Map-to-map initial_tasks should only be set when dependency step completes +-- NOT when individual tasks complete + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_timing', 10, 60, 3); +select pgflow.add_step('test_timing', 'map1', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_timing', 'map2', array['map1'], null, null, null, null, 'map'); + 
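+-- Sketch of the expected map2.initial_tasks lifecycle in this run
+-- (derived from the assertions below):
+--   after start_flow            -> NULL  (map1 incomplete, array size unknown)
+--   after 1 of 4 map1 tasks     -> NULL  (resolved only at step completion)
+--   after all 4 map1 tasks      -> 4     (set when the map1 step completes)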
+-- Start flow with 4-item array +select is( + (select count(*) from pgflow.start_flow('test_timing', '[10, 20, 30, 40]'::jsonb)), + 1::bigint, + 'Flow should start with 4-element array' +); + +-- Verify map2 initial_tasks is NULL at start +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'map2' + and run_id = (select run_id from pgflow.runs limit 1)), + NULL::int, + 'Map2 initial_tasks should be NULL before map1 completes' +); + +-- Complete ONLY ONE map1 task +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete just one task + select * into v_task from pgflow_tests.read_and_start('test_timing', 1, 1); + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + 0, -- First task + '100'::jsonb + ); +end $$; + +-- CRITICAL TEST: Map2 initial_tasks should STILL be NULL after one task completes +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'map2' + and run_id = (select run_id from pgflow.runs limit 1)), + NULL::int, + 'Map2 initial_tasks should STILL be NULL after completing 1 of 4 map1 tasks' +); + +-- Complete remaining 3 map1 tasks +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; + v_map2_initial int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete tasks 2, 3, and 4 + for i in 2..4 loop + select * into v_task from pgflow_tests.read_and_start('test_timing', 1, 1); + + -- Get task_index from step_tasks table + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb(100 + v_task_index) + ); + + -- Check map2 initial_tasks after each completion + select initial_tasks into v_map2_initial + from pgflow.step_states + where step_slug = 'map2' and run_id = v_run_id; + + raise notice 'After completing task % (index %): map2 initial_tasks = %', i, v_task_index, v_map2_initial; + end loop; +end $$; + +-- Debug: Show step states after completing all map1 tasks +do $$ +declare + v_map1_status text; + v_map2_initial int; + v_map1_tasks_completed int; +begin + select status into v_map1_status + from pgflow.step_states + where step_slug = 'map1' + and run_id = (select run_id from pgflow.runs limit 1); + + select initial_tasks into v_map2_initial + from pgflow.step_states + where step_slug = 'map2' + and run_id = (select run_id from pgflow.runs limit 1); + + select count(*) into v_map1_tasks_completed + from pgflow.step_tasks + where step_slug = 'map1' + and status = 'completed' + and run_id = (select run_id from pgflow.runs limit 1); + + raise notice 'Map1 status: %, Map2 initial_tasks: %, Map1 completed tasks: %', + v_map1_status, v_map2_initial, v_map1_tasks_completed; +end $$; + +-- Verify map1 is now complete +select is( + (select status from pgflow.step_states + where step_slug = 'map1' + and run_id = (select run_id from pgflow.runs limit 1)), + 'completed', + 'Map1 should be completed after all 4 tasks complete' +); + +-- NOW map2 initial_tasks should be set to 4 +select is( + (select initial_tasks from pgflow.step_states + where step_slug = 'map2' + and run_id = (select run_id from pgflow.runs limit 1)), + 4::int, + 'Map2 initial_tasks should be 4 after map1 step completes' +); + +-- Trigger ready steps to create map2 tasks +do $$ +declare + v_run_id uuid; +begin + 
select run_id into v_run_id from pgflow.runs limit 1; + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Verify correct number of map2 tasks created +select is( + (select count(*) from pgflow.step_tasks where step_slug = 'map2'), + 4::bigint, + 'Should have exactly 4 map2 tasks created' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/map_to_map.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/map_to_map.test.sql new file mode 100644 index 000000000..ebe48cd74 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/map_to_map.test.sql @@ -0,0 +1,129 @@ +begin; +select plan(5); + +-- Test: Map to map step dependency +-- Second map should receive aggregated array and each task gets element at its index + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_m2m', 10, 60, 3); +select pgflow.add_step('test_m2m', 'map1', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_m2m', 'map2', array['map1'], null, null, null, null, 'map'); +select pgflow.add_step('test_m2m', 'collector', array['map2'], null, null, null, null, 'single'); + +-- Start flow with 4-item array +select is( + (select count(*) from pgflow.start_flow('test_m2m', '[10, 20, 30, 40]'::jsonb)), + 1::bigint, + 'Flow should start with 4-element array' +); + +-- Complete first map tasks (transform by multiplying by 2) +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; + v_inputs int[] := array[10, 20, 30, 40]; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete all 4 map1 tasks + for i in 1..4 loop + select * into v_task from pgflow_tests.read_and_start('test_m2m', 1, 1); + + -- Get task_index from step_tasks table + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Use the actual task_index from the task + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb(v_inputs[v_task_index + 1] * 2) -- +1 because arrays are 1-indexed + ); + end loop; + + -- Trigger map2 steps + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Debug: Check if map2 tasks were created +select is( + (select count(*) from pgflow.step_tasks where step_slug = 'map2'), + 4::bigint, + 'Should have 4 map2 tasks created' +); + +-- Verify map2 tasks receive correct individual elements +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; + v_expected_inputs int[] := array[20, 40, 60, 80]; + i int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Read and verify input for each map2 task + for i in 1..4 loop + select * into v_task from pgflow_tests.read_and_start('test_m2m', 1, 1); + + -- Verify this is map2 + if v_task.step_slug != 'map2' then + raise exception 'Expected map2, got %', v_task.step_slug; + end if; + + -- Get task_index from step_tasks table + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Map tasks should receive raw elements based on their task_index + -- The expected input is the element at task_index from the aggregated array + if v_task.input != to_jsonb(v_expected_inputs[v_task_index + 1]) then + raise exception 'Task % expected input %, got %', + v_task_index, to_jsonb(v_expected_inputs[v_task_index + 
1]), v_task.input; + end if; + + -- Complete map2 task (transform by adding 100) + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb((v_expected_inputs[v_task_index + 1])::int + 100) + ); + end loop; + + -- Trigger collector step + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Test passes if we reach here without exceptions +select pass('Map2 tasks receive correct individual elements from map1 aggregation'); + +-- Debug: Check if map2 tasks are actually completed +select is( + (select count(*) from pgflow.step_tasks + where step_slug = 'map2' + and status = 'completed'), + 4::bigint, + 'All 4 map2 tasks should be completed' +); + +-- Verify collector receives map2's aggregated output +select is( + (select input->'map2' from pgflow_tests.read_and_start('test_m2m', 1, 1)), + jsonb_build_array(120, 140, 160, 180), + 'Collector should receive aggregated map2 outputs' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/map_to_single.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/map_to_single.test.sql new file mode 100644 index 000000000..37e3a4254 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/map_to_single.test.sql @@ -0,0 +1,63 @@ +begin; +select plan(2); + +-- Test: Map to single step with various data types +-- Single step should receive full aggregated array with mixed types + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_m2s', 10, 60, 3); +select pgflow.add_step('test_m2s', 'map_src', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_m2s', 'single_dst', array['map_src'], null, null, null, null, 'single'); + +-- Start flow with mixed type array +select is( + (select count(*) from pgflow.start_flow('test_m2s', + '[100, "text", true, null, {"nested": "object"}, [1,2,3]]'::jsonb)), + 1::bigint, + 'Flow should start with mixed type array' +); + +-- Complete map tasks with different output types +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_outputs jsonb[] := array[ + '42'::jsonb, -- number + '"string output"'::jsonb, -- string + 'false'::jsonb, -- boolean + 'null'::jsonb, -- null + '{"key": "value", "num": 123}'::jsonb, -- object + '[10, 20, 30]'::jsonb -- array + ]; + i int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete all 6 map tasks + for i in 1..6 loop + select * into v_task from pgflow_tests.read_and_start('test_m2s', 1, 1); + perform pgflow.complete_task(v_task.run_id, v_task.step_slug, i - 1, v_outputs[i]); + end loop; + + -- Trigger dependent step + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Verify single step receives properly aggregated array +select is( + (select input->'map_src' from pgflow_tests.read_and_start('test_m2s', 1, 1)), + jsonb_build_array( + 42, + 'string output', + false, + null, + jsonb_build_object('key', 'value', 'num', 123), + jsonb_build_array(10, 20, 30) + ), + 'Single step should receive all outputs aggregated in order' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/mixed_dependencies.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/mixed_dependencies.test.sql new file mode 100644 index 000000000..cebc13fcd --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/mixed_dependencies.test.sql @@ -0,0 +1,103 @@ +begin; +select plan(2); + +-- Test: Step depending on both map 
and single steps +-- Consumer should receive both the aggregated array and the single output + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_mixed', 10, 60, 3); +-- Create an initial step that outputs array for the map +select pgflow.add_step('test_mixed', 'array_producer', '{}', null, null, null, null, 'single'); +select pgflow.add_step('test_mixed', 'single_src', '{}', null, null, null, null, 'single'); +select pgflow.add_step('test_mixed', 'map_src', array['array_producer'], null, null, null, null, 'map'); +select pgflow.add_step('test_mixed', 'consumer', array['single_src', 'map_src'], null, null, null, null, 'single'); + +-- Start flow with object input +select is( + (select count(*) from pgflow.start_flow('test_mixed', + jsonb_build_object( + 'config', 'test' + ) + )), + 1::bigint, + 'Flow should start with object input' +); + +-- Complete initial steps +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete array_producer task (produces array for map) + select * into v_task from pgflow_tests.read_and_start('test_mixed', 1, 1); + if v_task.step_slug != 'array_producer' then + raise exception 'Expected array_producer, got %', v_task.step_slug; + end if; + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + 0, + jsonb_build_array('a', 'b', 'c') -- Output array for map_src + ); + + -- Complete single_src task + select * into v_task from pgflow_tests.read_and_start('test_mixed', 1, 1); + if v_task.step_slug != 'single_src' then + raise exception 'Expected single_src, got %', v_task.step_slug; + end if; + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + 0, + jsonb_build_object('processed', true, 'value', 100) + ); + + -- Trigger map_src steps + perform pgflow.start_ready_steps(v_run_id); + + -- Complete map_src tasks (3 tasks for the 3-element array) + for i in 1..3 loop + select * into v_task from pgflow_tests.read_and_start('test_mixed', 1, 1); + + if v_task.step_slug != 'map_src' then + raise exception 'Expected map_src, got %', v_task.step_slug; + end if; + + -- Get task_index + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Uppercase the element; #>> '{}' extracts the text value without JSON quoting + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb(upper(v_task.input #>> '{}')) + ); + end loop; + + -- Trigger consumer step + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Verify consumer receives both outputs correctly +select is( + (select input from pgflow_tests.read_and_start('test_mixed', 1, 1)), + jsonb_build_object( + 'run', jsonb_build_object('config', 'test'), + 'single_src', jsonb_build_object('processed', true, 'value', 100), + 'map_src', jsonb_build_array('A', 'B', 'C') -- Uppercased elements, in order + ), + 'Consumer should receive single output as object and map outputs as aggregated array' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/multiple_maps_to_single.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/multiple_maps_to_single.test.sql new file mode 100644 index 000000000..1b57ef019 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/multiple_maps_to_single.test.sql @@ -0,0 +1,113 @@ +begin; +select plan(2); + +-- Test: Multiple map
steps feeding into single step +-- Verify single step receives multiple aggregated arrays correctly + +-- Setup: Use producer steps to create arrays for each map +-- (Can't have multiple root maps since flow input must be a single array) +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_multi_maps', 10, 60, 3); + +-- Create producers for each array +select pgflow.add_step('test_multi_maps', 'producer_a', '{}', null, null, null, null, 'single'); +select pgflow.add_step('test_multi_maps', 'producer_b', '{}', null, null, null, null, 'single'); + +-- Create dependent maps +select pgflow.add_step('test_multi_maps', 'map_a', array['producer_a'], null, null, null, null, 'map'); +select pgflow.add_step('test_multi_maps', 'map_b', array['producer_b'], null, null, null, null, 'map'); + +-- Create collector depending on both maps +select pgflow.add_step('test_multi_maps', 'collector', array['map_a', 'map_b'], null, null, null, null, 'single'); + +-- Start flow +select is( + (select count(*) from pgflow.start_flow('test_multi_maps', '{}'::jsonb)), + 1::bigint, + 'Flow should start successfully' +); + +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete producer_a (outputs 3-element array) + select * into v_task from pgflow_tests.read_and_start('test_multi_maps', 1, 1); + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + 0, + jsonb_build_array(10, 20, 30) + ); + + -- Complete producer_b (outputs 2-element array) + select * into v_task from pgflow_tests.read_and_start('test_multi_maps', 1, 1); + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + 0, + jsonb_build_array('alpha', 'beta') + ); + + -- Trigger map steps + perform pgflow.start_ready_steps(v_run_id); + + -- Complete map_a tasks (3 tasks) + for i in 1..3 loop + select * into v_task from pgflow_tests.read_and_start('test_multi_maps', 1, 1); + + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Transform by doubling + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb((v_task.input::int) * 2) + ); + end loop; + + -- Complete map_b tasks (2 tasks) + for i in 1..2 loop + select * into v_task from pgflow_tests.read_and_start('test_multi_maps', 1, 1); + + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Transform by uppercasing + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb(upper(v_task.input#>>'{}')) + ); + end loop; + + -- Trigger collector + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Verify collector receives both aggregated arrays +select is( + (select input from pgflow_tests.read_and_start('test_multi_maps', 1, 1)), + jsonb_build_object( + 'run', '{}'::jsonb, + 'map_a', jsonb_build_array(20, 40, 60), + 'map_b', jsonb_build_array('ALPHA', 'BETA') + ), + 'Collector should receive both aggregated arrays from multiple map dependencies' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/null_outputs.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/null_outputs.test.sql new file mode 100644 index 000000000..ea8cf5108 --- /dev/null +++ 
b/pkgs/core/supabase/tests/map_output_aggregation/null_outputs.test.sql @@ -0,0 +1,69 @@ +begin; +select plan(2); + +-- Test: Map tasks returning NULL outputs +-- NULL values should be preserved in aggregated array + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_nulls', 10, 60, 3); +select pgflow.add_step('test_nulls', 'map_with_nulls', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_nulls', 'consumer', array['map_with_nulls'], null, null, null, null, 'single'); + +-- Start flow with 5-item array +select is( + (select count(*) from pgflow.start_flow('test_nulls', '[1, 2, 3, 4, 5]'::jsonb)), + 1::bigint, + 'Flow should start with 5-element array' +); + +-- Complete map tasks with mix of nulls and values +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; + -- Define outputs: some null, some values + v_outputs jsonb[] := array[ + '42'::jsonb, -- task 0: number + 'null'::jsonb, -- task 1: NULL + '"text"'::jsonb, -- task 2: string + 'null'::jsonb, -- task 3: NULL + 'true'::jsonb -- task 4: boolean + ]; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete all 5 tasks + for i in 1..5 loop + select * into v_task from pgflow_tests.read_and_start('test_nulls', 1, 1); + + -- Get task_index + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Complete with predefined output (including nulls) + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + v_outputs[v_task_index + 1] -- +1 for 1-indexed array + ); + end loop; + + -- Trigger dependent step + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Verify consumer receives array with nulls preserved +select is( + (select input->'map_with_nulls' from pgflow_tests.read_and_start('test_nulls', 1, 1)), + jsonb_build_array(42, null, 'text', null, true), + 'Aggregated array should preserve NULL values in correct positions' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/order_preservation.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/order_preservation.test.sql new file mode 100644 index 000000000..727a011ff --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/order_preservation.test.sql @@ -0,0 +1,66 @@ +begin; +select plan(2); + +-- Test: Order preservation in map output aggregation +-- Outputs should be aggregated in task_index order, not completion order + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_order', 10, 60, 3); +select pgflow.add_step('test_order', 'map_step', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_order', 'consumer', array['map_step'], null, null, null, null, 'single'); + +-- Start flow with 5-item array +select is( + (select count(*) from pgflow.start_flow('test_order', '["a", "b", "c", "d", "e"]'::jsonb)), + 1::bigint, + 'Flow should start successfully' +); + +-- Complete tasks in REVERSE order to test ordering +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_tasks pgflow.step_task_record[]; + i int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Read all 5 tasks and store them + for i in 1..5 loop + select * into v_task from pgflow_tests.read_and_start('test_order', 1, 1); + v_tasks := array_append(v_tasks, v_task); + end loop; + + -- Complete tasks in reverse order 
(4, 3, 2, 1, 0) + for i in reverse 5..1 loop + -- Complete each stored task last-to-first; its output encodes the 0-based task_index + perform pgflow.complete_task( + v_tasks[i].run_id, + v_tasks[i].step_slug, + i - 1, -- task_index is 0-based + jsonb_build_object('index', i - 1, 'letter', chr(96 + i)) -- {index: 0, letter: 'a'}, etc. + ); + end loop; + + -- Trigger dependent step + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Get the aggregated input for verification +select is( + (select input->'map_step' from pgflow_tests.read_and_start('test_order', 1, 1)), + jsonb_build_array( + jsonb_build_object('index', 0, 'letter', 'a'), + jsonb_build_object('index', 1, 'letter', 'b'), + jsonb_build_object('index', 2, 'letter', 'c'), + jsonb_build_object('index', 3, 'letter', 'd'), + jsonb_build_object('index', 4, 'letter', 'e') + ), + 'Map outputs should be ordered by task_index, not completion order' +); + + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/partial_completion_prevention.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/partial_completion_prevention.test.sql new file mode 100644 index 000000000..f79e3aa7e --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/partial_completion_prevention.test.sql @@ -0,0 +1,101 @@ +begin; +select plan(3); + +-- Test: Dependent steps don't start until ALL map tasks complete +-- Even if start_ready_steps is called, it should wait for all tasks + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_partial', 10, 60, 3); +select pgflow.add_step('test_partial', 'map_step', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_partial', 'dependent', array['map_step'], null, null, null, null, 'single'); + +-- Start flow with 5-element array +select is( + (select count(*) from pgflow.start_flow('test_partial', '[1, 2, 3, 4, 5]'::jsonb)), + 1::bigint, + 'Flow should start with 5-element array' +); + +-- Complete only 3 out of 5 tasks +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete only 3 tasks (not all 5) + for i in 1..3 loop + select * into v_task from pgflow_tests.read_and_start('test_partial', 1, 1); + + -- Get task_index + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Complete task + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb(v_task_index * 10) + ); + end loop; + + -- Try to trigger dependent step (should not start yet) + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Verify dependent step is NOT started +select is( + (select count(*) from pgflow.step_tasks where step_slug = 'dependent'), + 0::bigint, + 'Dependent step should NOT have tasks when map is partially complete' +); + +-- Now complete remaining tasks +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete remaining 2 tasks + for i in 1..2 loop + select * into v_task from pgflow_tests.read_and_start('test_partial', 1, 1); + + -- Get task_index + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Complete task + perform pgflow.complete_task( + v_task.run_id, +
v_task.step_slug, + v_task_index, + to_jsonb(v_task_index * 10) + ); + end loop; + + -- Now trigger dependent step (should start) + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Verify dependent step IS started after all map tasks complete +select is( + (select count(*) from pgflow.step_tasks where step_slug = 'dependent'), + 1::bigint, + 'Dependent step should have 1 task after all map tasks complete' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/run_completion_leaf_map.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/run_completion_leaf_map.test.sql new file mode 100644 index 000000000..e8a7ee3b8 --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/run_completion_leaf_map.test.sql @@ -0,0 +1,69 @@ +begin; +select plan(3); + +-- Test: Run completion with map step as leaf +-- The run output should contain aggregated map outputs + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_leaf', 10, 60, 3); +select pgflow.add_step('test_leaf', 'map_leaf', '{}', null, null, null, null, 'map'); + +-- Start flow with 3-item array +select is( + (select count(*) from pgflow.start_flow('test_leaf', '["first", "second", "third"]'::jsonb)), + 1::bigint, + 'Flow should start with 3-element array' +); + +-- Complete all map tasks +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; + v_task_index int; + v_inputs text[] := array['first', 'second', 'third']; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete all 3 tasks + for i in 1..3 loop + select * into v_task from pgflow_tests.read_and_start('test_leaf', 1, 1); + + -- Get task_index + select task_index into v_task_index + from pgflow.step_tasks + where run_id = v_task.run_id + and step_slug = v_task.step_slug + and message_id = v_task.msg_id; + + -- Complete with uppercase transformation + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + v_task_index, + to_jsonb(upper(v_inputs[v_task_index + 1])) + ); + end loop; + + -- Completing the last leaf task should auto-complete the run; + -- call maybe_complete_run explicitly so the test does not rely on that side effect + perform pgflow.maybe_complete_run(v_run_id); +end $$; + +-- Check run is completed +select is( + (select status from pgflow.runs limit 1), + 'completed', + 'Run should be completed' +); + +-- Check run output contains aggregated map outputs +select is( + (select output->'map_leaf' from pgflow.runs limit 1), + jsonb_build_array('FIRST', 'SECOND', 'THIRD'), + 'Run output should contain aggregated map outputs' +); + +select * from finish(); +rollback; \ No newline at end of file diff --git a/pkgs/core/supabase/tests/map_output_aggregation/single_task_map.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/single_task_map.test.sql new file mode 100644 index 000000000..c81a706dd --- /dev/null +++ b/pkgs/core/supabase/tests/map_output_aggregation/single_task_map.test.sql @@ -0,0 +1,66 @@ +begin; +select plan(4); + +-- Test: Single task map (map with exactly 1 task) +-- Map with 1 task should produce array with one element + +-- Setup +select pgflow_tests.reset_db(); +select pgflow.create_flow('test_single_task', 10, 60, 3); +select pgflow.add_step('test_single_task', 'map_step', '{}', null, null, null, null, 'map'); +select pgflow.add_step('test_single_task', 'consumer', array['map_step'], null, null, null, null, 'single'); + +-- Start flow with single-element array +select is( + (select count(*) from
pgflow.start_flow('test_single_task', '[42]'::jsonb)), + 1::bigint, + 'Flow should start with single-element array' +); + +-- Verify exactly 1 task was created for the map step +select is( + (select count(*) from pgflow.step_tasks where step_slug = 'map_step'), + 1::bigint, + 'Map step should have exactly 1 task' +); + +-- Complete the single map task +do $$ +declare + v_run_id uuid; + v_task pgflow.step_task_record; +begin + select run_id into v_run_id from pgflow.runs limit 1; + + -- Complete the single task + select * into v_task from pgflow_tests.read_and_start('test_single_task', 1, 1); + perform pgflow.complete_task( + v_task.run_id, + v_task.step_slug, + 0, -- task_index 0 (only task) + jsonb_build_object('processed', 42 * 2) -- output: {processed: 84} + ); + + -- Trigger dependent step + perform pgflow.start_ready_steps(v_run_id); +end $$; + +-- Verify that consumer receives single-element array +select is( + (select input->'map_step' from pgflow_tests.read_and_start('test_single_task', 1, 1)), + jsonb_build_array( + jsonb_build_object('processed', 84) + ), + 'Consumer should receive single-element array [output]' +); + +-- Verify map step is completed +select is( + (select status from pgflow.step_states + where step_slug = 'map_step'), + 'completed', + 'Map step with single task should complete successfully' +); + +select * from finish(); +rollback; \ No newline at end of file
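Note on the aggregation contract these tests pin down: completed map task outputs are combined into one JSON array ordered by task_index, and JSON nulls keep their positions. Below is a minimal sketch of a query that satisfies that contract; it is an illustration only, assumes outputs are readable from an output column on pgflow.step_tasks (not shown in this diff), and is not necessarily how the inline aggregation in pgflow.complete_task is written:

-- Illustrative sketch only: aggregate completed map task outputs in task_index order.
-- jsonb_agg includes null inputs, so JSON null outputs keep their positions
-- (null_outputs.test.sql pins this behavior).
-- coalesce covers the zero-task case, so dependents still see an array.
select coalesce(jsonb_agg(st.output order by st.task_index), '[]'::jsonb)
from pgflow.step_tasks as st
where st.run_id = $1
  and st.step_slug = $2
  and st.status = 'completed';

The order by task_index clause is exactly what order_preservation.test.sql exercises by completing tasks in reverse completion order.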