Skip to content

Commit 43c4799

Browse files
committed
test: add comprehensive tests for map step compilation, type constraints, and edge cases
- Introduced new integration tests for flow compilation involving map steps - Added runtime validation tests for step dependencies and slug validation - Included type inference validation tests for map method constraints - Covered edge cases such as empty arrays and various runtime options - Added tests for flow with only map steps and multiple independent chains - Ensured correct parameter ordering and handling of dependencies in compiled SQL - Expanded coverage for root and dependent map compilation scenarios - Included tests for flow with only map steps and chaining behaviors - Improved test suite robustness for map step handling in flow compilation
1 parent c1c4c34 commit 43c4799

File tree

13 files changed

+1349
-36
lines changed

13 files changed

+1349
-36
lines changed

.changeset/easy-bats-nail.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
---
2+
'@pgflow/dsl': patch
3+
---
4+
5+
Add `.map()` method to Flow DSL for defining map-type steps
6+
7+
The new `.map()` method enables defining steps that process arrays element-by-element, complementing the existing SQL Core map infrastructure. Key features:
8+
9+
- **Root maps**: Process flow input arrays directly by omitting the `array` property
10+
- **Dependent maps**: Process another step's array output using `array: 'stepSlug'`
11+
- **Type-safe**: Enforces Json-compatible types with full TypeScript inference
12+
- **Compile-time duplicate slug detection**: TypeScript now prevents duplicate step slugs at compile-time
13+
- **Different handler signature**: Receives individual items `(item, context)` instead of full input object
14+
- **Always returns arrays**: Return type is `HandlerReturnType[]`
15+
- **SQL generation**: Correctly adds `step_type => 'map'` parameter to `pgflow.add_step()`
16+
17+
Example usage:
18+
19+
```typescript
20+
// Root map - processes array input
21+
new Flow<string[]>({ slug: 'process' }).map({ slug: 'uppercase' }, (item) =>
22+
item.toUpperCase()
23+
);
24+
25+
// Dependent map - processes another step's output
26+
new Flow<{}>({ slug: 'workflow' })
27+
.array({ slug: 'items' }, () => [1, 2, 3])
28+
.map({ slug: 'double', array: 'items' }, (n) => n * 2);
29+
```

.claude/settings.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
"Bash(grep:*)",
2424
"Bash(ls:*)",
2525
"Bash(mkdir:*)",
26+
"Bash(npx tsc:*)",
27+
"Bash(pnpm tsc:*)",
2628
"Bash(pnpm eslint:*)",
2729
"Bash(pnpm nx build:*)",
2830
"Bash(pnpm nx lint:*)",

PLAN.md

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99
-**DONE**: Dependency count propagation for map steps
1010
-**DONE**: Array element extraction - tasks receive individual array elements
1111
-**DONE**: Output aggregation - inline implementation aggregates map task outputs for dependents
12-
-**NEXT**: DSL support for `.map()` for defining map steps
12+
-**DONE**: DSL support for `.map()` for defining map steps with compile-time duplicate detection
13+
-**TODO**: Fix orphaned messages on run failure
14+
-**TODO**: Performance optimization with step_states.output column
1315

1416
### Chores
1517

16-
-**WAITING**: Integration tests for map steps
17-
-**WAITING**: Consolidated migration for map steps
18-
-**WAITING**: Documentation for map steps
19-
-**WAITING**: Graphite stack merge for map steps
18+
-**TODO**: Integration tests for map steps
19+
-**TODO**: Update core README
20+
-**TODO**: Add docs page for array and map steps
21+
-**TODO**: Migration consolidation
22+
-**TODO**: Graphite stack merge
2023

2124
## Implementation Status
2225

@@ -83,33 +86,38 @@
8386
- Validates non-array outputs to map steps fail correctly
8487
- Fixed broadcast aggregation to send full array not individual task output
8588

86-
#### ❌ Remaining Work
89+
- [x] **PR #218: DSL Support for .map() Step Type** - `09-18-add-map-support-to-dsl` ✅ COMPLETED
8790

88-
- [ ] **DSL Support for .map() Step Type**
89-
90-
- Add `.map()` method to Flow DSL for defining map steps
91+
- Added `.map()` method to Flow DSL for defining map steps
9192
- Constraints:
9293
- Locked to exactly one dependency (enforced at compile time)
9394
- Dependency must return an array (type-checked)
9495
- Syntax design:
9596
- Dependent maps: `flow.map({ slug: 'stepName', array: 'arrayReturningStep' }, handler)`
96-
- Root maps: Decide between `{ array: 'run' }` or omitting array property
97+
- Root maps: Omit array property
9798
- Return type always inferred as array
9899
- Comprehensive tests:
99100
- Runtime validation of array dependencies
100101
- Type safety for input/output types
101102
- Compile-time enforcement of single dependency rule
103+
- Fixed complex TypeScript type inference issue with overloads
104+
- Added compile-time duplicate slug detection across all DSL methods
105+
- Fixed all linting errors (replaced `{}` with `Record<string, never>`)
106+
- Updated DSL README with .map() documentation
107+
- Created detailed changeset
108+
109+
#### ❌ Remaining Work (Priority Order)
102110

103-
- [ ] **Fix Orphaned Messages on Run Failure**
111+
- [ ] **Priority 1: Fix Orphaned Messages on Run Failure** 🚨 CRITICAL
104112

105113
- Archive all pending messages when run fails
106114
- Handle map sibling tasks specially
107115
- Fix type constraint violations to fail immediately without retries
108116
- See detailed plan: [PLAN_orphaned_messages.md](./PLAN_orphaned_messages.md)
109-
- Critical for production: prevents queue performance degradation
117+
- **Critical for production: prevents queue performance degradation**
110118
- Tests already written (stashed) that document the problem
111119

112-
- [ ] **Performance Optimization: step_states.output Column**
120+
- [ ] **Priority 2: Performance Optimization - step_states.output Column**
113121

114122
- Migrate from inline aggregation to storing outputs in step_states
115123
- See detailed plan: [PLAN_step_output.md](./PLAN_step_output.md)
@@ -124,20 +132,13 @@
124132
- Update all aggregation tests (~17 files)
125133
- **Note**: This is an optimization that should be done after core functionality is stable
126134

127-
- [ ] **Integration Tests**
135+
- [ ] **Priority 3: Integration Tests**
128136

129137
- End-to-end workflows with real array data
130138
- Basic happy path coverage
131139
- This should be minimal and added to the Edge Worker integration test suite for now
132140

133-
- [ ] **Migration Consolidation**
134-
135-
- Remove all temporary/incremental migrations from feature branches
136-
- Generate a single consolidated migration for the entire map infrastructure
137-
- Ensure clean migration path from current production schema
138-
- If NULL improvement is done, include it in the consolidated migration
139-
140-
- [ ] **Update README's** and **Docs**
141+
- [ ] **Priority 4: Update core README**
141142

142143
- `pkgs/core/README.md`
143144

@@ -149,12 +150,7 @@
149150
- Explain root map vs dependent map and how it gets handled and what restrictions those apply on the Flow input
150151
- Explain cascade completion of taskless steps and its limitations
151152

152-
- `pkgs/dsl/README.md`
153-
154-
- Briefly describe the new `.array()` and `.map()` methods
155-
- Mention `.array` is mostly sugar, and `.map` is the new step type
156-
- Link to `pkgs/core/README.md` sections for more explanation about map steps
157-
- Make sure to mention that maps are constrained to have exactly one dependency
153+
- [ ] **Priority 5: Add docs page**
158154

159155
- **Add basic docs page**
160156

@@ -165,7 +161,14 @@
165161
- focus mostly on how to use it, instead of how it works under the hood
166162
- link to the README's for more details
167163

168-
- [ ] **Graphite Stack Merge**
164+
- [ ] **Priority 6: Migration Consolidation** (Do this last before merge!)
165+
166+
- Remove all temporary/incremental migrations from feature branches
167+
- Generate a single consolidated migration for the entire map infrastructure
168+
- Ensure clean migration path from current production schema
169+
- If NULL improvement is done, include it in the consolidated migration
170+
171+
- [ ] **Priority 7: Graphite Stack Merge**
169172

170173
- Configure Graphite merge queue for the complete PR stack
171174
- Ensure all PRs in sequence can be merged together

pkgs/dsl/README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,78 @@ This design ensures:
7373
- Data doesn't need to be manually forwarded through intermediate steps
7474
- Steps can combine original input with processed data from previous steps
7575

76+
### Step Methods
77+
78+
The Flow DSL provides three methods for defining steps in your workflow:
79+
80+
#### `.step()` - Regular Steps
81+
82+
The standard method for adding steps to a flow. Each step processes input and returns output.
83+
84+
```typescript
85+
.step(
86+
{ slug: 'process', dependsOn: ['previous'] },
87+
async (input) => {
88+
// Access input.run and input.previous
89+
return { result: 'processed' };
90+
}
91+
)
92+
```
93+
94+
#### `.array()` - Array-Returning Steps
95+
96+
A semantic wrapper around `.step()` that provides type enforcement for steps that return arrays. Useful for data fetching or collection steps.
97+
98+
```typescript
99+
.array(
100+
{ slug: 'fetch_items' },
101+
async () => [1, 2, 3, 4, 5]
102+
)
103+
```
104+
105+
#### `.map()` - Array Processing Steps
106+
107+
Processes arrays element-by-element, similar to JavaScript's `Array.map()`. The handler receives individual items instead of the full input object.
108+
109+
```typescript
110+
// Root map - processes flow input array
111+
new Flow<string[]>({ slug: 'process_strings' })
112+
.map({ slug: 'uppercase' }, (item) => item.toUpperCase());
113+
114+
// Dependent map - processes another step's output
115+
new Flow<{}>({ slug: 'data_pipeline' })
116+
.array({ slug: 'numbers' }, () => [1, 2, 3])
117+
.map({ slug: 'double', array: 'numbers' }, (n) => n * 2)
118+
.map({ slug: 'square', array: 'double' }, (n) => n * n);
119+
```
120+
121+
**Key differences from regular steps:**
122+
- Uses `array:` instead of `dependsOn:` for specifying the single array dependency
123+
- Handler signature is `(item, context) => result` instead of `(input, context) => result`
124+
- Return type is always an array
125+
- Generates SQL with `step_type => 'map'` parameter for pgflow's map processing
126+
127+
**Type Safety:**
128+
The `.map()` method provides full TypeScript type inference for array elements:
129+
130+
```typescript
131+
type User = { id: number; name: string };
132+
133+
new Flow<{}>({ slug: 'user_flow' })
134+
.array({ slug: 'users' }, (): User[] => [
135+
{ id: 1, name: 'Alice' },
136+
{ id: 2, name: 'Bob' }
137+
])
138+
.map({ slug: 'greet', array: 'users' }, (user) => {
139+
// TypeScript knows user is of type User
140+
return `Hello, ${user.name} (ID: ${user.id})`;
141+
});
142+
```
143+
144+
**Limitations:**
145+
- Can only depend on a single array-returning step
146+
- TypeScript may not track type transformations between chained maps (use type assertions if needed)
147+
76148
### Context Object
77149

78150
Step handlers can optionally receive a second parameter - the **context object** - which provides access to platform resources and runtime information.
@@ -114,6 +186,11 @@ All platforms provide these core resources:
114186
msg_id: number;
115187
}
116188
```
189+
- **`context.workerConfig`** - Resolved worker configuration with all defaults applied
190+
```typescript
191+
// Provides access to worker settings like retry limits
192+
const isLastAttempt = context.rawMessage.read_ct >= context.workerConfig.retry.limit;
193+
```
117194

118195
#### Supabase Platform Resources
119196

0 commit comments

Comments
 (0)