Skip to content

Commit 299d363

Browse files
authored
Merge pull request #156 from xataio/decouple-snapshot-from-wal-listener
Decouple snapshot from wal listener
2 parents 4c811d9 + 50d31cf commit 299d363

16 files changed

+323
-224
lines changed

README.md

+10-9
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,16 @@ Here's a list of all the environment variables that can be used to configure the
129129
<details>
130130
<summary>Postgres Listener</summary>
131131

132-
| Environment Variable | Default | Required | Description |
133-
| -------------------------------------------------- | -------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
134-
| PGSTREAM_POSTGRES_LISTENER_URL | N/A | Yes | URL of the Postgres database to connect to for replication purposes. |
135-
| PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME | "pgstream\_<dbname>\_slot" | No | Name of the Postgres replication slot name. |
136-
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES | "" | No | Tables for which there will be an initial snapshot generated. The syntax supports wildcards. Tables without a schema defined will be applied the public schema. Example: for `public.test_table` and all tables in the `test_schema` schema, the value would be the following: `"test_table test_schema.\*"` |
137-
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_SCHEMA_WORKERS | 4 | No | Number of tables per schema that will be processed in parallel by the snapshotting process. |
138-
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS | 4 | No | Number of concurrent workers that will be used per table by the snapshotting process. |
139-
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_BATCH_PAGE_SIZE | 1000 | No | Size of the table page range which will be processed concurrently by the table workers from `PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS`. |
140-
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_WORKERS | 1 | No | Number of schemas that will be processed in parallel by the snapshotting process. |
132+
| Environment Variable | Default | Required | Description |
133+
| --------------------------------------------------- | ---------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
134+
| PGSTREAM_POSTGRES_LISTENER_URL | N/A | Yes | URL of the Postgres database to connect to for replication purposes. |
135+
| PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME             | "pgstream_dbname_slot" | No       | Name of the Postgres replication slot.                                                                                                                                                                                                                                                                        |
136+
| PGSTREAM_POSTGRES_LISTENER_INITIAL_SNAPSHOT_ENABLED | false | No | Enables performing an initial snapshot of the Postgres database before starting to listen to the replication slot. |
137+
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES           | ""                     | No       | Tables for which there will be an initial snapshot generated. The syntax supports wildcards. Tables without a schema defined will default to the public schema. Example: for `public.test_table` and all tables in the `test_schema` schema, the value would be the following: `"test_table test_schema.\*"`    |
138+
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_SCHEMA_WORKERS | 4 | No | Number of tables per schema that will be processed in parallel by the snapshotting process. |
139+
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS | 4 | No | Number of concurrent workers that will be used per table by the snapshotting process. |
140+
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_BATCH_PAGE_SIZE | 1000 | No | Size of the table page range which will be processed concurrently by the table workers from `PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS`. |
141+
| PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_WORKERS | 1 | No | Number of schemas that will be processed in parallel by the snapshotting process. |
141142

142143
</details>
143144

cmd/config.go

+25-18
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ import (
1414
"github.com/xataio/pgstream/pkg/stream"
1515
"github.com/xataio/pgstream/pkg/tls"
1616
kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
17-
pglistener "github.com/xataio/pgstream/pkg/wal/listener/postgres"
17+
"github.com/xataio/pgstream/pkg/wal/listener/snapshot/adapter"
18+
snapshotbuilder "github.com/xataio/pgstream/pkg/wal/listener/snapshot/builder"
1819
"github.com/xataio/pgstream/pkg/wal/processor/batch"
1920
"github.com/xataio/pgstream/pkg/wal/processor/injector"
2021
kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
@@ -84,36 +85,42 @@ func parsePostgresListenerConfig() *stream.PostgresListenerConfig {
8485
},
8586
}
8687

87-
snapshotTables := viper.GetStringSlice("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES")
88-
if len(snapshotTables) > 0 {
89-
cfg.Snapshot = &pglistener.SnapshotConfig{
90-
SnapshotStoreURL: pgURL,
91-
Generator: pgsnapshotgenerator.Config{
92-
URL: pgURL,
93-
BatchPageSize: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_BATCH_PAGE_SIZE"),
94-
SchemaWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_SCHEMA_WORKERS"),
95-
TableWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS"),
96-
},
97-
Tables: snapshotTables,
98-
SnapshotWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_WORKERS"),
99-
Schema: parseSchemaSnapshotConfig(pgURL),
100-
}
88+
initialSnapshotEnabled := viper.GetBool("PGSTREAM_POSTGRES_LISTENER_INITIAL_SNAPSHOT_ENABLED")
89+
if initialSnapshotEnabled {
90+
cfg.Snapshot = parseSnapshotListenerConfig(pgURL)
10191
}
10292

10393
return cfg
10494
}
10595

106-
func parseSchemaSnapshotConfig(pgurl string) pglistener.SchemaSnapshotConfig {
96+
func parseSnapshotListenerConfig(pgURL string) *snapshotbuilder.SnapshotListenerConfig {
97+
return &snapshotbuilder.SnapshotListenerConfig{
98+
SnapshotStoreURL: pgURL,
99+
Generator: pgsnapshotgenerator.Config{
100+
URL: pgURL,
101+
BatchPageSize: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_BATCH_PAGE_SIZE"),
102+
SchemaWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_SCHEMA_WORKERS"),
103+
TableWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS"),
104+
},
105+
Adapter: adapter.SnapshotConfig{
106+
Tables: viper.GetStringSlice("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES"),
107+
SnapshotWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_WORKERS"),
108+
},
109+
Schema: parseSchemaSnapshotConfig(pgURL),
110+
}
111+
}
112+
113+
func parseSchemaSnapshotConfig(pgurl string) snapshotbuilder.SchemaSnapshotConfig {
107114
pgTargetURL := viper.GetString("PGSTREAM_POSTGRES_WRITER_TARGET_URL")
108115
if pgTargetURL != "" {
109-
return pglistener.SchemaSnapshotConfig{
116+
return snapshotbuilder.SchemaSnapshotConfig{
110117
DumpRestore: &pgdumprestore.Config{
111118
SourcePGURL: pgurl,
112119
TargetPGURL: pgTargetURL,
113120
},
114121
}
115122
}
116-
return pglistener.SchemaSnapshotConfig{
123+
return snapshotbuilder.SchemaSnapshotConfig{
117124
SchemaLogStore: &pgschemalog.Config{
118125
URL: pgurl,
119126
},

pg2pg.env

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Listener config
22
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost?sslmode=disable"
3+
PGSTREAM_POSTGRES_LISTENER_INITIAL_SNAPSHOT_ENABLED=true
34
PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES="*"
45
PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_SCHEMA_WORKERS=4
56
PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLE_WORKERS=4

pkg/stream/config.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
"github.com/xataio/pgstream/pkg/kafka"
1010
kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
11-
pglistener "github.com/xataio/pgstream/pkg/wal/listener/postgres"
11+
snapshotbuilder "github.com/xataio/pgstream/pkg/wal/listener/snapshot/builder"
1212
"github.com/xataio/pgstream/pkg/wal/processor/injector"
1313
kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
1414
"github.com/xataio/pgstream/pkg/wal/processor/postgres"
@@ -32,7 +32,7 @@ type ListenerConfig struct {
3232

3333
type PostgresListenerConfig struct {
3434
Replication pgreplication.Config
35-
Snapshot *pglistener.SnapshotConfig
35+
Snapshot *snapshotbuilder.SnapshotListenerConfig
3636
}
3737

3838
type KafkaListenerConfig struct {

pkg/stream/integration/helper_test.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import (
2222
"github.com/xataio/pgstream/pkg/tls"
2323
"github.com/xataio/pgstream/pkg/wal"
2424
kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
25-
pglistener "github.com/xataio/pgstream/pkg/wal/listener/postgres"
25+
"github.com/xataio/pgstream/pkg/wal/listener/snapshot/adapter"
26+
snapshotbuilder "github.com/xataio/pgstream/pkg/wal/listener/snapshot/builder"
2627
"github.com/xataio/pgstream/pkg/wal/processor/batch"
2728
"github.com/xataio/pgstream/pkg/wal/processor/injector"
2829
kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
@@ -126,12 +127,14 @@ func testPostgresListenerCfgWithSnapshot(sourceURL, targetURL string, tables []s
126127
Replication: pgreplication.Config{
127128
PostgresURL: sourceURL,
128129
},
129-
Snapshot: &pglistener.SnapshotConfig{
130+
Snapshot: &snapshotbuilder.SnapshotListenerConfig{
130131
Generator: pgsnapshotgenerator.Config{
131132
URL: sourceURL,
132133
},
133-
Tables: tables,
134-
Schema: pglistener.SchemaSnapshotConfig{
134+
Adapter: adapter.SnapshotConfig{
135+
Tables: tables,
136+
},
137+
Schema: snapshotbuilder.SchemaSnapshotConfig{
135138
DumpRestore: &pgdumprestore.Config{
136139
SourcePGURL: sourceURL,
137140
TargetPGURL: targetURL,

pkg/stream/stream_run.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/xataio/pgstream/pkg/wal/listener"
1818
kafkalistener "github.com/xataio/pgstream/pkg/wal/listener/kafka"
1919
pglistener "github.com/xataio/pgstream/pkg/wal/listener/postgres"
20+
snapshotbuilder "github.com/xataio/pgstream/pkg/wal/listener/snapshot/builder"
2021
"github.com/xataio/pgstream/pkg/wal/processor"
2122
"github.com/xataio/pgstream/pkg/wal/processor/injector"
2223
processinstrumentation "github.com/xataio/pgstream/pkg/wal/processor/instrumentation"
@@ -258,7 +259,7 @@ func Run(ctx context.Context, logger loglib.Logger, config *Config, instrumentat
258259
}
259260
if config.Listener.Postgres.Snapshot != nil {
260261
logger.Info("initial snapshot enabled")
261-
snapshotGenerator, err := pglistener.NewSnapshotGeneratorAdapter(
262+
snapshotGenerator, err := snapshotbuilder.NewSnapshotGenerator(
262263
ctx,
263264
config.Listener.Postgres.Snapshot,
264265
processor.ProcessWALEvent,

pkg/wal/listener/postgres/config.go pkg/wal/listener/snapshot/adapter/config.go

+4-14
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,23 @@
11
// SPDX-License-Identifier: Apache-2.0
22

3-
package postgres
3+
package adapter
44

55
import (
66
"strings"
7-
8-
schemalogpg "github.com/xataio/pgstream/pkg/schemalog/postgres"
9-
pgsnapshotgenerator "github.com/xataio/pgstream/pkg/snapshot/generator/postgres/data"
10-
"github.com/xataio/pgstream/pkg/snapshot/generator/postgres/schema/pgdumprestore"
117
)
128

139
type SnapshotConfig struct {
14-
Generator pgsnapshotgenerator.Config
15-
SnapshotStoreURL string
16-
Tables []string
17-
Schema SchemaSnapshotConfig
10+
Tables []string
1811
// SnapshotWorkers represents the number of snapshots the generator will
1912
// process concurrently. This doesn't affect the parallelism of the tables
2013
// within each individual snapshot request. It defaults to 1.
2114
SnapshotWorkers uint
2215
}
2316

24-
type SchemaSnapshotConfig struct {
25-
SchemaLogStore *schemalogpg.Config
26-
DumpRestore *pgdumprestore.Config
27-
}
28-
2917
const defaultSnapshotWorkers = 1
3018

19+
const publicSchema = "public"
20+
3121
func (c *SnapshotConfig) schemaTableMap() map[string][]string {
3222
schemaTableMap := make(map[string][]string, len(c.Tables))
3323
for _, table := range c.Tables {

pkg/wal/listener/postgres/config_test.go pkg/wal/listener/snapshot/adapter/config_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22

3-
package postgres
3+
package adapter
44

55
import (
66
"testing"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
package adapter
4+
5+
import (
6+
"context"
7+
"time"
8+
9+
"github.com/jonboulle/clockwork"
10+
"github.com/xataio/pgstream/pkg/snapshot"
11+
"github.com/xataio/pgstream/pkg/wal"
12+
"github.com/xataio/pgstream/pkg/wal/listener"
13+
)
14+
15+
type ProcessEventAdapter struct {
16+
processEvent listener.ProcessWalEvent
17+
clock clockwork.Clock
18+
}
19+
20+
func NewProcessEventAdapter(processEvent listener.ProcessWalEvent) *ProcessEventAdapter {
21+
return &ProcessEventAdapter{
22+
processEvent: processEvent,
23+
clock: clockwork.NewRealClock(),
24+
}
25+
}
26+
27+
func (a *ProcessEventAdapter) ProcessRow(ctx context.Context, row *snapshot.Row) error {
28+
return a.processEvent(ctx, a.snapshotRowToWalEvent(row))
29+
}
30+
31+
func (a *ProcessEventAdapter) snapshotRowToWalEvent(row *snapshot.Row) *wal.Event {
32+
if row == nil {
33+
return nil
34+
}
35+
36+
columns := make([]wal.Column, 0, len(row.Columns))
37+
for _, col := range row.Columns {
38+
columns = append(columns, a.snapshotColumnToWalColumn(col))
39+
}
40+
// use 0 since there's no LSN associated, but it can be used as the
41+
// initial version downstream
42+
const zeroLSN = "0/0"
43+
return &wal.Event{
44+
CommitPosition: wal.CommitPosition(zeroLSN),
45+
Data: &wal.Data{
46+
Action: "I",
47+
Timestamp: a.clock.Now().UTC().Format(time.RFC3339),
48+
LSN: zeroLSN,
49+
Schema: row.Schema,
50+
Table: row.Table,
51+
Columns: columns,
52+
},
53+
}
54+
}
55+
56+
func (a *ProcessEventAdapter) snapshotColumnToWalColumn(col snapshot.Column) wal.Column {
57+
return wal.Column{
58+
Name: col.Name,
59+
Type: col.Type,
60+
Value: col.Value,
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
package adapter
4+
5+
import (
6+
"testing"
7+
"time"
8+
9+
"github.com/jonboulle/clockwork"
10+
"github.com/stretchr/testify/require"
11+
"github.com/xataio/pgstream/pkg/snapshot"
12+
"github.com/xataio/pgstream/pkg/wal"
13+
)
14+
15+
func TestProcessEventAdapter_snapshotRowToWalEvent(t *testing.T) {
16+
t.Parallel()
17+
18+
now := time.Now()
19+
fakeClock := clockwork.NewFakeClockAt(now)
20+
testTable := "table1"
21+
zeroLSN := "0/0"
22+
23+
tests := []struct {
24+
name string
25+
row *snapshot.Row
26+
27+
wantEvent *wal.Event
28+
}{
29+
{
30+
name: "ok - nil row",
31+
row: nil,
32+
33+
wantEvent: nil,
34+
},
35+
{
36+
name: "ok",
37+
row: &snapshot.Row{
38+
Schema: publicSchema,
39+
Table: testTable,
40+
Columns: []snapshot.Column{
41+
{Name: "id", Type: "int4", Value: 1},
42+
{Name: "name", Type: "text", Value: "alice"},
43+
},
44+
},
45+
46+
wantEvent: &wal.Event{
47+
CommitPosition: wal.CommitPosition(zeroLSN),
48+
Data: &wal.Data{
49+
Action: "I",
50+
Timestamp: fakeClock.Now().UTC().Format(time.RFC3339),
51+
LSN: zeroLSN,
52+
Schema: publicSchema,
53+
Table: testTable,
54+
Columns: []wal.Column{
55+
{Name: "id", Type: "int4", Value: 1},
56+
{Name: "name", Type: "text", Value: "alice"},
57+
},
58+
},
59+
},
60+
},
61+
}
62+
63+
for _, tc := range tests {
64+
tc := tc
65+
t.Run(tc.name, func(t *testing.T) {
66+
t.Parallel()
67+
68+
a := ProcessEventAdapter{
69+
clock: fakeClock,
70+
}
71+
event := a.snapshotRowToWalEvent(tc.row)
72+
73+
require.Equal(t, tc.wantEvent, event)
74+
})
75+
}
76+
}

0 commit comments

Comments
 (0)