import { describe, it, expect } from 'vitest';

import { Flow } from '../../src/dsl.js';
import { compileFlow } from '../../src/compile-flow.js';
// NOTE(review): `Json` is referenced in the 'edge cases' suite but was never
// imported — TODO confirm the correct module path against the package exports.
import type { Json } from '../../src/types.js';
+ describe ( 'Map flow integration tests' , ( ) => {
6
+ describe ( 'complete flow examples' , ( ) => {
7
+ it ( 'should compile a data processing pipeline with maps' , ( ) => {
8
+ // Simulating a real-world data processing flow
9
+ const flow = new Flow < string [ ] > ( { slug : 'data_processing' } )
10
+ . map ( { slug : 'normalize' } , ( item ) => item . trim ( ) . toLowerCase ( ) )
11
+ . map ( { slug : 'validate' , array : 'normalize' } , ( item ) => {
12
+ // Validate each normalized item
13
+ return item . length > 0 && item . length < 100 ;
14
+ } )
15
+ . step ( { slug : 'summarize' , dependsOn : [ 'validate' ] } , ( input ) => ( {
16
+ total : input . validate . length ,
17
+ valid : input . validate . filter ( v => v ) . length ,
18
+ invalid : input . validate . filter ( v => ! v ) . length
19
+ } ) ) ;
20
+
21
+ const sql = compileFlow ( flow ) ;
22
+
23
+ expect ( sql ) . toHaveLength ( 4 ) ;
24
+ expect ( sql [ 0 ] ) . toBe ( "SELECT pgflow.create_flow('data_processing');" ) ;
25
+ expect ( sql [ 1 ] ) . toBe ( "SELECT pgflow.add_step('data_processing', 'normalize', step_type => 'map');" ) ;
26
+ expect ( sql [ 2 ] ) . toBe ( "SELECT pgflow.add_step('data_processing', 'validate', ARRAY['normalize'], step_type => 'map');" ) ;
27
+ expect ( sql [ 3 ] ) . toBe ( "SELECT pgflow.add_step('data_processing', 'summarize', ARRAY['validate']);" ) ;
28
+ } ) ;
29
+
30
+ it ( 'should compile an ETL flow with array generation and mapping' , ( ) => {
31
+ const flow = new Flow < { sourceIds : string [ ] } > ( { slug : 'etl_flow' } )
32
+ . array ( { slug : 'fetch_data' } , async ( { run } ) => {
33
+ // Simulating fetching data for each source ID
34
+ return run . sourceIds . map ( id => ( { id, data : `data_${ id } ` } ) ) ;
35
+ } )
36
+ . map ( { slug : 'transform' , array : 'fetch_data' } , ( record ) => ( {
37
+ ...record ,
38
+ transformed : record . data . toUpperCase ( ) ,
39
+ timestamp : Date . now ( )
40
+ } ) )
41
+ . map ( { slug : 'enrich' , array : 'transform' } , async ( record ) => ( {
42
+ ...record ,
43
+ enriched : true ,
44
+ metadata : { processedAt : new Date ( ) . toISOString ( ) }
45
+ } ) )
46
+ . step ( { slug : 'load' , dependsOn : [ 'enrich' ] } , async ( input ) => {
47
+ // Final loading step
48
+ return {
49
+ recordsProcessed : input . enrich . length ,
50
+ success : true
51
+ } ;
52
+ } ) ;
53
+
54
+ const sql = compileFlow ( flow ) ;
55
+
56
+ expect ( sql ) . toHaveLength ( 5 ) ;
57
+ expect ( sql [ 1 ] ) . not . toContain ( "step_type" ) ; // array step
58
+ expect ( sql [ 2 ] ) . toContain ( "step_type => 'map'" ) ;
59
+ expect ( sql [ 3 ] ) . toContain ( "step_type => 'map'" ) ;
60
+ expect ( sql [ 4 ] ) . not . toContain ( "step_type" ) ; // regular step
61
+ } ) ;
62
+
63
+ it ( 'should handle complex nested array processing' , ( ) => {
64
+ // Flow that processes nested arrays (e.g., matrix operations)
65
+ const flow = new Flow < number [ ] [ ] > ( { slug : 'matrix_flow' } )
66
+ . map ( { slug : 'row_sums' } , ( row ) => row . reduce ( ( a , b ) => a + b , 0 ) )
67
+ . step ( { slug : 'total_sum' , dependsOn : [ 'row_sums' ] } , ( input ) =>
68
+ input . row_sums . reduce ( ( a , b ) => a + b , 0 )
69
+ ) ;
70
+
71
+ const sql = compileFlow ( flow ) ;
72
+
73
+ expect ( sql ) . toHaveLength ( 3 ) ;
74
+ expect ( sql [ 1 ] ) . toBe ( "SELECT pgflow.add_step('matrix_flow', 'row_sums', step_type => 'map');" ) ;
75
+ expect ( sql [ 2 ] ) . toBe ( "SELECT pgflow.add_step('matrix_flow', 'total_sum', ARRAY['row_sums']);" ) ;
76
+ } ) ;
77
+ } ) ;
78
+
79
+ describe ( 'runtime validation' , ( ) => {
80
+ it ( 'should throw when trying to use non-existent step as array dependency' , ( ) => {
81
+ const flow = new Flow < { } > ( { slug : 'test' } )
82
+ . step ( { slug : 'exists' } , ( ) => [ 1 , 2 , 3 ] ) ;
83
+
84
+ expect ( ( ) => {
85
+ // @ts -expect-error - TypeScript should catch this at compile time
86
+ flow . map ( { slug : 'fail' , array : 'doesNotExist' } , ( item ) => item ) ;
87
+ } ) . toThrow ( 'Step "fail" depends on undefined step "doesNotExist"' ) ;
88
+ } ) ;
89
+
90
+ it ( 'should throw when step slug already exists' , ( ) => {
91
+ const flow = new Flow < number [ ] > ( { slug : 'test' } )
92
+ . map ( { slug : 'process' } , ( n ) => n * 2 ) ;
93
+
94
+ expect ( ( ) => {
95
+ flow . map ( { slug : 'process' } , ( n ) => n * 3 ) ;
96
+ } ) . toThrow ( 'Step "process" already exists in flow "test"' ) ;
97
+ } ) ;
98
+
99
+ it ( 'should validate slug format' , ( ) => {
100
+ expect ( ( ) => {
101
+ new Flow < number [ ] > ( { slug : 'test' } )
102
+ . map ( { slug : 'invalid-slug!' } , ( n ) => n ) ;
103
+ } ) . toThrow ( ) ; // validateSlug should reject invalid characters
104
+ } ) ;
105
+
106
+ it ( 'should validate runtime options' , ( ) => {
107
+ // This should not throw - valid options
108
+ const validFlow = new Flow < number [ ] > ( { slug : 'test' } )
109
+ . map ( {
110
+ slug : 'valid' ,
111
+ maxAttempts : 3 ,
112
+ baseDelay : 1000 ,
113
+ timeout : 30000 ,
114
+ startDelay : 5000
115
+ } , ( n ) => n ) ;
116
+
117
+ expect ( compileFlow ( validFlow ) ) . toHaveLength ( 2 ) ;
118
+
119
+ // Invalid options should be caught by validateRuntimeOptions
120
+ expect ( ( ) => {
121
+ new Flow < number [ ] > ( { slug : 'test' } )
122
+ . map ( {
123
+ slug : 'invalid' ,
124
+ maxAttempts : 0 // Should be >= 1
125
+ } , ( n ) => n ) ;
126
+ } ) . toThrow ( ) ;
127
+ } ) ;
128
+ } ) ;
129
+
130
+ describe ( 'type inference validation' , ( ) => {
131
+ it ( 'should correctly infer types through map chains' , ( ) => {
132
+ const flow = new Flow < { items : string [ ] } > ( { slug : 'test' } )
133
+ . step ( { slug : 'extract' , dependsOn : [ ] } , ( { run } ) => run . items )
134
+ . map ( { slug : 'lengths' , array : 'extract' } , ( item ) => item . length )
135
+ . map ( { slug : 'doubles' , array : 'lengths' } , ( len ) => len * 2 )
136
+ . step ( { slug : 'sum' , dependsOn : [ 'doubles' ] } , ( input ) => {
137
+ // Type checking - this should compile without errors
138
+ const total : number = input . doubles . reduce ( ( a , b ) => a + b , 0 ) ;
139
+ return total ;
140
+ } ) ;
141
+
142
+ const sql = compileFlow ( flow ) ;
143
+ expect ( sql ) . toHaveLength ( 5 ) ;
144
+ } ) ;
145
+ } ) ;
146
+
147
+ describe ( 'edge cases' , ( ) => {
148
+ it ( 'should handle empty array processing' , ( ) => {
149
+ const flow = new Flow < Json [ ] > ( { slug : 'empty_test' } )
150
+ . map ( { slug : 'process' } , ( item ) => ( { processed : item } ) ) ;
151
+
152
+ const sql = compileFlow ( flow ) ;
153
+ expect ( sql ) . toHaveLength ( 2 ) ;
154
+ expect ( sql [ 1 ] ) . toContain ( "step_type => 'map'" ) ;
155
+ } ) ;
156
+
157
+ it ( 'should handle all runtime options combinations' , ( ) => {
158
+ const flow = new Flow < string [ ] > ( { slug : 'options_test' } )
159
+ . map ( { slug : 'no_options' } , ( s ) => s )
160
+ . map ( { slug : 'some_options' , array : 'no_options' , maxAttempts : 5 } , ( s ) => s )
161
+ . map ( {
162
+ slug : 'all_options' ,
163
+ array : 'some_options' ,
164
+ maxAttempts : 3 ,
165
+ baseDelay : 1000 ,
166
+ timeout : 30000 ,
167
+ startDelay : 5000
168
+ } , ( s ) => s ) ;
169
+
170
+ const sql = compileFlow ( flow ) ;
171
+
172
+ expect ( sql [ 1 ] ) . toBe ( "SELECT pgflow.add_step('options_test', 'no_options', step_type => 'map');" ) ;
173
+ expect ( sql [ 2 ] ) . toBe ( "SELECT pgflow.add_step('options_test', 'some_options', ARRAY['no_options'], max_attempts => 5, step_type => 'map');" ) ;
174
+ expect ( sql [ 3 ] ) . toContain ( "max_attempts => 3" ) ;
175
+ expect ( sql [ 3 ] ) . toContain ( "base_delay => 1000" ) ;
176
+ expect ( sql [ 3 ] ) . toContain ( "timeout => 30000" ) ;
177
+ expect ( sql [ 3 ] ) . toContain ( "start_delay => 5000" ) ;
178
+ expect ( sql [ 3 ] ) . toContain ( "step_type => 'map'" ) ;
179
+ } ) ;
180
+
181
+ it ( 'should handle map steps with no further dependencies' , ( ) => {
182
+ // Map step as a leaf node
183
+ const flow = new Flow < number [ ] > ( { slug : 'leaf_map' } )
184
+ . map ( { slug : 'final_map' } , ( n ) => n * n ) ;
185
+
186
+ const sql = compileFlow ( flow ) ;
187
+ expect ( sql ) . toHaveLength ( 2 ) ;
188
+ expect ( sql [ 1 ] ) . toBe ( "SELECT pgflow.add_step('leaf_map', 'final_map', step_type => 'map');" ) ;
189
+ } ) ;
190
+
191
+ it ( 'should handle multiple independent map chains' , ( ) => {
192
+ const flow = new Flow < { a : number [ ] ; b : string [ ] } > ( { slug : 'parallel' } )
193
+ . step ( { slug : 'extract_a' } , ( { run } ) => run . a )
194
+ . step ( { slug : 'extract_b' } , ( { run } ) => run . b )
195
+ . map ( { slug : 'process_a' , array : 'extract_a' } , ( n ) => n * 2 )
196
+ . map ( { slug : 'process_b' , array : 'extract_b' } , ( s ) => s . toUpperCase ( ) )
197
+ . step ( { slug : 'combine' , dependsOn : [ 'process_a' , 'process_b' ] } , ( input ) => ( {
198
+ numbers : input . process_a ,
199
+ strings : input . process_b
200
+ } ) ) ;
201
+
202
+ const sql = compileFlow ( flow ) ;
203
+ expect ( sql ) . toHaveLength ( 6 ) ;
204
+ expect ( sql [ 3 ] ) . toContain ( "step_type => 'map'" ) ;
205
+ expect ( sql [ 4 ] ) . toContain ( "step_type => 'map'" ) ;
206
+ } ) ;
207
+ } ) ;
208
+ } ) ;
0 commit comments