@@ -16,10 +16,13 @@ import (
16
16
kafkalib "github.com/xataio/pgstream/pkg/kafka"
17
17
loglib "github.com/xataio/pgstream/pkg/log"
18
18
schemalogpg "github.com/xataio/pgstream/pkg/schemalog/postgres"
19
+ pgsnapshotgenerator "github.com/xataio/pgstream/pkg/snapshot/generator/postgres/data"
20
+ "github.com/xataio/pgstream/pkg/snapshot/generator/postgres/schema/pgdumprestore"
19
21
"github.com/xataio/pgstream/pkg/stream"
20
22
"github.com/xataio/pgstream/pkg/tls"
21
23
"github.com/xataio/pgstream/pkg/wal"
22
24
kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
25
+ pglistener "github.com/xataio/pgstream/pkg/wal/listener/postgres"
23
26
"github.com/xataio/pgstream/pkg/wal/processor/batch"
24
27
"github.com/xataio/pgstream/pkg/wal/processor/injector"
25
28
kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
@@ -84,8 +87,17 @@ func runStream(t *testing.T, ctx context.Context, cfg *stream.Config) {
84
87
}()
85
88
}
86
89
90
+ func initStream (t * testing.T , ctx context.Context , url string ) {
91
+ err := stream .Init (ctx , url , "" )
92
+ require .NoError (t , err )
93
+ }
94
+
87
95
func execQuery (t * testing.T , ctx context.Context , query string ) {
88
- conn , err := pglib .NewConn (ctx , pgurl )
96
+ execQueryWithURL (t , ctx , pgurl , query )
97
+ }
98
+
99
+ func execQueryWithURL (t * testing.T , ctx context.Context , url , query string ) {
100
+ conn , err := pglib .NewConn (ctx , url )
89
101
require .NoError (t , err )
90
102
91
103
_ , err = conn .Exec (ctx , query )
@@ -108,6 +120,28 @@ func testPostgresListenerCfg() stream.ListenerConfig {
108
120
}
109
121
}
110
122
123
+ func testPostgresListenerCfgWithSnapshot (sourceURL , targetURL string , tables []string ) stream.ListenerConfig {
124
+ return stream.ListenerConfig {
125
+ Postgres : & stream.PostgresListenerConfig {
126
+ Replication : pgreplication.Config {
127
+ PostgresURL : sourceURL ,
128
+ },
129
+ Snapshot : & pglistener.SnapshotConfig {
130
+ Generator : pgsnapshotgenerator.Config {
131
+ URL : sourceURL ,
132
+ },
133
+ Tables : tables ,
134
+ Schema : pglistener.SchemaSnapshotConfig {
135
+ DumpRestore : & pgdumprestore.Config {
136
+ SourcePGURL : sourceURL ,
137
+ TargetPGURL : targetURL ,
138
+ },
139
+ },
140
+ },
141
+ },
142
+ }
143
+ }
144
+
111
145
func testKafkaListenerCfg () stream.ListenerConfig {
112
146
return stream.ListenerConfig {
113
147
Kafka : & stream.KafkaListenerConfig {
@@ -164,7 +198,7 @@ func testWebhookProcessorCfg() stream.ProcessorConfig {
164
198
}
165
199
}
166
200
167
- func testPostgresProcessorCfg () stream.ProcessorConfig {
201
+ func testPostgresProcessorCfg (sourcePGURL string ) stream.ProcessorConfig {
168
202
return stream.ProcessorConfig {
169
203
Postgres : & stream.PostgresProcessorConfig {
170
204
BatchWriter : postgres.Config {
@@ -173,13 +207,13 @@ func testPostgresProcessorCfg() stream.ProcessorConfig {
173
207
BatchTimeout : 50 * time .Millisecond ,
174
208
},
175
209
SchemaStore : schemalogpg.Config {
176
- URL : pgurl ,
210
+ URL : sourcePGURL ,
177
211
},
178
212
},
179
213
},
180
214
Injector : & injector.Config {
181
215
Store : schemalogpg.Config {
182
- URL : pgurl ,
216
+ URL : sourcePGURL ,
183
217
},
184
218
},
185
219
}
0 commit comments