Commit 2b27c54

test: add comprehensive tests for map step compilation, type constraints, and edge cases

- Introduced new integration tests for flow compilation involving map steps
- Added runtime validation tests for step dependencies and slug validation
- Included type inference validation tests for `.map()` method constraints
- Covered edge cases such as empty arrays and various runtime option combinations
- Added tests for flows with only map steps, chaining behaviors, and multiple independent chains
- Ensured correct parameter ordering and dependency handling in compiled SQL
- Expanded coverage for root and dependent map compilation scenarios
- Improved test suite robustness for map step handling in flow compilation

1 parent fcb8222
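The SQL shapes these tests assert against can be sketched with a standalone helper. This is an illustrative re-implementation for this summary only: `StepDef` and `addStepSql` are hypothetical names, not part of `@pgflow/dsl`; the expected strings are taken from the test assertions below.

```typescript
// Illustrative sketch of the SQL shape the new tests assert against.
// `StepDef` and `addStepSql` are hypothetical, not pgflow's actual API.
type StepDef = {
  slug: string;
  deps?: string[];      // dependency slugs (map steps have at most one)
  maxAttempts?: number;
  stepType?: 'map';     // omitted for regular and array steps
};

function addStepSql(flowSlug: string, step: StepDef): string {
  const args: string[] = [`'${flowSlug}'`, `'${step.slug}'`];
  if (step.deps && step.deps.length > 0) {
    args.push(`ARRAY[${step.deps.map((d) => `'${d}'`).join(', ')}]`);
  }
  // Named options follow positional args; step_type is emitted last.
  if (step.maxAttempts !== undefined) args.push(`max_attempts => ${step.maxAttempts}`);
  if (step.stepType) args.push(`step_type => '${step.stepType}'`);
  return `SELECT pgflow.add_step(${args.join(', ')});`;
}

// Root map (no dependency array) vs dependent map with options:
const rootMap = addStepSql('data_processing', { slug: 'normalize', stepType: 'map' });
// "SELECT pgflow.add_step('data_processing', 'normalize', step_type => 'map');"
const depMap = addStepSql('options_test', {
  slug: 'some_options', deps: ['no_options'], maxAttempts: 5, stepType: 'map',
});
// "SELECT pgflow.add_step('options_test', 'some_options', ARRAY['no_options'], max_attempts => 5, step_type => 'map');"
```

The ordering (positional dependency array, then named options, then `step_type`) mirrors the parameter-ordering expectations exercised in the edge-case tests.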

File tree

11 files changed (+1177 additions, -16 deletions)


.changeset/easy-bats-nail.md

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@

````markdown
---
'@pgflow/example-flows': patch
'@pgflow/dsl': patch
---

Add `.map()` method to Flow DSL for defining map-type steps

The new `.map()` method enables defining steps that process arrays element-by-element, complementing the existing SQL Core map infrastructure. Key features:

- **Root maps**: Process flow input arrays directly by omitting the `array` property
- **Dependent maps**: Process another step's array output using `array: 'stepSlug'`
- **Type-safe**: Enforces Json-compatible types with full TypeScript inference
- **Different handler signature**: Receives individual items `(item, context)` instead of the full input object
- **Always returns arrays**: Return type is `HandlerReturnType[]`
- **SQL generation**: Correctly adds `step_type => 'map'` parameter to `pgflow.add_step()`

Example usage:

```typescript
// Root map - processes array input
new Flow<string[]>({ slug: 'process' })
  .map({ slug: 'uppercase' }, (item) => item.toUpperCase());

// Dependent map - processes another step's output
new Flow<{}>({ slug: 'workflow' })
  .array({ slug: 'items' }, () => [1, 2, 3])
  .map({ slug: 'double', array: 'items' }, (n) => n * 2);
```
````
.claude/settings.json

Lines changed: 2 additions & 0 deletions

```diff
@@ -23,6 +23,8 @@
   "Bash(grep:*)",
   "Bash(ls:*)",
   "Bash(mkdir:*)",
+  "Bash(npx tsc:*)",
+  "Bash(pnpm tsc:*)",
   "Bash(pnpm eslint:*)",
   "Bash(pnpm nx build:*)",
   "Bash(pnpm nx lint:*)",
```
PLAN.md

Lines changed: 10 additions & 13 deletions

```diff
@@ -9,7 +9,7 @@
 - **DONE**: Dependency count propagation for map steps
 - **DONE**: Array element extraction - tasks receive individual array elements
 - **DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents
-- **NEXT**: DSL support for `.map()` for defining map steps
+- **DONE**: DSL support for `.map()` for defining map steps

 ### Chores

@@ -83,22 +83,24 @@
 - Validates non-array outputs to map steps fail correctly
 - Fixed broadcast aggregation to send full array not individual task output

-#### ❌ Remaining Work
-
-- [ ] **DSL Support for .map() Step Type**
+- [x] **PR #218: DSL Support for .map() Step Type** - `09-18-add-map-support-to-dsl` (THIS PR)

-  - Add `.map()` method to Flow DSL for defining map steps
+  - Added `.map()` method to Flow DSL for defining map steps
   - Constraints:
     - Locked to exactly one dependency (enforced at compile time)
     - Dependency must return an array (type-checked)
   - Syntax design:
     - Dependent maps: `flow.map({ slug: 'stepName', array: 'arrayReturningStep' }, handler)`
-    - Root maps: Decide between `{ array: 'run' }` or omitting array property
+    - Root maps: Omit array property
   - Return type always inferred as array
   - Comprehensive tests:
     - Runtime validation of array dependencies
     - Type safety for input/output types
     - Compile-time enforcement of single dependency rule
+  - Fixed complex TypeScript type inference issue with overloads
+  - Updated DSL README with .map() documentation
+
+#### ❌ Remaining Work

 - [ ] **Fix Orphaned Messages on Run Failure**

@@ -137,7 +139,7 @@
 - Ensure clean migration path from current production schema
 - If NULL improvement is done, include it in the consolidated migration

-- [ ] **Update README's** and **Docs**
+- [ ] **Update core README**

   - `pkgs/core/README.md`

@@ -149,12 +151,7 @@
   - Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input
   - Explain cascade completion of taskless steps and its limitations

-  - `pkgs/dsl/README.md`
-
-    - Briefly describe the new `.array()` and `.map()` methods
-    - Mention `.array` is mostly sugar, and `.map` is the new step type
-    - Link to `pkgs/core/README.md` sections for more explanation about map steps
-    - Make sure to mention that maps are constrained to have exactly one dependency
+- [ ] **Add docs page**

   - **Add basic docs page**
```

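The "locked to exactly one dependency" constraint noted in PLAN.md is enforced both at the type level and at runtime. A minimal runtime guard might look like the following sketch; `validateMapDeps` is a hypothetical name, not pgflow's actual implementation, though the second error message mirrors the string asserted in the tests below.

```typescript
// Hypothetical runtime guard for the "exactly one dependency" map-step
// constraint described in PLAN.md (pgflow also enforces this via types).
function validateMapDeps(
  stepSlug: string,
  deps: string[],
  knownSteps: Set<string>,
): void {
  // Map steps may depend on exactly one array-returning step.
  if (deps.length !== 1) {
    throw new Error(
      `Map step "${stepSlug}" must have exactly one dependency, got ${deps.length}`,
    );
  }
  // The dependency must already be defined in the flow.
  const dep = deps[0];
  if (!knownSteps.has(dep)) {
    throw new Error(`Step "${stepSlug}" depends on undefined step "${dep}"`);
  }
}
```

Calling `validateMapDeps('fail', ['doesNotExist'], new Set(['exists']))` would throw the same undefined-step error the runtime-validation tests expect.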
pkgs/dsl/README.md

Lines changed: 77 additions & 0 deletions

````diff
@@ -73,6 +73,78 @@ This design ensures:
 - Data doesn't need to be manually forwarded through intermediate steps
 - Steps can combine original input with processed data from previous steps

+### Step Methods
+
+The Flow DSL provides three methods for defining steps in your workflow:
+
+#### `.step()` - Regular Steps
+
+The standard method for adding steps to a flow. Each step processes input and returns output.
+
+```typescript
+.step(
+  { slug: 'process', dependsOn: ['previous'] },
+  async (input) => {
+    // Access input.run and input.previous
+    return { result: 'processed' };
+  }
+)
+```
+
+#### `.array()` - Array-Returning Steps
+
+A semantic wrapper around `.step()` that provides type enforcement for steps that return arrays. Useful for data fetching or collection steps.
+
+```typescript
+.array(
+  { slug: 'fetch_items' },
+  async () => [1, 2, 3, 4, 5]
+)
+```
+
+#### `.map()` - Array Processing Steps
+
+Processes arrays element-by-element, similar to JavaScript's `Array.map()`. The handler receives individual items instead of the full input object.
+
+```typescript
+// Root map - processes flow input array
+new Flow<string[]>({ slug: 'process_strings' })
+  .map({ slug: 'uppercase' }, (item) => item.toUpperCase());
+
+// Dependent map - processes another step's output
+new Flow<{}>({ slug: 'data_pipeline' })
+  .array({ slug: 'numbers' }, () => [1, 2, 3])
+  .map({ slug: 'double', array: 'numbers' }, (n) => n * 2)
+  .map({ slug: 'square', array: 'double' }, (n) => n * n);
+```
+
+**Key differences from regular steps:**
+
+- Uses `array:` instead of `dependsOn:` for specifying the single array dependency
+- Handler signature is `(item, context) => result` instead of `(input, context) => result`
+- Return type is always an array
+- Generates SQL with the `step_type => 'map'` parameter for pgflow's map processing
+
+**Type Safety:**
+
+The `.map()` method provides full TypeScript type inference for array elements:
+
+```typescript
+type User = { id: number; name: string };
+
+new Flow<{}>({ slug: 'user_flow' })
+  .array({ slug: 'users' }, (): User[] => [
+    { id: 1, name: 'Alice' },
+    { id: 2, name: 'Bob' }
+  ])
+  .map({ slug: 'greet', array: 'users' }, (user) => {
+    // TypeScript knows user is of type User
+    return `Hello, ${user.name} (ID: ${user.id})`;
+  });
+```
+
+**Limitations:**
+
+- Can only depend on a single array-returning step
+- TypeScript may not track type transformations between chained maps (use type assertions if needed)
+
 ### Context Object

 Step handlers can optionally receive a second parameter - the **context object** - which provides access to platform resources and runtime information.

@@ -114,6 +186,11 @@ All platforms provide these core resources:
   msg_id: number;
 }
 ```
+- **`context.workerConfig`** - Resolved worker configuration with all defaults applied
+  ```typescript
+  // Provides access to worker settings like retry limits
+  const isLastAttempt = context.rawMessage.read_ct >= context.workerConfig.retry.limit;
+  ```

 #### Supabase Platform Resources

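The element-level inference the README documents can be sketched with a plain generic function — a simplified stand-in for `.map()`'s real overloads, using the README's own `User` example data:

```typescript
// Simplified stand-in for .map()'s typing: the handler receives the
// element type of the upstream array, and the result is always an array.
function mapStep<Item, Out>(items: Item[], handler: (item: Item) => Out): Out[] {
  return items.map(handler);
}

type User = { id: number; name: string };
const users: User[] = [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
];

// `user` is inferred as User; `greetings` is inferred as string[].
const greetings = mapStep(users, (user) => `Hello, ${user.name} (ID: ${user.id})`);
```

This is the same inference pattern as `Array.prototype.map`; the DSL's added work is threading the element type from the upstream step's declared output through the flow's type parameters.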
Lines changed: 208 additions & 0 deletions
@@ -0,0 +1,208 @@

```typescript
import { describe, it, expect } from 'vitest';
import { Flow, type Json } from '../../src/dsl.js';
import { compileFlow } from '../../src/compile-flow.js';

describe('Map flow integration tests', () => {
  describe('complete flow examples', () => {
    it('should compile a data processing pipeline with maps', () => {
      // Simulating a real-world data processing flow
      const flow = new Flow<string[]>({ slug: 'data_processing' })
        .map({ slug: 'normalize' }, (item) => item.trim().toLowerCase())
        .map({ slug: 'validate', array: 'normalize' }, (item) => {
          // Validate each normalized item
          return item.length > 0 && item.length < 100;
        })
        .step({ slug: 'summarize', dependsOn: ['validate'] }, (input) => ({
          total: input.validate.length,
          valid: input.validate.filter((v) => v).length,
          invalid: input.validate.filter((v) => !v).length
        }));

      const sql = compileFlow(flow);

      expect(sql).toHaveLength(4);
      expect(sql[0]).toBe("SELECT pgflow.create_flow('data_processing');");
      expect(sql[1]).toBe("SELECT pgflow.add_step('data_processing', 'normalize', step_type => 'map');");
      expect(sql[2]).toBe("SELECT pgflow.add_step('data_processing', 'validate', ARRAY['normalize'], step_type => 'map');");
      expect(sql[3]).toBe("SELECT pgflow.add_step('data_processing', 'summarize', ARRAY['validate']);");
    });

    it('should compile an ETL flow with array generation and mapping', () => {
      const flow = new Flow<{ sourceIds: string[] }>({ slug: 'etl_flow' })
        .array({ slug: 'fetch_data' }, async ({ run }) => {
          // Simulating fetching data for each source ID
          return run.sourceIds.map((id) => ({ id, data: `data_${id}` }));
        })
        .map({ slug: 'transform', array: 'fetch_data' }, (record) => ({
          ...record,
          transformed: record.data.toUpperCase(),
          timestamp: Date.now()
        }))
        .map({ slug: 'enrich', array: 'transform' }, async (record) => ({
          ...record,
          enriched: true,
          metadata: { processedAt: new Date().toISOString() }
        }))
        .step({ slug: 'load', dependsOn: ['enrich'] }, async (input) => {
          // Final loading step
          return {
            recordsProcessed: input.enrich.length,
            success: true
          };
        });

      const sql = compileFlow(flow);

      expect(sql).toHaveLength(5);
      expect(sql[1]).not.toContain('step_type'); // array step
      expect(sql[2]).toContain("step_type => 'map'");
      expect(sql[3]).toContain("step_type => 'map'");
      expect(sql[4]).not.toContain('step_type'); // regular step
    });

    it('should handle complex nested array processing', () => {
      // Flow that processes nested arrays (e.g., matrix operations)
      const flow = new Flow<number[][]>({ slug: 'matrix_flow' })
        .map({ slug: 'row_sums' }, (row) => row.reduce((a, b) => a + b, 0))
        .step({ slug: 'total_sum', dependsOn: ['row_sums'] }, (input) =>
          input.row_sums.reduce((a, b) => a + b, 0)
        );

      const sql = compileFlow(flow);

      expect(sql).toHaveLength(3);
      expect(sql[1]).toBe("SELECT pgflow.add_step('matrix_flow', 'row_sums', step_type => 'map');");
      expect(sql[2]).toBe("SELECT pgflow.add_step('matrix_flow', 'total_sum', ARRAY['row_sums']);");
    });
  });

  describe('runtime validation', () => {
    it('should throw when trying to use non-existent step as array dependency', () => {
      const flow = new Flow<{}>({ slug: 'test' })
        .step({ slug: 'exists' }, () => [1, 2, 3]);

      expect(() => {
        // @ts-expect-error - TypeScript should catch this at compile time
        flow.map({ slug: 'fail', array: 'doesNotExist' }, (item) => item);
      }).toThrow('Step "fail" depends on undefined step "doesNotExist"');
    });

    it('should throw when step slug already exists', () => {
      const flow = new Flow<number[]>({ slug: 'test' })
        .map({ slug: 'process' }, (n) => n * 2);

      expect(() => {
        flow.map({ slug: 'process' }, (n) => n * 3);
      }).toThrow('Step "process" already exists in flow "test"');
    });

    it('should validate slug format', () => {
      expect(() => {
        new Flow<number[]>({ slug: 'test' })
          .map({ slug: 'invalid-slug!' }, (n) => n);
      }).toThrow(); // validateSlug should reject invalid characters
    });

    it('should validate runtime options', () => {
      // This should not throw - valid options
      const validFlow = new Flow<number[]>({ slug: 'test' })
        .map({
          slug: 'valid',
          maxAttempts: 3,
          baseDelay: 1000,
          timeout: 30000,
          startDelay: 5000
        }, (n) => n);

      expect(compileFlow(validFlow)).toHaveLength(2);

      // Invalid options should be caught by validateRuntimeOptions
      expect(() => {
        new Flow<number[]>({ slug: 'test' })
          .map({
            slug: 'invalid',
            maxAttempts: 0 // Should be >= 1
          }, (n) => n);
      }).toThrow();
    });
  });

  describe('type inference validation', () => {
    it('should correctly infer types through map chains', () => {
      const flow = new Flow<{ items: string[] }>({ slug: 'test' })
        .step({ slug: 'extract', dependsOn: [] }, ({ run }) => run.items)
        .map({ slug: 'lengths', array: 'extract' }, (item) => item.length)
        .map({ slug: 'doubles', array: 'lengths' }, (len) => len * 2)
        .step({ slug: 'sum', dependsOn: ['doubles'] }, (input) => {
          // Type checking - this should compile without errors
          const total: number = input.doubles.reduce((a, b) => a + b, 0);
          return total;
        });

      const sql = compileFlow(flow);
      expect(sql).toHaveLength(5);
    });
  });

  describe('edge cases', () => {
    it('should handle empty array processing', () => {
      const flow = new Flow<Json[]>({ slug: 'empty_test' })
        .map({ slug: 'process' }, (item) => ({ processed: item }));

      const sql = compileFlow(flow);
      expect(sql).toHaveLength(2);
      expect(sql[1]).toContain("step_type => 'map'");
    });

    it('should handle all runtime options combinations', () => {
      const flow = new Flow<string[]>({ slug: 'options_test' })
        .map({ slug: 'no_options' }, (s) => s)
        .map({ slug: 'some_options', array: 'no_options', maxAttempts: 5 }, (s) => s)
        .map({
          slug: 'all_options',
          array: 'some_options',
          maxAttempts: 3,
          baseDelay: 1000,
          timeout: 30000,
          startDelay: 5000
        }, (s) => s);

      const sql = compileFlow(flow);

      expect(sql[1]).toBe("SELECT pgflow.add_step('options_test', 'no_options', step_type => 'map');");
      expect(sql[2]).toBe("SELECT pgflow.add_step('options_test', 'some_options', ARRAY['no_options'], max_attempts => 5, step_type => 'map');");
      expect(sql[3]).toContain('max_attempts => 3');
      expect(sql[3]).toContain('base_delay => 1000');
      expect(sql[3]).toContain('timeout => 30000');
      expect(sql[3]).toContain('start_delay => 5000');
      expect(sql[3]).toContain("step_type => 'map'");
    });

    it('should handle map steps with no further dependencies', () => {
      // Map step as a leaf node
      const flow = new Flow<number[]>({ slug: 'leaf_map' })
        .map({ slug: 'final_map' }, (n) => n * n);

      const sql = compileFlow(flow);
      expect(sql).toHaveLength(2);
      expect(sql[1]).toBe("SELECT pgflow.add_step('leaf_map', 'final_map', step_type => 'map');");
    });

    it('should handle multiple independent map chains', () => {
      const flow = new Flow<{ a: number[]; b: string[] }>({ slug: 'parallel' })
        .step({ slug: 'extract_a' }, ({ run }) => run.a)
        .step({ slug: 'extract_b' }, ({ run }) => run.b)
        .map({ slug: 'process_a', array: 'extract_a' }, (n) => n * 2)
        .map({ slug: 'process_b', array: 'extract_b' }, (s) => s.toUpperCase())
        .step({ slug: 'combine', dependsOn: ['process_a', 'process_b'] }, (input) => ({
          numbers: input.process_a,
          strings: input.process_b
        }));

      const sql = compileFlow(flow);
      expect(sql).toHaveLength(6);
      expect(sql[3]).toContain("step_type => 'map'");
      expect(sql[4]).toContain("step_type => 'map'");
    });
  });
});
```

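The chained-map pipeline exercised in the type-inference test can be traced concretely with plain functions standing in for the steps (sample input values are illustrative; only the step names come from the test):

```typescript
// Plain-function trace of the 'extract' → 'lengths' → 'doubles' → 'sum'
// chain from the type-inference test; each .map() stage behaves like an
// element-wise Array.map, and the final .step() sees the full array.
const items = ['ab', 'cde', 'f'];               // flow input: { items: string[] }
const extract = items;                          // regular step returns the array
const lengths = extract.map((s) => s.length);   // map step: [2, 3, 1]
const doubles = lengths.map((n) => n * 2);      // map step: [4, 6, 2]
const sum = doubles.reduce((a, b) => a + b, 0); // regular step: 12
```

In the real flow, each map stage fans out to one task per element and the outputs are aggregated back into an ordered array for the dependent step, which is why `sum` receives `input.doubles` as a complete array.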