Commit 9083c08

Merge pull request #157 from xataio/add-snapshot-listener
Add snapshot listener
2 parents 299d363 + 160fe33 commit 9083c08

15 files changed, +383 -40 lines changed

README.md

+68-7
@@ -20,20 +20,23 @@
 - Fast initial snapshots
 - Column value transformations
 - Modular deployment configuration, only requires Postgres
-- Schema based message partitioning
-- Schema filtering
-- Elasticsearch/OpenSearch replication output plugin support
-- Webhook support
+- Multiple out of the box supported outputs
+  - Elasticsearch/OpenSearch
+  - Webhooks
+  - PostgreSQL
+- Kafka support with schema based partitioning
+- Snapshot only mode (for when you don't need continuous replication)
+- Extendable support for custom output plugins
 - Automatic discovery of table primary key/unique not null columns for use as event identity
-- Highly customisable modules when used as library
 - Core metrics available via opentelemetry
-- Extendable support for custom replication output plugins
 - Continuous consumption of replication slot with configurable memory guards

 ## Table of Contents

 - [Usage](#usage)
 - [Configuration](#configuration)
+- [Tracking Schema Changes](#tracking-schema-changes)
+- [Snapshots](#snapshots)
 - [Architecture](#architecture)
 - [Limitations](#limitations)
 - [Glossary](#glossary)
@@ -79,6 +82,25 @@ If you have an environment available, with at least Postgres and whichever modul
 docker-compose -f build/docker/docker-compose.yml up
 ```

+The docker-compose file has profiles that can be used to bring up only the relevant containers. If, for example, you only want to run PostgreSQL to PostgreSQL pgstream replication, you can use the `pg2pg` profile as follows:
+
+```
+docker-compose -f build/docker/docker-compose.yml --profile pg2pg up
+```
+
+You can also run multiple profiles. For example, to start two PostgreSQL instances and Kafka:
+
+```
+docker-compose -f build/docker/docker-compose.yml --profile pg2pg --profile kafka up
+```
+
+List of supported docker profiles:
+
+- pg2pg
+- pg2os
+- pg2webhook
+- kafka
+
 #### Prepare the database

 This will create the `pgstream` schema in the configured Postgres database, along with the tables/functions/triggers required to keep track of the schema changes. See [Tracking schema changes](#tracking-schema-changes) section for more details. It will also create a replication slot for the configured database which will be used by the pgstream service.
@@ -118,6 +140,18 @@ pgstream run -c pg2kafka.env --log-level trace
 pgstream run -c kafka2os.env --log-level trace
 ```

+Example running pgstream with PostgreSQL -> PostgreSQL with initial snapshot enabled:
+
+```
+pgstream run -c pg2pg.env --log-level trace
+```
+
+Example running pgstream with PostgreSQL snapshot only mode -> PostgreSQL:
+
+```
+pgstream run -c snapshot2pg.env --log-level trace
+```
+
 The run command will parse the configuration provided, and initialise the configured modules. It requires at least one listener and one processor.

 ## Configuration
@@ -227,6 +261,21 @@ One of exponential/constant backoff policies can be provided for the search stor

 </details>

+<details>
+<summary>Postgres Batch Writer</summary>
+
+| Environment Variable                         | Default | Required | Description                                                                                                        |
+| -------------------------------------------- | ------- | -------- | ------------------------------------------------------------------------------------------------------------------ |
+| PGSTREAM_POSTGRES_WRITER_TARGET_URL          | N/A     | Yes      | URL for the PostgreSQL store to connect to                                                                         |
+| PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT       | 1s      | No       | Max time interval at which the batch sending to PostgreSQL is triggered.                                            |
+| PGSTREAM_POSTGRES_WRITER_BATCH_SIZE          | 100     | No       | Max number of messages to be sent per batch. When this size is reached, the batch is sent to PostgreSQL.            |
+| PGSTREAM_POSTGRES_WRITER_MAX_QUEUE_BYTES     | 100MiB  | No       | Max memory used by the postgres batch writer for inflight batches.                                                  |
+| PGSTREAM_POSTGRES_WRITER_BATCH_BYTES         | 1572864 | No       | Max size in bytes for a given batch. When this size is reached, the batch is sent to PostgreSQL.                    |
+| PGSTREAM_POSTGRES_WRITER_SCHEMALOG_STORE_URL | N/A     | No       | URL of the store where the pgstream schemalog table which keeps track of schema changes is.                         |
+| PGSTREAM_POSTGRES_WRITER_DISABLE_TRIGGERS    | False   | No       | Option to disable triggers on the target PostgreSQL database while performing the snapshot/replication streaming.   |
+
+</details>
+
 <details>
 <summary>Injector</summary>

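The batch limits in the table above interact: a batch is flushed to PostgreSQL once it reaches `PGSTREAM_POSTGRES_WRITER_BATCH_SIZE` messages or `PGSTREAM_POSTGRES_WRITER_BATCH_BYTES` bytes, `PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT` bounds how long a partially filled batch may wait, and `PGSTREAM_POSTGRES_WRITER_MAX_QUEUE_BYTES` caps the memory held by batches in flight. A minimal Go sketch of the size/bytes/timeout flush logic (purely illustrative — the type and names below are made up, this is not pgstream's implementation):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// toyBatcher illustrates the flush rules behind the batch writer settings:
// flush when the message count or byte size threshold is hit, or when the
// timeout elapses for a partially filled batch. It is a sketch, not pgstream code.
type toyBatcher struct {
	mu          sync.Mutex
	maxMessages int           // PGSTREAM_POSTGRES_WRITER_BATCH_SIZE
	maxBytes    int           // PGSTREAM_POSTGRES_WRITER_BATCH_BYTES
	timeout     time.Duration // PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT

	msgs  [][]byte
	bytes int
	timer *time.Timer
}

func (b *toyBatcher) add(msg []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.timer == nil {
		// the first message of a batch starts the timeout clock
		b.timer = time.AfterFunc(b.timeout, func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			b.flushLocked("timeout")
		})
	}
	b.msgs = append(b.msgs, msg)
	b.bytes += len(msg)
	if len(b.msgs) >= b.maxMessages || b.bytes >= b.maxBytes {
		b.flushLocked("size")
	}
}

// flushLocked stands in for the single batched write to PostgreSQL.
func (b *toyBatcher) flushLocked(reason string) {
	if len(b.msgs) == 0 {
		return
	}
	fmt.Printf("flushing %d messages (%d bytes) due to %s\n", len(b.msgs), b.bytes, reason)
	b.msgs, b.bytes = nil, 0
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}
}

func main() {
	b := &toyBatcher{maxMessages: 100, maxBytes: 1572864, timeout: time.Second}
	for i := 0; i < 250; i++ {
		b.add([]byte("wal event payload")) // flushes twice on message count
	}
	time.Sleep(2 * time.Second) // the remaining 50 messages flush on timeout
}
```
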
@@ -253,6 +302,14 @@ The detailed SQL used can be found in the [migrations folder](https://github.com

 The schema and data changes are part of the same linear stream - the downstream consumers always observe the schema changes as soon as they happen, before any data arrives that relies on the new schema. This prevents data loss and manual intervention.

+## Snapshots
+
+`pgstream` can handle the generation of PostgreSQL snapshots, including both schema and data. The current implementations for each are:
+
+- Schema: depending on the configuration, it can use either the pgstream `schema_log` table to get the schema view and process it as events downstream, or rely on the `pg_dump`/`pg_restore` PostgreSQL utilities.
+
+- Data: it relies on transaction snapshot ids to obtain a stable view of the database, and parallelises the read of all the rows by dividing them into ranges using the `ctid`.
+
 ## Architecture

 `pgstream` is constructed as a streaming pipeline, where data from one module streams into the next, eventually reaching the configured output plugins. It keeps track of schema changes and replicates them along with the data changes to ensure a consistent view of the source data downstream. This modular approach makes adding and integrating output plugin implementations simple and painless.
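
The data snapshot approach described in the new Snapshots section can be pictured with plain PostgreSQL primitives: one session exports a transaction snapshot, and parallel workers import it so they all read the same stable view of the table, each scanning a disjoint `ctid` page range. A minimal sketch using the pgx driver (illustrative only — the connection string, table name and page ranges are assumptions, not pgstream's code; `ctid` range conditions of this form rely on TID range scans available since PostgreSQL 14):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/jackc/pgx/v5"
)

const url = "postgres://postgres:postgres@localhost?sslmode=disable" // assumed connection string

func main() {
	ctx := context.Background()

	// Coordinator connection: export a snapshot and keep the transaction open
	// so that workers can import it and see the exact same view of the data.
	coord, err := pgx.Connect(ctx, url)
	if err != nil {
		log.Fatal(err)
	}
	defer coord.Close(ctx)

	coordTx, err := coord.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.RepeatableRead})
	if err != nil {
		log.Fatal(err)
	}
	defer coordTx.Rollback(ctx)

	var snapshotID string
	if err := coordTx.QueryRow(ctx, "SELECT pg_export_snapshot()").Scan(&snapshotID); err != nil {
		log.Fatal(err)
	}

	// Each worker reads a disjoint range of pages via ctid, all under the same snapshot.
	ranges := [][2]int{{0, 1000}, {1000, 2000}, {2000, 3000}} // page ranges are assumptions
	var wg sync.WaitGroup
	for _, r := range ranges {
		wg.Add(1)
		go func(from, to int) {
			defer wg.Done()
			conn, err := pgx.Connect(ctx, url)
			if err != nil {
				log.Print(err)
				return
			}
			defer conn.Close(ctx)

			tx, err := conn.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.RepeatableRead})
			if err != nil {
				log.Print(err)
				return
			}
			defer tx.Rollback(ctx)

			// snapshotID comes from the server; SET TRANSACTION SNAPSHOT cannot be parameterised.
			if _, err := tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", snapshotID)); err != nil {
				log.Print(err)
				return
			}

			rows, err := tx.Query(ctx,
				"SELECT id, name FROM my_table WHERE ctid >= $1::tid AND ctid < $2::tid",
				fmt.Sprintf("(%d,0)", from), fmt.Sprintf("(%d,0)", to))
			if err != nil {
				log.Print(err)
				return
			}
			defer rows.Close()
			for rows.Next() {
				var id int
				var name string
				if err := rows.Scan(&id, &name); err != nil {
					log.Print(err)
					return
				}
				// hand the row off to the downstream processor here
				_, _ = id, name
			}
			if err := rows.Err(); err != nil {
				log.Print(err)
			}
		}(r[0], r[1])
	}
	wg.Wait()
}
```
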
@@ -267,7 +324,9 @@ A listener is anything that listens for WAL data, regardless of the source. It h

 There are currently two implementations of the listener:

-- **Postgres listener**: listens to WAL events directly from the replication slot. Since the WAL replication slot is sequential, the Postgres WAL listener is limited to run as a single process. The associated Postgres checkpointer will sync the LSN so that the replication lag doesn't grow indefinitely.
+- **Postgres listener**: listens to WAL events directly from the replication slot. Since the WAL replication slot is sequential, the Postgres WAL listener is limited to run as a single process. The associated Postgres checkpointer will sync the LSN so that the replication lag doesn't grow indefinitely. It can be configured to perform an initial snapshot when pgstream is first connected to the source PostgreSQL database (see details in the [snapshots section](#snapshots)).
+
+- **Postgres Snapshoter**: produces events by performing a snapshot of the configured PostgreSQL database, as described in the [snapshots section](#snapshots). It doesn't start continuous replication, so once all the snapshotted data has been processed, the pgstream process will stop.

 - **Kafka reader**: reads WAL events from a Kafka topic. It can be configured to run concurrently by using partitions and Kafka consumer groups, applying a fan-out strategy to the WAL events. The data will be partitioned by database schema by default, but can be configured when using `pgstream` as a library. The associated Kafka checkpointer will commit the message offsets per topic/partition so that the consumer group doesn't process the same message twice.

@@ -283,6 +342,8 @@ There are currently two implementations of the processor:

 - **Webhook notifier**: it sends a notification to any webhooks that have subscribed to the relevant wal event. It relies on a subscription HTTP server receiving the subscription requests and storing them in the shared subscription store which is accessed whenever a wal event is processed. It sends the notifications to the different subscribed webhook urls in parallel based on a configurable number of workers (client timeouts apply). Similar to the two previous processor implementations, it uses a memory guarded buffering system internally, which allows to separate the wal event processing from the webhook url sending, optimising the processor latency.

+- **Postgres batch writer**: it writes the WAL events into a PostgreSQL compatible database. It implements the same kind of mechanism as the Kafka and the search batch writers to ensure continuous processing from the listener, and it also uses a batching mechanism to minimise PostgreSQL IO traffic.
+
 In addition to the implementations described above, there are optional processor decorators, which work in conjunction with one of the main processor implementations described above. Their goal is to act as modifiers to enrich the wal event being processed.

 There are currently two implementations of the processor that act as decorators:

cmd/config.go

+20-10
@@ -69,6 +69,7 @@ func parseListenerConfig() stream.ListenerConfig {
 	return stream.ListenerConfig{
 		Postgres: parsePostgresListenerConfig(),
 		Kafka:    parseKafkaListenerConfig(),
+		Snapshot: parseSnapshotListenerConfig(),
 	}
 }

@@ -87,24 +88,32 @@ func parsePostgresListenerConfig() *stream.PostgresListenerConfig {

 	initialSnapshotEnabled := viper.GetBool("PGSTREAM_POSTGRES_LISTENER_INITIAL_SNAPSHOT_ENABLED")
 	if initialSnapshotEnabled {
-		cfg.Snapshot = parseSnapshotListenerConfig(pgURL)
+		cfg.Snapshot = parseSnapshotConfig(pgURL, "PGSTREAM_POSTGRES_INITIAL")
 	}

 	return cfg
 }

-func parseSnapshotListenerConfig(pgURL string) *snapshotbuilder.SnapshotListenerConfig {
+func parseSnapshotListenerConfig() *snapshotbuilder.SnapshotListenerConfig {
+	pgsnapshotURL := viper.GetString("PGSTREAM_POSTGRES_SNAPSHOT_LISTENER_URL")
+	if pgsnapshotURL == "" {
+		return nil
+	}
+	return parseSnapshotConfig(pgsnapshotURL, "PGSTREAM_POSTGRES")
+}
+
+func parseSnapshotConfig(pgURL, prefix string) *snapshotbuilder.SnapshotListenerConfig {
 	return &snapshotbuilder.SnapshotListenerConfig{
-		SnapshotStoreURL: pgURL,
+		SnapshotStoreURL: viper.GetString(fmt.Sprintf("%s_SNAPSHOT_STORE_URL", prefix)),
 		Generator: pgsnapshotgenerator.Config{
 			URL:           pgURL,
-			BatchPageSize: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_BATCH_PAGE_SIZE"),
-			SchemaWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_SCHEMA_WORKERS"),
-			TableWorkers:  viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS"),
+			BatchPageSize: viper.GetUint(fmt.Sprintf("%s_SNAPSHOT_BATCH_PAGE_SIZE", prefix)),
+			SchemaWorkers: viper.GetUint(fmt.Sprintf("%s_SNAPSHOT_SCHEMA_WORKERS", prefix)),
+			TableWorkers:  viper.GetUint(fmt.Sprintf("%s_SNAPSHOT_TABLE_WORKERS", prefix)),
 		},
 		Adapter: adapter.SnapshotConfig{
-			Tables:          viper.GetStringSlice("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES"),
-			SnapshotWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_WORKERS"),
+			Tables:          viper.GetStringSlice(fmt.Sprintf("%s_SNAPSHOT_TABLES", prefix)),
+			SnapshotWorkers: viper.GetUint(fmt.Sprintf("%s_SNAPSHOT_WORKERS", prefix)),
 		},
 		Schema: parseSchemaSnapshotConfig(pgURL),
 	}
@@ -273,9 +282,10 @@ func parsePostgresProcessorConfig() *stream.PostgresProcessorConfig {
 				MaxBatchSize:  viper.GetInt64("PGSTREAM_POSTGRES_WRITER_BATCH_SIZE"),
 				MaxQueueBytes: viper.GetInt64("PGSTREAM_POSTGRES_WRITER_MAX_QUEUE_BYTES"),
 			},
-			SchemaStore: pgschemalog.Config{
-				URL: viper.GetString("PGSTREAM_POSTGRES_WRITER_SCHEMA_STORE_URL"),
+			SchemaLogStore: pgschemalog.Config{
+				URL: viper.GetString("PGSTREAM_POSTGRES_WRITER_SCHEMALOG_STORE_URL"),
 			},
+			DisableTriggers: viper.GetBool("PGSTREAM_POSTGRES_WRITER_DISABLE_TRIGGERS"),
 		},
 	}
 }
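
With the refactor above, both snapshot code paths share `parseSnapshotConfig(pgURL, prefix)`, so the environment variables differ only by prefix: the initial-snapshot path keeps the `PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_*` names, while the new standalone snapshot listener is enabled by `PGSTREAM_POSTGRES_SNAPSHOT_LISTENER_URL` and reads `PGSTREAM_POSTGRES_SNAPSHOT_*`. A tiny illustrative sketch of the resulting names (not part of the codebase):

```go
package main

import "fmt"

func main() {
	// Mirrors the fmt.Sprintf calls in parseSnapshotConfig: the same suffixes
	// are combined with a different prefix per code path.
	suffixes := []string{
		"_SNAPSHOT_STORE_URL",
		"_SNAPSHOT_BATCH_PAGE_SIZE",
		"_SNAPSHOT_SCHEMA_WORKERS",
		"_SNAPSHOT_TABLE_WORKERS",
		"_SNAPSHOT_TABLES",
		"_SNAPSHOT_WORKERS",
	}
	for _, prefix := range []string{"PGSTREAM_POSTGRES_INITIAL", "PGSTREAM_POSTGRES"} {
		fmt.Printf("%s (prefix)\n", prefix)
		for _, s := range suffixes {
			fmt.Println("  " + prefix + s)
		}
	}
}
```
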

pg2pg.env

+1
@@ -1,6 +1,7 @@
 # Listener config
 PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost?sslmode=disable"
 PGSTREAM_POSTGRES_LISTENER_INITIAL_SNAPSHOT_ENABLED=true
+PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_STORE_URL="postgres://postgres:postgres@localhost?sslmode=disable"
 PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES="*"
 PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_SCHEMA_WORKERS=4
 PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS=4

pkg/stream/config.go

+2-1
@@ -28,6 +28,7 @@ type Config struct {
 type ListenerConfig struct {
 	Postgres *PostgresListenerConfig
 	Kafka    *KafkaListenerConfig
+	Snapshot *snapshotbuilder.SnapshotListenerConfig
 }

 type PostgresListenerConfig struct {
@@ -76,7 +77,7 @@ type WebhookSubscriptionStoreConfig struct {
 }

 func (c *Config) IsValid() error {
-	if c.Listener.Kafka == nil && c.Listener.Postgres == nil {
+	if c.Listener.Kafka == nil && c.Listener.Postgres == nil && c.Listener.Snapshot == nil {
 		return errors.New("need at least one listener configured")
 	}

pkg/stream/integration/helper_test.go

+20-1
@@ -121,6 +121,25 @@ func testPostgresListenerCfg() stream.ListenerConfig {
 	}
 }

+func testSnapshotListenerCfg(sourceURL, targetURL string, tables []string) stream.ListenerConfig {
+	return stream.ListenerConfig{
+		Snapshot: &snapshotbuilder.SnapshotListenerConfig{
+			Generator: pgsnapshotgenerator.Config{
+				URL: sourceURL,
+			},
+			Adapter: adapter.SnapshotConfig{
+				Tables: tables,
+			},
+			Schema: snapshotbuilder.SchemaSnapshotConfig{
+				DumpRestore: &pgdumprestore.Config{
+					SourcePGURL: sourceURL,
+					TargetPGURL: targetURL,
+				},
+			},
+		},
+	}
+}
+
 func testPostgresListenerCfgWithSnapshot(sourceURL, targetURL string, tables []string) stream.ListenerConfig {
 	return stream.ListenerConfig{
 		Postgres: &stream.PostgresListenerConfig{
@@ -209,7 +228,7 @@ func testPostgresProcessorCfg(sourcePGURL string) stream.ProcessorConfig {
 			BatchConfig: batch.Config{
 				BatchTimeout: 50 * time.Millisecond,
 			},
-			SchemaStore: schemalogpg.Config{
+			SchemaLogStore: schemalogpg.Config{
 				URL: sourcePGURL,
 			},
 		},
New file

+92

@@ -0,0 +1,92 @@
+// SPDX-License-Identifier: Apache-2.0
+
+package integration
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	pglib "github.com/xataio/pgstream/internal/postgres"
+	"github.com/xataio/pgstream/internal/testcontainers"
+	"github.com/xataio/pgstream/pkg/stream"
+)
+
+func Test_SnapshotToPostgres(t *testing.T) {
+	if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" {
+		t.Skip("skipping integration test...")
+	}
+
+	// postgres container where pgstream hasn't been initialised to be used for
+	// snapshot validation
+	var snapshotPGURL string
+	pgcleanup, err := testcontainers.SetupPostgresContainer(context.Background(), &snapshotPGURL, testcontainers.Postgres14, "config/postgresql.conf")
+	require.NoError(t, err)
+	defer pgcleanup()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	testTable := "snapshot2pg_integration_test"
+	// create table and populate it before initialising and running pgstream to
+	// ensure the snapshot captures pre-existing schema and data properly
+	execQueryWithURL(t, ctx, snapshotPGURL, fmt.Sprintf("create table %s(id serial primary key, name text)", testTable))
+	execQueryWithURL(t, ctx, snapshotPGURL, fmt.Sprintf("insert into %s(name) values('a'),('b')", testTable))
+
+	cfg := &stream.Config{
+		Listener:  testSnapshotListenerCfg(snapshotPGURL, targetPGURL, []string{testTable}),
+		Processor: testPostgresProcessorCfg(snapshotPGURL),
+	}
+	initStream(t, ctx, snapshotPGURL)
+	runStream(t, ctx, cfg)
+
+	targetConn, err := pglib.NewConn(ctx, targetPGURL)
+	require.NoError(t, err)
+
+	timer := time.NewTimer(20 * time.Second)
+	defer timer.Stop()
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	validation := func() bool {
+		schemaColumns := getInformationSchemaColumns(t, ctx, targetConn, testTable)
+		if len(schemaColumns) != 2 {
+			return false
+		}
+
+		wantSchemaCols := []*informationSchemaColumn{
+			{name: "id", dataType: "integer", isNullable: "NO"},
+			{name: "name", dataType: "text", isNullable: "YES"},
+		}
+		require.ElementsMatch(t, wantSchemaCols, schemaColumns)
+
+		columns := getTestTableColumns(t, ctx, targetConn, fmt.Sprintf("select id,name from %s", testTable))
+		if len(columns) != 2 {
+			return false
+		}
+
+		wantCols := []*testTableColumn{
+			{id: 1, name: "a"},
+			{id: 2, name: "b"},
+		}
+		require.ElementsMatch(t, wantCols, columns)
+
+		return true
+	}
+
+	for {
+		select {
+		case <-timer.C:
+			cancel()
+			t.Error("timeout waiting for postgres snapshot sync")
+			return
+		case <-ticker.C:
+			if validation() {
+				return
+			}
+		}
+	}
+}

pkg/stream/stream_run.go

+20-1
@@ -17,6 +17,7 @@ import (
 	"github.com/xataio/pgstream/pkg/wal/listener"
 	kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka"
 	pglistener "github.com/xataio/pgstream/pkg/wal/listener/postgres"
+	snapshotlistener "github.com/xataio/pgstream/pkg/wal/listener/snapshot"
 	snapshotbuilder "github.com/xataio/pgstream/pkg/wal/listener/snapshot/builder"
 	"github.com/xataio/pgstream/pkg/wal/processor"
 	"github.com/xataio/pgstream/pkg/wal/processor/injector"
@@ -283,7 +284,6 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, instrumentat
 			return listener.Listen(ctx)
 		})
 	case config.Listener.Kafka != nil:
-		var err error
 		listener, err := kafkalistener.NewWALReader(
 			kafkaReader,
 			processor.ProcessWALEvent,
@@ -298,6 +298,25 @@
 			logger.Info("running kafka reader...")
 			return listener.Listen(ctx)
 		})
+
+	case config.Listener.Snapshot != nil:
+		var err error
+		snapshotGenerator, err := snapshotbuilder.NewSnapshotGenerator(
+			ctx,
+			config.Listener.Snapshot,
+			processor.ProcessWALEvent,
+			logger)
+		if err != nil {
+			return err
+		}
+		listener := snapshotlistener.New(snapshotGenerator)
+		defer listener.Close()
+
+		eg.Go(func() error {
+			defer logger.Info("stopping postgres snapshot listener...")
+			logger.Info("running postgres snapshot listener...")
+			return listener.Listen(ctx)
+		})
 	}

 	if err := eg.Wait(); err != nil {