diff --git a/.changeset/easy-bats-nail.md b/.changeset/easy-bats-nail.md
new file mode 100644
index 000000000..780ae13a5
--- /dev/null
+++ b/.changeset/easy-bats-nail.md
@@ -0,0 +1,29 @@
+---
+'@pgflow/dsl': patch
+---
+
+Add `.map()` method to Flow DSL for defining map-type steps
+
+The new `.map()` method enables defining steps that process arrays element-by-element, complementing the existing SQL Core map infrastructure. Key features:
+
+- **Root maps**: Process flow input arrays directly by omitting the `array` property
+- **Dependent maps**: Process another step's array output using `array: 'stepSlug'`
+- **Type-safe**: Enforces Json-compatible types with full TypeScript inference
+- **Compile-time duplicate slug detection**: TypeScript now prevents duplicate step slugs at compile time
+- **Different handler signature**: Receives individual items `(item, context)` instead of the full input object
+- **Always returns arrays**: Return type is `HandlerReturnType[]`
+- **SQL generation**: Correctly adds `step_type => 'map'` parameter to `pgflow.add_step()`
+
+Example usage:
+
+```typescript
+// Root map - processes array input
+new Flow<string[]>({ slug: 'process' }).map({ slug: 'uppercase' }, (item) =>
+  item.toUpperCase()
+);
+
+// Dependent map - processes another step's output
+new Flow<{}>({ slug: 'workflow' })
+  .array({ slug: 'items' }, () => [1, 2, 3])
+  .map({ slug: 'double', array: 'items' }, (n) => n * 2);
+```
diff --git a/.claude/settings.json b/.claude/settings.json
index a80127af1..d31f097e1 100644
--- a/.claude/settings.json
+++ b/.claude/settings.json
@@ -23,6 +23,8 @@
       "Bash(grep:*)",
       "Bash(ls:*)",
       "Bash(mkdir:*)",
+      "Bash(npx tsc:*)",
+      "Bash(pnpm tsc:*)",
       "Bash(pnpm eslint:*)",
       "Bash(pnpm nx build:*)",
       "Bash(pnpm nx lint:*)",
diff --git a/PLAN.md b/PLAN.md
index 42856992b..a4c74d8f9 100644
--- a/PLAN.md
+++ b/PLAN.md
@@ -9,14 +9,17 @@
 - ✅ **DONE**: Dependency count propagation for map steps
 - ✅ **DONE**: Array element extraction - tasks receive individual array elements
 - ✅ **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents
-- ⏳ **NEXT**: DSL support for `.map()` for defining map steps
+- ✅ **DONE**: DSL support for `.map()` for defining map steps with compile-time duplicate detection
+- ⏳ **TODO**: Fix orphaned messages on run failure
+- ⏳ **TODO**: Performance optimization with step_states.output column
 
 ### Chores
 
-- ⏳ **WAITING**: Integration tests for map steps
-- ⏳ **WAITING**: Consolidated migration for map steps
-- ⏳ **WAITING**: Documentation for map steps
-- ⏳ **WAITING**: Graphite stack merge for map steps
+- ⏳ **TODO**: Integration tests for map steps
+- ⏳ **TODO**: Update core README
+- ⏳ **TODO**: Add docs page for array and map steps
+- ⏳ **TODO**: Migration consolidation
+- ⏳ **TODO**: Graphite stack merge
 
 ## Implementation Status
 
@@ -83,33 +86,38 @@
   - Validates non-array outputs to map steps fail correctly
   - Fixed broadcast aggregation to send full array not individual task output
 
-#### ❌ Remaining Work
+- [x] **PR #218: DSL Support for .map() Step Type** - `09-18-add-map-support-to-dsl` ✅ COMPLETED
 
-- [ ] **DSL Support for .map() Step Type**
-
-  - Add `.map()` method to Flow DSL for defining map steps
+  - Added `.map()` method to Flow DSL for defining map steps
   - Constraints:
     - Locked to exactly one dependency (enforced at compile time)
    - Dependency must return an array (type-checked)
   - Syntax design:
     - Dependent maps: `flow.map({ slug: 'stepName', array: 'arrayReturningStep' }, handler)`
-    - Root maps: Decide between `{ array: 'run' }` or omitting array property
+    - Root maps: Omit array property
   - Return type always inferred as array
   - Comprehensive tests:
     - Runtime validation of array dependencies
     - Type safety for input/output types
     - Compile-time enforcement of single dependency rule
+  - Fixed complex TypeScript type inference issue with overloads
+  - Added compile-time duplicate slug detection across all DSL methods
+  - Fixed all linting errors (replaced `{}` with `Record<string, never>`)
+  - Updated DSL README with .map() documentation
+  - Created detailed changeset
+
+#### ❌ Remaining Work (Priority Order)
 
-- [ ] **Fix Orphaned Messages on Run Failure**
+- [ ] **Priority 1: Fix Orphaned Messages on Run Failure** 🚨 CRITICAL
 
   - Archive all pending messages when run fails
   - Handle map sibling tasks specially
   - Fix type constraint violations to fail immediately without retries
   - See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md)
-  - Critical for production: prevents queue performance degradation
+  - **Critical for production: prevents queue performance degradation**
   - Tests already written (stashed) that document the problem
 
-- [ ] **Performance Optimization: step_states.output Column**
+- [ ] **Priority 2: Performance Optimization - step_states.output Column**
 
   - Migrate from inline aggregation to storing outputs in step_states
   - See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md)
@@ -124,20 +132,13 @@
   - Update all aggregation tests (~17 files)
   - **Note**: This is an optimization that should be done after core functionality is stable
 
-- [ ] **Integration Tests**
+- [ ] **Priority 3: Integration Tests**
 
   - End-to-end workflows with real array data
   - Basic happy path coverage
   - This should be minimal and added to the Edge Worker integration test suite for now
 
-- [ ] **Migration Consolidation**
-
-  - Remove all temporary/incremental migrations from feature branches
-  - Generate a single consolidated migration for the entire map infrastructure
-  - Ensure clean migration path from current production schema
-  - If NULL improvement is done, include it in the consolidated migration
-
-- [ ] **Update README's** and **Docs**
+- [ ] **Priority 4: Update core README**
 
   - `pkgs/core/README.md`
 
@@ -149,12 +150,7 @@
   - Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input
   - Explain cascade completion of taskless steps and its limitations
 
-  - `pkgs/dsl/README.md`
-
-    - Briefly describe the new `.array()` and `.map()` methods
-    - Mention `.array` is mostly sugar, and `.map` is the new step type
-    - Link to `pkgs/core/README.md` sections for more explanation about map steps
-    - Make sure to mention that maps are constrained to have exactly one dependency
+- [ ] **Priority 5: Add docs page**
 
   - **Add basic docs page**
 
@@ -165,7 +161,14 @@
   - focus mostly on how to use it, instead of how it works under the hood
   - link to the README's for more details
 
-- [ ] **Graphite Stack Merge**
+- [ ] **Priority 6: Migration Consolidation** (Do this last before merge!)
+
+  - Remove all temporary/incremental migrations from feature branches
+  - Generate a single consolidated migration for the entire map infrastructure
+  - Ensure clean migration path from current production schema
+  - If NULL improvement is done, include it in the consolidated migration
+
+- [ ] **Priority 7: Graphite Stack Merge**
 
   - Configure Graphite merge queue for the complete PR stack
   - Ensure all PRs in sequence can be merged together
diff --git a/pkgs/dsl/README.md b/pkgs/dsl/README.md
index be5a111b1..7e1ad1f26 100644
--- a/pkgs/dsl/README.md
+++ b/pkgs/dsl/README.md
@@ -73,6 +73,78 @@ This design ensures:
 - Data doesn't need to be manually forwarded through intermediate steps
 - Steps can combine original input with processed data from previous steps
 
+### Step Methods
+
+The Flow DSL provides three methods for defining steps in your workflow:
+
+#### `.step()` - Regular Steps
+
+The standard method for adding steps to a flow. Each step processes input and returns output.
+
+```typescript
+.step(
+  { slug: 'process', dependsOn: ['previous'] },
+  async (input) => {
+    // Access input.run and input.previous
+    return { result: 'processed' };
+  }
+)
+```
+
+#### `.array()` - Array-Returning Steps
+
+A semantic wrapper around `.step()` that provides type enforcement for steps that return arrays. Useful for data fetching or collection steps.
+
+```typescript
+.array(
+  { slug: 'fetch_items' },
+  async () => [1, 2, 3, 4, 5]
+)
+```
+
+#### `.map()` - Array Processing Steps
+
+Processes arrays element-by-element, similar to JavaScript's `Array.map()`. The handler receives individual items instead of the full input object.
+
+```typescript
+// Root map - processes flow input array
+new Flow<string[]>({ slug: 'process_strings' })
+  .map({ slug: 'uppercase' }, (item) => item.toUpperCase());
+
+// Dependent map - processes another step's output
+new Flow<{}>({ slug: 'data_pipeline' })
+  .array({ slug: 'numbers' }, () => [1, 2, 3])
+  .map({ slug: 'double', array: 'numbers' }, (n) => n * 2)
+  .map({ slug: 'square', array: 'double' }, (n) => n * n);
+```
+
+**Key differences from regular steps:**
+- Uses `array:` instead of `dependsOn:` for specifying the single array dependency
+- Handler signature is `(item, context) => result` instead of `(input, context) => result`
+- Return type is always an array
+- Generates SQL with `step_type => 'map'` parameter for pgflow's map processing
+
+**Type Safety:**
+The `.map()` method provides full TypeScript type inference for array elements:
+
+```typescript
+type User = { id: number; name: string };
+
+new Flow<{}>({ slug: 'user_flow' })
+  .array({ slug: 'users' }, (): User[] => [
+    { id: 1, name: 'Alice' },
+    { id: 2, name: 'Bob' }
+  ])
+  .map({ slug: 'greet', array: 'users' }, (user) => {
+    // TypeScript knows user is of type User
+    return `Hello, ${user.name} (ID: ${user.id})`;
+  });
+```
+
+**Limitations:**
+- Can only depend on a single array-returning step
+- TypeScript may not track type transformations between chained maps (use type assertions if needed)
+
 ### Context Object
 
 Step handlers can optionally receive a second parameter - the **context object** - which provides access to platform resources and runtime information.
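The element-by-element semantics documented above can be sketched in plain TypeScript. This is a conceptual illustration only: `runMapStep` is a hypothetical helper, not a pgflow API. It shows how a map step's handler runs once per array element and how the step's output is the aggregated array of per-item results, mirroring `Array.prototype.map`.

```typescript
// Conceptual sketch (not pgflow's implementation): a map step applies its
// handler to each element of the input array and aggregates the results.
type MapHandler<TItem, TResult> = (item: TItem) => TResult | Promise<TResult>;

async function runMapStep<TItem, TResult>(
  input: TItem[],
  handler: MapHandler<TItem, TResult>
): Promise<TResult[]> {
  // Each element is processed independently; outputs keep input order.
  return Promise.all(input.map((item) => handler(item)));
}

// Mirrors the README's 'double' map step: [1, 2, 3] becomes [2, 4, 6]
runMapStep([1, 2, 3], (n) => n * 2).then((out) => console.log(out));
```

In real flows the handler also receives a context argument and each element is executed as its own retryable task, but the input/output shape is the same as in this sketch.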
@@ -114,6 +186,11 @@ All platforms provide these core resources:
     msg_id: number;
   }
   ```
+- **`context.workerConfig`** - Resolved worker configuration with all defaults applied
+  ```typescript
+  // Provides access to worker settings like retry limits
+  const isLastAttempt = context.rawMessage.read_ct >= context.workerConfig.retry.limit;
+  ```
 
 #### Supabase Platform Resources
diff --git a/pkgs/dsl/__tests__/integration/map-flow.test.ts b/pkgs/dsl/__tests__/integration/map-flow.test.ts
new file mode 100644
index 000000000..eab37972d
--- /dev/null
+++ b/pkgs/dsl/__tests__/integration/map-flow.test.ts
@@ -0,0 +1,208 @@
+import { describe, it, expect } from 'vitest';
+import { Flow } from '../../src/dsl.js';
+import { compileFlow } from '../../src/compile-flow.js';
+
+describe('Map flow integration tests', () => {
+  describe('complete flow examples', () => {
+    it('should compile a data processing pipeline with maps', () => {
+      // Simulating a real-world data processing flow
+      const flow = new Flow<string[]>({ slug: 'data_processing' })
+        .map({ slug: 'normalize' }, (item) => item.trim().toLowerCase())
+        .map({ slug: 'validate', array: 'normalize' }, (item) => {
+          // Validate each normalized item
+          return item.length > 0 && item.length < 100;
+        })
+        .step({ slug: 'summarize', dependsOn: ['validate'] }, (input) => ({
+          total: input.validate.length,
+          valid: input.validate.filter(v => v).length,
+          invalid: input.validate.filter(v => !v).length
+        }));
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(4);
+      expect(sql[0]).toBe("SELECT pgflow.create_flow('data_processing');");
+      expect(sql[1]).toBe("SELECT pgflow.add_step('data_processing', 'normalize', step_type => 'map');");
+      expect(sql[2]).toBe("SELECT pgflow.add_step('data_processing', 'validate', ARRAY['normalize'], step_type => 'map');");
+      expect(sql[3]).toBe("SELECT pgflow.add_step('data_processing', 'summarize', ARRAY['validate']);");
+    });
+
+    it('should compile an ETL flow with array generation and mapping', () => {
+      const flow = new Flow<{ sourceIds: string[] }>({ slug: 'etl_flow' })
+        .array({ slug: 'fetch_data' }, async ({ run }) => {
+          // Simulating fetching data for each source ID
+          return run.sourceIds.map(id => ({ id, data: `data_${id}` }));
+        })
+        .map({ slug: 'transform', array: 'fetch_data' }, (record) => ({
+          ...record,
+          transformed: record.data.toUpperCase(),
+          timestamp: Date.now()
+        }))
+        .map({ slug: 'enrich', array: 'transform' }, async (record) => ({
+          ...record,
+          enriched: true,
+          metadata: { processedAt: new Date().toISOString() }
+        }))
+        .step({ slug: 'load', dependsOn: ['enrich'] }, async (input) => {
+          // Final loading step
+          return {
+            recordsProcessed: input.enrich.length,
+            success: true
+          };
+        });
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(5);
+      expect(sql[1]).not.toContain("step_type"); // array step
+      expect(sql[2]).toContain("step_type => 'map'");
+      expect(sql[3]).toContain("step_type => 'map'");
+      expect(sql[4]).not.toContain("step_type"); // regular step
+    });
+
+    it('should handle complex nested array processing', () => {
+      // Flow that processes nested arrays (e.g., matrix operations)
+      const flow = new Flow<number[][]>({ slug: 'matrix_flow' })
+        .map({ slug: 'row_sums' }, (row) => row.reduce((a, b) => a + b, 0))
+        .step({ slug: 'total_sum', dependsOn: ['row_sums'] }, (input) =>
+          input.row_sums.reduce((a, b) => a + b, 0)
+        );
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(3);
+      expect(sql[1]).toBe("SELECT pgflow.add_step('matrix_flow', 'row_sums', step_type => 'map');");
+      expect(sql[2]).toBe("SELECT pgflow.add_step('matrix_flow', 'total_sum', ARRAY['row_sums']);");
+    });
+  });
+
+  describe('runtime validation', () => {
+    it('should throw when trying to use non-existent step as array dependency', () => {
+      const flow = new Flow<Record<string, never>>({ slug: 'test' })
+        .step({ slug: 'exists' }, () => [1, 2, 3]);
+
+      expect(() => {
+        // @ts-expect-error - TypeScript should catch this at compile time
+        flow.map({ slug: 'fail', array: 'doesNotExist' }, (item) => item);
+      }).toThrow('Step "fail" depends on undefined step "doesNotExist"');
+    });
+
+    it('should throw when step slug already exists', () => {
+      const flow = new Flow<number[]>({ slug: 'test' })
+        .map({ slug: 'process' }, (n) => n * 2);
+
+      expect(() => {
+        flow.map({ slug: 'process' }, (n) => n * 3);
+      }).toThrow('Step "process" already exists in flow "test"');
+    });
+
+    it('should validate slug format', () => {
+      expect(() => {
+        new Flow<number[]>({ slug: 'test' })
+          .map({ slug: 'invalid-slug!' }, (n) => n);
+      }).toThrow(); // validateSlug should reject invalid characters
+    });
+
+    it('should validate runtime options', () => {
+      // This should not throw - valid options
+      const validFlow = new Flow<number[]>({ slug: 'test' })
+        .map({
+          slug: 'valid',
+          maxAttempts: 3,
+          baseDelay: 1000,
+          timeout: 30000,
+          startDelay: 5000
+        }, (n) => n);
+
+      expect(compileFlow(validFlow)).toHaveLength(2);
+
+      // Invalid options should be caught by validateRuntimeOptions
+      expect(() => {
+        new Flow<number[]>({ slug: 'test' })
+          .map({
+            slug: 'invalid',
+            maxAttempts: 0 // Should be >= 1
+          }, (n) => n);
+      }).toThrow();
+    });
+  });
+
+  describe('type inference validation', () => {
+    it('should correctly infer types through map chains', () => {
+      const flow = new Flow<{ items: string[] }>({ slug: 'test' })
+        .step({ slug: 'extract', dependsOn: [] }, ({ run }) => run.items)
+        .map({ slug: 'lengths', array: 'extract' }, (item) => item.length)
+        .map({ slug: 'doubles', array: 'lengths' }, (len) => len * 2)
+        .step({ slug: 'sum', dependsOn: ['doubles'] }, (input) => {
+          // Type checking - this should compile without errors
+          const total: number = input.doubles.reduce((a, b) => a + b, 0);
+          return total;
+        });
+
+      const sql = compileFlow(flow);
+      expect(sql).toHaveLength(5);
+    });
+  });
+
+  describe('edge cases', () => {
+    it('should handle empty array processing', () => {
+      const flow = new Flow<number[]>({ slug: 'empty_test' })
+        .map({ slug: 'process' }, (item) => ({ processed: item }));
+
+      const sql = compileFlow(flow);
+      expect(sql).toHaveLength(2);
+      expect(sql[1]).toContain("step_type => 'map'");
+    });
+
+    it('should handle all runtime options combinations', () => {
+      const flow = new Flow<string[]>({ slug: 'options_test' })
+        .map({ slug: 'no_options' }, (s) => s)
+        .map({ slug: 'some_options', array: 'no_options', maxAttempts: 5 }, (s) => s)
+        .map({
+          slug: 'all_options',
+          array: 'some_options',
+          maxAttempts: 3,
+          baseDelay: 1000,
+          timeout: 30000,
+          startDelay: 5000
+        }, (s) => s);
+
+      const sql = compileFlow(flow);
+
+      expect(sql[1]).toBe("SELECT pgflow.add_step('options_test', 'no_options', step_type => 'map');");
+      expect(sql[2]).toBe("SELECT pgflow.add_step('options_test', 'some_options', ARRAY['no_options'], max_attempts => 5, step_type => 'map');");
+      expect(sql[3]).toContain("max_attempts => 3");
+      expect(sql[3]).toContain("base_delay => 1000");
+      expect(sql[3]).toContain("timeout => 30000");
+      expect(sql[3]).toContain("start_delay => 5000");
+      expect(sql[3]).toContain("step_type => 'map'");
+    });
+
+    it('should handle map steps with no further dependencies', () => {
+      // Map step as a leaf node
+      const flow = new Flow<number[]>({ slug: 'leaf_map' })
+        .map({ slug: 'final_map' }, (n) => n * n);
+
+      const sql = compileFlow(flow);
+      expect(sql).toHaveLength(2);
+      expect(sql[1]).toBe("SELECT pgflow.add_step('leaf_map', 'final_map', step_type => 'map');");
+    });
+
+    it('should handle multiple independent map chains', () => {
+      const flow = new Flow<{ a: number[]; b: string[] }>({ slug: 'parallel' })
+        .step({ slug: 'extract_a' }, ({ run }) => run.a)
+        .step({ slug: 'extract_b' }, ({ run }) => run.b)
+        .map({ slug: 'process_a', array: 'extract_a' }, (n) => n * 2)
+        .map({ slug: 'process_b', array: 'extract_b' }, (s) => s.toUpperCase())
+        .step({ slug: 'combine', dependsOn: ['process_a', 'process_b'] }, (input) => ({
+          numbers: input.process_a,
+          strings: input.process_b
+        }));
+
+      const sql = compileFlow(flow);
+      expect(sql).toHaveLength(6);
+      expect(sql[3]).toContain("step_type => 'map'");
+      expect(sql[4]).toContain("step_type => 'map'");
+    });
+  });
+});
\ No newline at end of file
diff --git a/pkgs/dsl/__tests__/runtime/map-compile.test.ts b/pkgs/dsl/__tests__/runtime/map-compile.test.ts
new file mode 100644
index 000000000..66bb755f7
--- /dev/null
+++ b/pkgs/dsl/__tests__/runtime/map-compile.test.ts
@@ -0,0 +1,210 @@
+import { describe, it, expect } from 'vitest';
+import { Flow } from '../../src/dsl.js';
+import { compileFlow } from '../../src/compile-flow.js';
+
+describe('compileFlow with map steps', () => {
+  describe('root map compilation', () => {
+    it('should compile root map with step_type parameter', () => {
+      const flow = new Flow<number[]>({ slug: 'test_flow' })
+        .map({ slug: 'square' }, (n) => n * n);
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(2);
+      expect(sql[0]).toBe("SELECT pgflow.create_flow('test_flow');");
+      expect(sql[1]).toContain("step_type => 'map'");
+      expect(sql[1]).not.toContain("ARRAY["); // No dependencies for root map
+      expect(sql[1]).toBe("SELECT pgflow.add_step('test_flow', 'square', step_type => 'map');");
+    });
+
+    it('should compile root map with runtime options', () => {
+      const flow = new Flow<string[]>({ slug: 'test_flow' })
+        .map({
+          slug: 'process',
+          maxAttempts: 5,
+          baseDelay: 10,
+          timeout: 60
+        }, (item) => item.toUpperCase());
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(2);
+      expect(sql[1]).toContain("step_type => 'map'");
+      expect(sql[1]).toContain("max_attempts => 5");
+      expect(sql[1]).toContain("base_delay => 10");
+      expect(sql[1]).toContain("timeout => 60");
+      expect(sql[1]).toBe(
+        "SELECT pgflow.add_step('test_flow', 'process', max_attempts => 5, base_delay => 10, timeout => 60, step_type => 'map');"
+      );
+    });
+
+    it('should compile root map with startDelay option', () => {
+      const flow = new Flow<string[]>({ slug: 'test_flow' })
+        .map({
+          slug: 'delayed',
+          startDelay: 300
+        }, (item) => item.length);
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(2);
+      expect(sql[1]).toContain("start_delay => 300");
+      expect(sql[1]).toContain("step_type => 'map'");
+    });
+  });
+
+  describe('dependent map compilation', () => {
+    it('should compile dependent map with array dependency', () => {
+      const flow = new Flow<{ count: number }>({ slug: 'test_flow' })
+        .array({ slug: 'nums' }, ({ run }) => Array(run.count).fill(0).map((_, i) => i))
+        .map({ slug: 'double', array: 'nums' }, (n) => n * 2);
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(3);
+      expect(sql[0]).toBe("SELECT pgflow.create_flow('test_flow');");
+      expect(sql[1]).toBe("SELECT pgflow.add_step('test_flow', 'nums');");
+      expect(sql[2]).toContain("ARRAY['nums']");
+      expect(sql[2]).toContain("step_type => 'map'");
+      expect(sql[2]).toBe("SELECT pgflow.add_step('test_flow', 'double', ARRAY['nums'], step_type => 'map');");
+    });
+
+    it('should compile dependent map with options', () => {
+      const flow = new Flow<Record<string, never>>({ slug: 'test_flow' })
+        .step({ slug: 'fetch' }, () => ['a', 'b', 'c'])
+        .map({
+          slug: 'process',
+          array: 'fetch',
+          maxAttempts: 3,
+          timeout: 30
+        }, (item) => ({ processed: item }));
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(3);
+      expect(sql[2]).toContain("ARRAY['fetch']");
+      expect(sql[2]).toContain("step_type => 'map'");
+      expect(sql[2]).toContain("max_attempts => 3");
+      expect(sql[2]).toContain("timeout => 30");
+    });
+  });
+
+  describe('mixed step types', () => {
+    it('should compile flow with map and regular steps', () => {
+      const flow = new Flow<number[]>({ slug: 'test_flow' })
+        .map({ slug: 'double' }, (n) => n * 2)
+        .step({ slug: 'sum', dependsOn: ['double'] }, (input) =>
+          input.double.reduce((a, b) => a + b, 0)
+        );
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(3);
+      expect(sql[1]).toContain("step_type => 'map'");
+      expect(sql[2]).not.toContain("step_type"); // Regular step doesn't need step_type
+      expect(sql[2]).toContain("ARRAY['double']");
+    });
+
+    it('should compile map chaining', () => {
+      const flow = new Flow<string[]>({ slug: 'test_flow' })
+        .map({ slug: 'uppercase' }, (s) => s.toUpperCase())
+        .map({ slug: 'lengths', array: 'uppercase' }, (s) => s.length);
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(3);
+      expect(sql[1]).toContain("step_type => 'map'");
+      expect(sql[1]).not.toContain("ARRAY["); // Root map
+      expect(sql[2]).toContain("step_type => 'map'");
+      expect(sql[2]).toContain("ARRAY['uppercase']");
+    });
+
+    it('should compile array to map to step chain', () => {
+      const flow = new Flow<Record<string, never>>({ slug: 'test_flow' })
+        .array({ slug: 'generate' }, () => [1, 2, 3])
+        .map({ slug: 'square', array: 'generate' }, (n) => n * n)
+        .step({ slug: 'total', dependsOn: ['square'] }, (input) => ({
+          sum: input.square.reduce((a, b) => a + b, 0)
+        }));
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(4);
+      expect(sql[1]).not.toContain("step_type"); // Array step
+      expect(sql[2]).toContain("step_type => 'map'");
+      expect(sql[2]).toContain("ARRAY['generate']");
+      expect(sql[3]).not.toContain("step_type"); // Regular step
+      expect(sql[3]).toContain("ARRAY['square']");
+    });
+  });
+
+  describe('flow with only map steps', () => {
+    it('should compile flow with only root map', () => {
+      const flow = new Flow<string[]>({ slug: 'test_flow' })
+        .map({ slug: 'process' }, (s) => s.toUpperCase());
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(2);
+      expect(sql[1]).toBe("SELECT pgflow.add_step('test_flow', 'process', step_type => 'map');");
+    });
+
+    it('should compile flow with multiple map steps', () => {
+      const flow = new Flow<number[]>({ slug: 'test_flow' })
+        .map({ slug: 'double' }, (n) => n * 2)
+        .map({ slug: 'square', array: 'double' }, (n) => n * n)
+        .map({ slug: 'stringify', array: 'square' }, (n) => String(n));
+
+      const sql = compileFlow(flow);
+
+      expect(sql).toHaveLength(4);
+      expect(sql[1]).toContain("step_type => 'map'");
+      expect(sql[1]).not.toContain("ARRAY[");
+      expect(sql[2]).toContain("step_type => 'map'");
+      expect(sql[2]).toContain("ARRAY['double']");
+      expect(sql[3]).toContain("step_type => 'map'");
+      expect(sql[3]).toContain("ARRAY['square']");
+    });
+  });
+
+  describe('parameter ordering', () => {
+    it('should place step_type after runtime options', () => {
+      const flow = new Flow<string[]>({ slug: 'test_flow' })
+        .map({
+          slug: 'process',
+          maxAttempts: 5,
+          baseDelay: 10,
+          timeout: 60,
+          startDelay: 300
+        }, (s) => s.length);
+
+      const sql = compileFlow(flow);
+
+      // Ensure step_type comes after all other options
+      const expectedOrder =
+        "SELECT pgflow.add_step('test_flow', 'process', " +
+        "max_attempts => 5, base_delay => 10, timeout => 60, start_delay => 300, " +
+        "step_type => 'map');";
+
+      expect(sql[1]).toBe(expectedOrder);
+    });
+
+    it('should handle dependencies and step_type correctly', () => {
+      const flow = new Flow<Record<string, never>>({ slug: 'test_flow' })
+        .array({ slug: 'items' }, () => ['a', 'b'])
+        .map({
+          slug: 'process',
+          array: 'items',
+          maxAttempts: 3
+        }, (s) => s.toUpperCase());
+
+      const sql = compileFlow(flow);
+
+      const expected =
+        "SELECT pgflow.add_step('test_flow', 'process', ARRAY['items'], " +
+        "max_attempts => 3, step_type => 'map');";
+
+      expect(sql[2]).toBe(expected);
+    });
+  });
+});
\ No newline at end of file
diff --git a/pkgs/dsl/__tests__/runtime/map-type-inference.test.ts b/pkgs/dsl/__tests__/runtime/map-type-inference.test.ts
new file mode 100644
index 000000000..135fd949f
--- /dev/null
+++ b/pkgs/dsl/__tests__/runtime/map-type-inference.test.ts
@@ -0,0 +1,51 @@
+import { describe, it, expect } from 'vitest';
+import { Flow } from '../../src/dsl.js';
+
+describe('Map type inference runtime verification', () => {
+  it('should infer types correctly for root maps', () => {
+    const flow = new Flow<string[]>({ slug: 'test' })
+      .map({ slug: 'process' }, (item) => {
+        // Verify we can use string methods
+        return item.toUpperCase();
+      });
+
+    // If this compiles, type inference works
+    expect(flow.slug).toBe('test');
+  });
+
+  it('should infer types correctly for array-dependent maps', () => {
+    const flow = new Flow<Record<string, never>>({ slug: 'test' })
+      .array({ slug: 'nums' }, () => [1, 2, 3])
+      .map({ slug: 'double', array: 'nums' }, (item) => {
+        // Verify we can use number operations
+        return item * 2;
+      });
+
+    expect(flow.slug).toBe('test');
+  });
+
+  it('should infer types correctly for step-dependent maps', () => {
+    const flow = new Flow<Record<string, never>>({ slug: 'test' })
+      .step({ slug: 'getStrings' }, () => ['a', 'b', 'c'])
+      .map({ slug: 'upper', array: 'getStrings' }, (item) => {
+        // Verify we can use string methods
+        return item.toUpperCase();
+      });
+
+    expect(flow.slug).toBe('test');
+  });
+
+  it('should infer types for complex objects', () => {
+    const flow = new Flow<Record<string, never>>({ slug: 'test' })
+      .step({ slug: 'users' }, () => [
+        { id: 1, name: 'Alice', age: 30 },
+        { id: 2, name: 'Bob', age: 25 }
+      ])
+      .map({ slug: 'names', array: 'users' }, (user) => {
+        // Verify we can access object properties
+        return { userId: user.id, userName: user.name };
+      });
+
+    expect(flow.slug).toBe('test');
+  });
+});
\ No newline at end of file
diff --git a/pkgs/dsl/__tests__/types/duplicate-slug-detection.test-d.ts b/pkgs/dsl/__tests__/types/duplicate-slug-detection.test-d.ts
new file mode 100644
index 000000000..2efc3cdec
--- /dev/null
+++ b/pkgs/dsl/__tests__/types/duplicate-slug-detection.test-d.ts
@@ -0,0 +1,143 @@
+/**
+ * Type tests for compile-time duplicate slug detection
+ */
+
+import { describe, it } from 'vitest';
+import { Flow } from '../../src/dsl.js';
+
+describe('Duplicate slug compile-time detection', () => {
+  describe('step method', () => {
+    it('should prevent duplicate slugs at compile time', () => {
+      try {
+        const flow = new Flow<Record<string, never>>({ slug: 'test' })
+          .step({ slug: 'first' }, () => ({ data: 'test' }));
+
+        // @ts-expect-error - Should not allow duplicate slug 'first'
+        flow.step({ slug: 'first' }, () => ({ other: 'data' }));
+
+        // Valid - different slug
+        flow.step({ slug: 'second' }, () => ({ other: 'data' }));
+      } catch {
+        // Runtime errors are expected, we're testing compile-time type checking
+      }
+    });
+
+    it('should work across multiple steps', () => {
+      try {
+        const flow = new Flow<Record<string, never>>({ slug: 'test' })
+          .step({ slug: 'step1' }, () => ({ a: 1 }))
+          .step({ slug: 'step2' }, () => ({ b: 2 }))
+          .step({ slug: 'step3' }, () => ({ c: 3 }));
+
+        // @ts-expect-error - Cannot reuse step1
+        flow.step({ slug: 'step1' }, () => ({ d: 4 }));
+
+        // @ts-expect-error - Cannot reuse step2
+        flow.step({ slug: 'step2' }, () => ({ e: 5 }));
+
+        // @ts-expect-error - Cannot reuse step3
+        flow.step({ slug: 'step3' }, () => ({ f: 6 }));
+      } catch {
+        // Runtime errors are expected, we're testing compile-time type checking
+      }
+    });
+  });
+
+  describe('array method', () => {
+    it('should prevent duplicate slugs at compile time', () => {
+      try {
+        const flow = new Flow<Record<string, never>>({ slug: 'test' })
+          .array({ slug: 'items' }, () => [1, 2, 3]);
+
+        // @ts-expect-error - Should not allow duplicate slug 'items'
+        flow.array({ slug: 'items' }, () => ['a', 'b', 'c']);
+
+        // Valid - different slug
+        flow.array({ slug: 'other' }, () => ['a', 'b', 'c']);
+      } catch {
+        // Runtime errors are expected, we're testing compile-time type checking
+      }
+    });
+
+    it('should prevent reusing slugs from step method', () => {
+      try {
+        const flow = new Flow<Record<string, never>>({ slug: 'test' })
+          .step({ slug: 'process' }, () => ({ result: true }));
+
+        // @ts-expect-error - Cannot reuse slug from step
+        flow.array({ slug: 'process' }, () => [1, 2, 3]);
+      } catch {
+        // Runtime errors are expected, we're testing compile-time type checking
+      }
+    });
+  });
+
+  describe('map method', () => {
+    it('should prevent duplicate slugs for root maps', () => {
+      try {
+        const flow = new Flow<number[]>({ slug: 'test' })
+          .map({ slug: 'double' }, (n) => n * 2);
+
+        // @ts-expect-error - Should not allow duplicate slug 'double'
+        flow.map({ slug: 'double' }, (n) => n * 3);
+
+        // Valid - different slug
+        flow.map({ slug: 'triple' }, (n) => n * 3);
+      } catch {
+        // Runtime errors are expected, we're testing compile-time type checking
+      }
+    });
+
+    it('should prevent duplicate slugs for dependent maps', () => {
+      try {
+        const flow = new Flow<Record<string, never>>({ slug: 'test' })
+          .array({ slug: 'numbers' }, () => [1, 2, 3])
+          .map({ slug: 'process', array: 'numbers' }, (n) => n * 2);
+
+        // @ts-expect-error - Should not allow duplicate slug 'process'
+        flow.map({ slug: 'process', array: 'numbers' }, (n) => n * 3);
+      } catch {
+        // Runtime errors are expected, we're testing compile-time type checking
+      }
+    });
+
+    it('should prevent reusing slugs across different method types', () => {
+      try {
+        const flow = new Flow<Record<string, never>>({ slug: 'test' })
+          .step({ slug: 'fetch' }, () => ({ data: [1, 2, 3] }))
+          .array({ slug: 'items' }, () => ['a', 'b', 'c']);
+
+        // @ts-expect-error - Cannot reuse 'fetch' from step
+        flow.map({ slug: 'fetch', array: 'items' }, (item) => item);
+
+        // @ts-expect-error - Cannot reuse 'items' from array
+        flow.map({ slug: 'items', array: 'items' }, (item) => item);
+      } catch {
+        // Runtime errors are expected, we're testing compile-time type checking
+      }
+    });
+  });
+
+  describe('mixed methods', () => {
+    it('should track slugs across all method types', () => {
+      try {
+        const flow = new Flow<number[]>({ slug: 'test' })
+          .map({ slug: 'map1' }, (n) => n * 2)
+          .step({ slug: 'step1' }, () => ({ result: 'test' }))
+          .array({ slug: 'array1' }, () => [1, 2, 3]);
+
+        // All these should error - slugs already used
+        // @ts-expect-error - map1 already used
+        flow.step({ slug: 'map1' }, () => ({}));
+
+        // @ts-expect-error - step1 already used
+        flow.array({ slug: 'step1' }, () => []);
+
+        // @ts-expect-error - array1 already used
+        flow.map({ slug: 'array1', array: 'array1' }, (n) => n);
+      } catch {
+        // Runtime errors are expected, we're testing compile-time type checking
+      }
+    });
+  });
+});
\ No newline at end of file
diff --git a/pkgs/dsl/__tests__/types/map-method.test-d.ts b/pkgs/dsl/__tests__/types/map-method.test-d.ts
new file mode 100644
index 000000000..25d839bf7
--- /dev/null
+++ b/pkgs/dsl/__tests__/types/map-method.test-d.ts
@@ -0,0 +1,300 @@
+import { Flow, type Json, type StepInput, type ExtractFlowContext } from '../../src/index.js';
+import { describe, it, expectTypeOf } from 'vitest';
+
+describe('.map() method type constraints', () => {
+  describe('root map - flow input is array', () => {
+    it('should accept root map when flow input is array', () => {
+      const flow = new Flow<string[]>({ slug: 'test' })
+        .map({ slug: 'process' }, (item) => {
+          expectTypeOf(item).toEqualTypeOf<string>();
+          return { processed: item.toUpperCase() };
+        });
+
+      // The map step should return an array of the handler return type
+      type ProcessOutput = typeof flow extends Flow
+        ? Steps['process']
+        : never;
+      expectTypeOf<ProcessOutput>().toEqualTypeOf<{ processed: string }[]>();
+    });
+
+    it('should reject root map when flow input is not array', () => {
+      // @ts-expect-error - Flow input must be array for root map
+      new Flow<string>({ slug: 'test' })
+        .map({ slug: 'fail' }, (item) => item);
+
+      // @ts-expect-error - Object is not an array
+      new Flow<{ name: string }>({ slug: 'test' })
+        .map({ slug: 'fail2' }, (item) => item);
+    });
+
+    it('should correctly type item for nested array input', () => {
+      const flow = new Flow<number[][]>({ slug: 'test' })
+        .map({ slug: 'flatten' }, (item) => {
+          expectTypeOf(item).toEqualTypeOf<number[]>();
+          return item.length;
+        });
+
+      type FlattenOutput = typeof flow extends Flow
+        ? Steps['flatten']
+        : never;
+      expectTypeOf<FlattenOutput>().toEqualTypeOf<number[]>();
+    });
+
+    it('should work with Json array types', () => {
+      const flow = new Flow<Json[]>({ slug: 'test' })
+        .map({ slug: 'stringify' }, (item) => {
+          expectTypeOf(item).toEqualTypeOf<Json>();
+          return String(item);
+        });
+
+      type StringifyOutput = typeof flow extends Flow
+        ? Steps['stringify']
+        : never;
+      expectTypeOf<StringifyOutput>().toEqualTypeOf<string[]>();
+    });
+  });
+
+  describe('dependent map - array from another step', () => {
+    it('should accept dependent map when dependency returns array', () => {
+      const flow = new Flow<{ count: number }>({ slug: 'test' })
+        .array({ slug: 'numbers' }, ({ run }) =>
+          Array(run.count).fill(0).map((_, i) => i)
+        )
+        .map({ slug: 'double', array: 'numbers' }, (item) => {
+          expectTypeOf(item).toEqualTypeOf<number>();
+          return item * 2;
+        });
+
+      type DoubleOutput = typeof flow extends Flow
+        ? Steps['double']
+        : never;
+      expectTypeOf<DoubleOutput>().toEqualTypeOf<number[]>();
+    });
+
+    it('should reject dependent map when dependency returns non-array', () => {
+      const flow = new Flow<Record<string, never>>({ slug: 'test' })
+        .step({ slug: 'notArray' }, () => 'string');
+
+      // @ts-expect-error - dependency must return array
+      flow.map({ slug: 'fail', array: 'notArray' }, (item) => item);
+    });
+
+    it('should reject non-existent dependencies', () => {
+      const flow = new Flow<Record<string, never>>({ slug: 'test' })
+        .array({ slug: 'items' }, () => [1, 2, 3]);
+
+      // This test verifies TypeScript compile-time checking
+      // The @ts-expect-error comment verifies the type error
+      // We wrap in try-catch to handle runtime validation
+      try {
+        // @ts-expect-error - 'nonExistent' is not a valid step
+        flow.map({ slug: 'fail', array: 'nonExistent' }, (item) => item);
+      } catch (error) {
+        // Runtime validation also catches this - expected behavior
+      }
+    });
+
+    it('should handle complex object arrays', () => {
+      const flow = new Flow<Record<string, never>>({ slug: 'test' })
+        .step({ slug: 'fetch' }, () => [
+          { id: 1, name: 'Alice', age: 30 },
+          { id: 2, name: 'Bob', age: 25 }
+        ])
+        .map({ slug: 'extractNames', array: 'fetch' }, (user) => {
+          expectTypeOf(user).toEqualTypeOf<{ id: number; name: string; age: number }>();
+          return user.name;
+        });
+
+      type NamesOutput = typeof flow extends Flow
+        ? Steps['extractNames']
+        : never;
+      expectTypeOf<NamesOutput>().toEqualTypeOf<string[]>();
+    });
+  });
+
+  describe('handler return type enforcement', () => {
+    it('should enforce Json return type', () => {
+      new Flow<Json[]>({ slug: 'test' })
+        .map({ slug: 'valid1' }, () => 'string')
+        .map({ slug: 'valid2' }, () => 123)
+        .map({ slug: 'valid3' }, () => true)
+        .map({ slug: 'valid4' }, () => null)
+        .map({ slug: 'valid5' }, () => ({ key: 'value' }))
+        .map({ slug: 'valid6' }, () => [1, 2, 3]);
+
+      // These should fail type checking
+      new Flow<Json[]>({ slug: 'test' })
+        // @ts-expect-error - undefined is not Json
+        .map({ slug: 'invalid1' }, () => undefined)
+        // @ts-expect-error - symbol is not Json
+        .map({ slug: 'invalid2' }, () => Symbol('test'))
+        // @ts-expect-error - function is not Json
+        .map({ slug: 'invalid3' }, () => (() => 'function'));
+    });
+
+    it('should handle async handlers returning Json', () => {
+      new Flow<number[]>({ slug: 'test' })
+        .map({ slug: 'async1' }, async (n) => n * 2)
+        .map({ slug: 'async2' }, async (n) => ({ value: n }))
+        .map({ slug: 'async3' }, async (n) => `number: ${n}`);
+    });
+  });
+
+  describe('map chaining', () => {
+    it('should allow map to map chaining', () => {
+      const flow = new Flow<string[]>({ slug: 'test' })
+        .map({ slug: 'uppercase' }, (item) => item.toUpperCase())
+        .map({ slug: 'lengths', array: 'uppercase' }, (item) => {
+          expectTypeOf(item).toEqualTypeOf<string>();
+          return item.length;
+        });
+
+      type LengthsOutput = typeof flow extends Flow
+        ? Steps['lengths']
+        : never;
+      expectTypeOf<LengthsOutput>().toEqualTypeOf<number[]>();
+    });
+
+    it('should allow regular step to depend on map output', () => {
+      const flow = new Flow<number[]>({ slug: 'test' })
+        .map({ slug: 'double' }, (n) => n * 2)
+        .step({ slug: 'sum', dependsOn: ['double'] }, (input) => {
+          expectTypeOf(input.double).toEqualTypeOf<number[]>();
+          return input.double.reduce((a, b) => a + b, 0);
+        });
+
+      type SumOutput = typeof flow extends Flow
+        ?
Steps['sum'] + : never; + expectTypeOf().toEqualTypeOf(); + }); + + it('should allow array step to provide input for map', () => { + const flow = new Flow>({ slug: 'test' }) + .array({ slug: 'generate' }, () => ['a', 'b', 'c']) + .map({ slug: 'process', array: 'generate' }, (letter) => { + expectTypeOf(letter).toEqualTypeOf(); + return { letter, index: letter.charCodeAt(0) }; + }); + + type ProcessOutput = typeof flow extends Flow + ? Steps['process'] + : never; + expectTypeOf().toEqualTypeOf<{ letter: string; index: number }[]>(); + }); + }); + + describe('context inference', () => { + it('should preserve context through map methods', () => { + const flow = new Flow({ slug: 'test' }) + .map({ slug: 'process' }, (item, context: { api: { transform: (s: string) => string } }) => { + expectTypeOf(context.api.transform).toEqualTypeOf<(s: string) => string>(); + expectTypeOf(context.env).toEqualTypeOf>(); + expectTypeOf(context.shutdownSignal).toEqualTypeOf(); + return context.api.transform(item); + }); + + type FlowContext = ExtractFlowContext; + expectTypeOf().toMatchTypeOf<{ + env: Record; + shutdownSignal: AbortSignal; + api: { transform: (s: string) => string }; + }>(); + }); + + it('should accumulate context across map and regular steps', () => { + const flow = new Flow({ slug: 'test' }) + .map({ slug: 'transform' }, (n, context: { multiplier: number }) => n * context.multiplier) + .step({ slug: 'aggregate' }, (input, context: { formatter: (n: number) => string }) => + context.formatter(input.transform.reduce((a, b) => a + b, 0)) + ); + + type FlowContext = ExtractFlowContext; + expectTypeOf().toMatchTypeOf<{ + env: Record; + shutdownSignal: AbortSignal; + multiplier: number; + formatter: (n: number) => string; + }>(); + }); + }); + + describe('StepInput utility type', () => { + it('should not include input for map steps', () => { + const flow = new Flow({ slug: 'test' }) + .map({ slug: 'process' }, (item) => item.toUpperCase()) + .step({ slug: 'count', dependsOn: 
['process'] }, (input) => input.process.length); + + // Map steps don't have StepInput in the traditional sense + // They receive individual items, not the full input object + type ProcessInput = StepInput; + // This should probably be never or a special type for map steps + expectTypeOf().toMatchTypeOf<{ run: string[] }>(); + + type CountInput = StepInput; + expectTypeOf().toMatchTypeOf<{ + run: string[]; + process: string[]; + }>(); + }); + }); + + describe('getStepDefinition compatibility', () => { + it('should correctly type map step definitions', () => { + const flow = new Flow({ slug: 'test' }) + .map({ slug: 'square' }, (n) => n * n) + .step({ slug: 'sum', dependsOn: ['square'] }, (input) => + input.square.reduce((a, b) => a + b, 0) + ); + + const squareStep = flow.getStepDefinition('square'); + // Handler should be typed to receive individual items + expectTypeOf(squareStep.handler).toBeFunction(); + + const sumStep = flow.getStepDefinition('sum'); + expectTypeOf(sumStep.handler).parameters.toMatchTypeOf<[{ + run: number[]; + square: number[]; + }]>(); + }); + }); + + describe('edge cases', () => { + it('should handle empty arrays', () => { + const flow = new Flow({ slug: 'test' }) + .map({ slug: 'process' }, (item) => ({ processed: item })); + + // Should be able to handle empty array input + type ProcessOutput = typeof flow extends Flow + ? Steps['process'] + : never; + expectTypeOf().toEqualTypeOf<{ processed: Json }[]>(); + }); + + it('should handle union types in arrays', () => { + const flow = new Flow<(string | number)[]>({ slug: 'test' }) + .map({ slug: 'stringify' }, (item) => { + expectTypeOf(item).toEqualTypeOf(); + return String(item); + }); + + type StringifyOutput = typeof flow extends Flow + ? 
Steps['stringify']
+      : never;
+    expectTypeOf<StringifyOutput>().toEqualTypeOf<string[]>();
+  });
+
+  it('should handle nullable array elements', () => {
+    const flow = new Flow<(string | null)[]>({ slug: 'test' })
+      .map({ slug: 'filter' }, (item) => {
+        expectTypeOf(item).toEqualTypeOf<string | null>();
+        return item !== null;
+      });
+
+    type FilterOutput = typeof flow extends Flow<any, any, infer Steps>
+      ? Steps['filter']
+      : never;
+    expectTypeOf<FilterOutput>().toEqualTypeOf<boolean[]>();
+  });
+  });
+});
\ No newline at end of file
diff --git a/pkgs/dsl/src/compile-flow.ts b/pkgs/dsl/src/compile-flow.ts
index 264d36ee3..bbf169fe1 100644
--- a/pkgs/dsl/src/compile-flow.ts
+++ b/pkgs/dsl/src/compile-flow.ts
@@ -26,8 +26,14 @@ export function compileFlow(flow: AnyFlow): string[] {
       depsClause = `, ARRAY[${depsArray}]`;
     }
 
+    // Add step_type parameter for map steps
+    let stepTypeClause = '';
+    if (step.stepType === 'map') {
+      stepTypeClause = `, step_type => 'map'`;
+    }
+
     statements.push(
-      `SELECT pgflow.add_step('${flow.slug}', '${step.slug}'${depsClause}${stepOptions});`
+      `SELECT pgflow.add_step('${flow.slug}', '${step.slug}'${depsClause}${stepOptions}${stepTypeClause});`
     );
   }
diff --git a/pkgs/dsl/src/dsl.ts b/pkgs/dsl/src/dsl.ts
index 7d46c3db0..e7964f6f9 100644
--- a/pkgs/dsl/src/dsl.ts
+++ b/pkgs/dsl/src/dsl.ts
@@ -292,6 +292,7 @@
   handler: (input: TInput, context: TContext) => TOutput | Promise<TOutput>;
   dependencies: string[];
   options: StepRuntimeOptions;
+  stepType?: 'single' | 'map';
 }
 
 // Utility type to merge two object types and preserve required properties
@@ -385,7 +386,7 @@
     ) => any,
     Deps extends Extract<keyof Steps, string> = never
   >(
-    opts: Simplify<{ slug: Slug; dependsOn?: Deps[] } & StepRuntimeOptions>,
+    opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn?: Deps[] } & StepRuntimeOptions>,
     handler: THandler
   ): Flow<
     TFlowInput,
@@ -469,10 +470,10 @@
   /**
    * Add an array-returning step to the flow with compile-time type safety
-   * 
+   *
    * This method provides semantic clarity and type enforcement for steps that return arrays,
    * while maintaining full compatibility with the existing step system by delegating to `.step()`.
-   * 
+   *
    * @template Slug - The unique identifier for this step
    * @template THandler - The handler function that must return an array or Promise
    * @template Deps - The step dependencies (must be existing step slugs)
@@ -494,7 +495,7 @@
     ) => Array<Json> | Promise<Array<Json>>,
     Deps extends Extract<keyof Steps, string> = never
   >(
-    opts: Simplify<{ slug: Slug; dependsOn?: Deps[] } & StepRuntimeOptions>,
+    opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn?: Deps[] } & StepRuntimeOptions>,
     handler: THandler
   ): Flow<
     TFlowInput,
@@ -506,4 +507,102 @@
     // Delegate to existing .step() method for maximum code reuse
     return this.step(opts, handler);
   }
+
+  /**
+   * Add a map step to the flow that processes arrays element by element
+   *
+   * Map steps apply a handler function to each element of an array, producing
+   * a new array with the transformed elements. The handler receives individual
+   * array elements, not the full input object.
+   *
+   * @param opts - Step configuration including slug and optional array dependency
+   * @param handler - Function that processes individual array elements
+   * @returns A new Flow instance with the map step added
+   */
+  // Overload for root map
+  map<Slug extends string, THandler>(
+    opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug } & StepRuntimeOptions>,
+    handler: TFlowInput extends readonly (infer Item)[]
+      ? THandler & ((item: Item, context: BaseContext & TContext) => Json | Promise<Json>)
+      : never
+  ): Flow<
+    TFlowInput,
+    TContext & BaseContext,
+    Steps & { [K in Slug]: Awaited<ReturnType<Extract<THandler, (...args: any) => any>>>[] },
+    StepDependencies & { [K in Slug]: [] }
+  >;
+
+  // Overload for dependent map
+  map<Slug extends string, TArrayDep extends Extract<keyof Steps, string>, THandler>(
+    opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; array: TArrayDep } & StepRuntimeOptions>,
+    handler: Steps[TArrayDep] extends readonly (infer Item)[]
+      ? THandler & ((item: Item, context: BaseContext & TContext) => Json | Promise<Json>)
+      : never
+  ): Flow<
+    TFlowInput,
+    TContext & BaseContext,
+    Steps & { [K in Slug]: Awaited<ReturnType<Extract<THandler, (...args: any) => any>>>[] },
+    StepDependencies & { [K in Slug]: [TArrayDep] }
+  >;
+
+  // Implementation
+  map(opts: any, handler: any): any {
+    const slug = opts.slug;
+
+    // Validate the step slug
+    validateSlug(slug);
+
+    if (this.stepDefinitions[slug]) {
+      throw new Error(`Step "${slug}" already exists in flow "${this.slug}"`);
+    }
+
+    // Determine dependencies based on whether array is specified
+    let dependencies: string[] = [];
+    const arrayDep = (opts as any).array;
+    if (arrayDep) {
+      // Dependent map - validate the single dependency exists and returns an array
+      if (!this.stepDefinitions[arrayDep]) {
+        throw new Error(`Step "${slug}" depends on undefined step "${arrayDep}"`);
+      }
+      dependencies = [arrayDep];
+    } else {
+      // Root map - flow input must be an array (type system enforces this)
+      dependencies = [];
+    }
+
+    // Extract runtime options
+    const options: StepRuntimeOptions = {};
+    if (opts.maxAttempts !== undefined) options.maxAttempts = opts.maxAttempts;
+    if (opts.baseDelay !== undefined) options.baseDelay = opts.baseDelay;
+    if (opts.timeout !== undefined) options.timeout = opts.timeout;
+    if (opts.startDelay !== undefined) options.startDelay = opts.startDelay;
+
+    // Validate runtime options
+    validateRuntimeOptions(options, { optional: true });
+
+    // Create the map step definition with stepType
+    // Note: We use AnyInput/AnyOutput here because the actual types are handled at the type level via overloads
+    const newStepDefinition: StepDefinition<AnyInput, AnyOutput> = {
+      slug,
+      handler: handler as any, // Type assertion needed due to complex generic constraints
+      dependencies,
+      options,
+      stepType: 'map', // Mark this as a map step
+    };
+
+    const newStepDefinitions = {
+      ...this.stepDefinitions,
+      [slug]: newStepDefinition,
+    };
+
+    // Create a new stepOrder array with the new slug appended
+    const newStepOrder = [...this.stepOrder, slug];
+
+    // Create and return a new Flow instance with updated types
+    return new Flow(
+      { slug: this.slug, ...this.options },
+      newStepDefinitions as Record<string, StepDefinition<AnyInput, AnyOutput>>,
+      newStepOrder
+    ) as any; // Type assertion handled by overloads
+  }
 }
diff --git a/pkgs/example-flows/src/map-flow.ts b/pkgs/example-flows/src/map-flow.ts
new file mode 100644
index 000000000..f18cd1935
--- /dev/null
+++ b/pkgs/example-flows/src/map-flow.ts
@@ -0,0 +1,185 @@
+import { Flow } from '@pgflow/dsl';
+
+/**
+ * Example flow demonstrating the map step functionality
+ *
+ * This flow shows:
+ * 1. Root map - processing array input directly
+ * 2. Dependent map - mapping over another step's output
+ * 3. Map chaining - one map depending on another
+ * 4. Integration with regular steps
+ */
+
+// Example 1: Simple root map processing a string array
+export const TextProcessingFlow = new Flow<string[]>({
+  slug: 'text_processing',
+  maxAttempts: 3,
+})
+  // Root map - processes each string in the input array
+  .map({ slug: 'normalize' }, (text) => {
+    // Each handler receives a single string, not the array
+    return text.trim().toLowerCase();
+  })
+  // Dependent map - processes the normalized strings
+  .map({ slug: 'capitalize', array: 'normalize' }, (text) => {
+    return text.charAt(0).toUpperCase() + text.slice(1);
+  })
+  // Regular step that aggregates the results
+  .step({ slug: 'summarize', dependsOn: ['capitalize'] }, (input) => ({
+    processed: input.capitalize.length,
+    results: input.capitalize,
+  }));
+
+// Example 2: Enriching user records with chained maps
+export const UserEnrichmentFlow = new Flow<{ userIds: string[] }>({
+  slug: 'user_enrichment',
+})
+  // Generate initial data array
+  .array({ slug: 'fetch_users' }, async ({ run }) => {
+    // Simulating API calls to fetch user data
+    return run.userIds.map((id) => ({
+      id,
+      name: `User_${id}`,
+    }));
+  })
+  // Map over each user to add timestamps
+  .map({ slug: 'add_timestamps', array: 'fetch_users' }, (user) => ({
+    ...user,
+    createdAt: new Date().toISOString(),
+    processed: true,
+  }))
+  // Map to calculate derived fields
+  .map({ slug: 'add_metadata', array: 'add_timestamps' }, (user) => ({
+    ...user,
+    displayName: `${user.name} (${user.id})`,
+    hashId: Buffer.from(user.id).toString('base64'),
+  }))
+  // Final aggregation
+  .step({ slug: 'create_report', dependsOn: ['add_metadata'] }, (input) => ({
+    totalUsers: input.add_metadata.length,
+    processedAt: new Date().toISOString(),
+    users: input.add_metadata,
+  }));
+
+// Example 3: Numerical computation with maps
+export const StatisticsFlow = new Flow<number[]>({
+  slug: 'statistics',
+})
+  // Square each number
+  .map({ slug: 'square', maxAttempts: 5 }, (n) => n * n)
+  // Calculate cumulative values
+  .map({ slug: 'cumulative', array: 'square' }, (value, context) => {
+    // Maps can access context just like regular steps
+    console.log(`Processing value ${value} with context`, context.env);
+    return {
+      original: Math.sqrt(value),
+      squared: value,
+      cubed: value * Math.sqrt(value),
+    };
+  })
+  // Aggregate statistics
+  .step({ slug: 'calculate_stats', dependsOn: ['cumulative'] }, (input) => {
+    const values = input.cumulative;
+    const squares = values.map((v) => v.squared);
+    return {
+      count: values.length,
+      sumOfSquares: squares.reduce((a, b) => a + b, 0),
+      average: squares.reduce((a, b) => a + b, 0) / values.length,
+      max: Math.max(...squares),
+      min: Math.min(...squares),
+    };
+  });
+
+// Example 4: Complex nested processing
+type OrderItem = {
+  productId: string;
+  quantity: number;
+  price: number;
+};
+
+export const OrderProcessingFlow = new Flow<OrderItem[]>({
+  slug: 'order_processing',
+  timeout: 120,
+})
+  // Validate each item
+  .map({ slug: 'validate_items' }, (item) => {
+    const isValid = item.quantity > 0 && item.price > 0;
+    return {
+      ...item,
+      valid: isValid,
+      subtotal: item.quantity * item.price,
+    };
+  })
+  // Apply discounts
+  .map(
+    {
+      slug: 'apply_discounts',
+      array: 'validate_items',
+      baseDelay: 1000,
+    },
+    (item) => {
+      // The item from validate_items has additional fields, but TypeScript can't track that
+      // We know it has the shape: { productId, quantity, price, valid, subtotal }
+      const itemWithSubtotal = item as typeof item & { subtotal: number; valid: boolean };
+      const discount = itemWithSubtotal.quantity >= 10 ? 0.1 : 0;
+      const discountAmount = itemWithSubtotal.subtotal * discount;
+      return {
+        ...itemWithSubtotal,
+        discount,
+        discountAmount,
+        finalPrice: itemWithSubtotal.subtotal - discountAmount,
+      };
+    }
+  )
+  // Calculate totals
+  .step(
+    { slug: 'calculate_order', dependsOn: ['apply_discounts'] },
+    (input) => {
+      const items = input.apply_discounts;
+      const validItems = items.filter((item) => item.valid);
+
+      return {
+        orderTotal: validItems.reduce((sum, item) => sum + item.finalPrice, 0),
+        totalDiscount: validItems.reduce(
+          (sum, item) => sum + item.discountAmount,
+          0
+        ),
+        itemCount: validItems.length,
+        invalidItemCount: items.length - validItems.length,
+        items: validItems,
+      };
+    }
+  );
+
+// Example 5: Parallel map chains
+export const ParallelMapsFlow = new Flow<{
+  numbers: number[];
+  strings: string[];
+}>({
+  slug: 'parallel_maps',
+})
+  // Extract arrays for parallel processing
+  .step({ slug: 'extract_numbers' }, ({ run }) => run.numbers)
+  .step({ slug: 'extract_strings' }, ({ run }) => run.strings)
+  // Process numbers
+  .map({ slug: 'double_numbers', array: 'extract_numbers' }, (n) => n * 2)
+  .map({ slug: 'square_numbers', array: 'double_numbers' }, (n) => n * n)
+  // Process strings
+  .map({ slug: 'uppercase_strings', array: 'extract_strings' }, (s) =>
+    s.toUpperCase()
+  )
+  .map({ slug: 'reverse_strings', array: 'uppercase_strings' }, (s) =>
+    s.split('').reverse().join('')
+  )
+  // Combine results
+  .step(
+    {
+      slug: 'combine_results',
+      dependsOn: ['square_numbers', 'reverse_strings'],
+    },
+    (input) => ({
+      processedNumbers: input.square_numbers,
+      processedStrings: input.reverse_strings,
+      numberSum: input.square_numbers.reduce((a, b) => a + b, 0),
+      concatenated: input.reverse_strings.join(', '),
+    })
+  );
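The compile-flow change in this diff boils down to appending a `step_type => 'map'` named argument to the generated `pgflow.add_step()` call. The standalone sketch below mirrors that clause-building logic so it can be run in isolation — note it is not the actual `@pgflow/dsl` implementation, and the simplified `StepDef` shape here is an assumption for illustration:

```typescript
// Simplified stand-in for a compiled step definition (hypothetical shape,
// mirroring the slug/dependencies/stepType fields used by compileFlow).
type StepDef = {
  slug: string;
  dependencies: string[];
  stepType?: 'single' | 'map';
};

// Build a single pgflow.add_step() statement the way the patched
// compileFlow does: deps clause first, then the optional step_type clause.
function addStepStatement(flowSlug: string, step: StepDef): string {
  let depsClause = '';
  if (step.dependencies.length > 0) {
    const depsArray = step.dependencies.map((d) => `'${d}'`).join(', ');
    depsClause = `, ARRAY[${depsArray}]`;
  }
  // Map steps get an explicit step_type parameter; single steps omit it
  const stepTypeClause = step.stepType === 'map' ? `, step_type => 'map'` : '';
  return `SELECT pgflow.add_step('${flowSlug}', '${step.slug}'${depsClause}${stepTypeClause});`;
}

// A dependent map step with one array dependency:
console.log(
  addStepStatement('workflow', {
    slug: 'double',
    dependencies: ['items'],
    stepType: 'map',
  })
);
// SELECT pgflow.add_step('workflow', 'double', ARRAY['items'], step_type => 'map');
```

Keeping `step_type` as a named argument (rather than positional) means flows compiled before this change keep producing identical SQL, since single steps simply omit the clause.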