Skip to content

Commit 651047f

Browse files
authored
Merge pull request #152 from xataio/use-pgdumprestore-snapshot-generator
Use pg_dump/pg_restore snapshot generator in pg listener
2 parents 4a933e1 + c03f80f commit 651047f

12 files changed

+242
-124
lines changed

build/docker/postgres/Dockerfile

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM postgres:15.3
1+
FROM postgres:17.2
22

33
RUN apt-get update \
4-
&& apt-get install -y postgresql-15-wal2json \
4+
&& apt-get install -y postgresql-17-wal2json \

cmd/config.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/xataio/pgstream/pkg/kafka"
1111
pgschemalog "github.com/xataio/pgstream/pkg/schemalog/postgres"
1212
pgsnapshotgenerator "github.com/xataio/pgstream/pkg/snapshot/generator/postgres/data"
13+
"github.com/xataio/pgstream/pkg/snapshot/generator/postgres/schema/pgdumprestore"
1314
"github.com/xataio/pgstream/pkg/stream"
1415
"github.com/xataio/pgstream/pkg/tls"
1516
kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
@@ -86,9 +87,6 @@ func parsePostgresListenerConfig() *stream.PostgresListenerConfig {
8687
snapshotTables := viper.GetStringSlice("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES")
8788
if len(snapshotTables) > 0 {
8889
cfg.Snapshot = &pglistener.SnapshotConfig{
89-
SchemaLogStore: pgschemalog.Config{
90-
URL: pgURL,
91-
},
9290
SnapshotStoreURL: pgURL,
9391
Generator: pgsnapshotgenerator.Config{
9492
URL: pgURL,
@@ -98,12 +96,30 @@ func parsePostgresListenerConfig() *stream.PostgresListenerConfig {
9896
},
9997
Tables: snapshotTables,
10098
SnapshotWorkers: viper.GetUint("PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_WORKERS"),
99+
Schema: parseSchemaSnapshotConfig(pgURL),
101100
}
102101
}
103102

104103
return cfg
105104
}
106105

106+
func parseSchemaSnapshotConfig(pgurl string) pglistener.SchemaSnapshotConfig {
107+
pgTargetURL := viper.GetString("PGSTREAM_POSTGRES_WRITER_TARGET_URL")
108+
if pgTargetURL != "" {
109+
return pglistener.SchemaSnapshotConfig{
110+
DumpRestore: &pgdumprestore.Config{
111+
SourcePGURL: pgurl,
112+
TargetPGURL: pgTargetURL,
113+
},
114+
}
115+
}
116+
return pglistener.SchemaSnapshotConfig{
117+
SchemaLogStore: &pgschemalog.Config{
118+
URL: pgurl,
119+
},
120+
}
121+
}
122+
107123
func parseKafkaListenerConfig() *stream.KafkaListenerConfig {
108124
kafkaServers := viper.GetStringSlice("PGSTREAM_KAFKA_SERVERS")
109125
kafkaTopic := viper.GetString("PGSTREAM_KAFKA_TOPIC_NAME")

pkg/snapshot/generator/postgres/data/pg_schema_table_parser.go

-58
This file was deleted.

pkg/snapshot/generator/postgres/data/pg_snapshot_generator.go

+3-10
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ import (
1717
)
1818

1919
type SnapshotGenerator struct {
20-
logger loglib.Logger
21-
conn pglib.Querier
22-
mapper mapper
23-
tableParser tableParser
20+
logger loglib.Logger
21+
conn pglib.Querier
22+
mapper mapper
2423

2524
schemaWorkers uint
2625
tableWorkers uint
@@ -39,8 +38,6 @@ type pageRange struct {
3938
end uint
4039
}
4140

42-
type tableParser func(ctx context.Context, snapshot *snapshot.Snapshot) error
43-
4441
type Option func(sg *SnapshotGenerator)
4542

4643
func NewSnapshotGenerator(ctx context.Context, cfg *Config, processRow snapshot.RowProcessor, opts ...Option) (*SnapshotGenerator, error) {
@@ -57,7 +54,6 @@ func NewSnapshotGenerator(ctx context.Context, cfg *Config, processRow snapshot.
5754
batchPageSize: cfg.batchPageSize(),
5855
tableWorkers: cfg.tableWorkers(),
5956
schemaWorkers: cfg.schemaWorkers(),
60-
tableParser: newSchemaTableParser(conn).parseSnapshotTables,
6157
}
6258

6359
for _, opt := range opts {
@@ -76,9 +72,6 @@ func WithLogger(logger loglib.Logger) Option {
7672
}
7773

7874
func (sg *SnapshotGenerator) CreateSnapshot(ctx context.Context, ss *snapshot.Snapshot) error {
79-
if err := sg.tableParser(ctx, ss); err != nil {
80-
return err
81-
}
8275
return sg.conn.ExecInTxWithOptions(ctx, func(tx pglib.Tx) error {
8376
snapshotID, err := sg.exportSnapshot(ctx, tx)
8477
if err != nil {

pkg/snapshot/generator/postgres/data/pg_snapshot_generator_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,6 @@ func TestSnapshotGenerator_CreateSnapshot(t *testing.T) {
872872
schemaWorkers: 1,
873873
tableWorkers: 1,
874874
batchPageSize: 10,
875-
tableParser: func(ctx context.Context, snapshot *snapshot.Snapshot) error { return nil },
876875
}
877876

878877
if tc.schemaWorkers != 0 {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
package tablefinder
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"slices"
9+
10+
pglib "github.com/xataio/pgstream/internal/postgres"
11+
"github.com/xataio/pgstream/pkg/snapshot"
12+
"github.com/xataio/pgstream/pkg/snapshot/generator"
13+
)
14+
15+
// SnapshotTableFinder is a decorator around a snapshot generator that will
16+
// explode the wildcard references in the snapshot requests and replace them by
17+
// all the schema tables in postgres.
18+
type SnapshotTableFinder struct {
19+
wrapped generator.SnapshotGenerator
20+
conn pglib.Querier
21+
}
22+
23+
const wildcard = "*"
24+
25+
// NewSnapshotTableFinder will return the generator on input wrapped with a
26+
// table finder that will explode the wildcard references in the snapshot
27+
// request and translate them into all the postgres tables for the given schema.
28+
func NewSnapshotTableFinder(ctx context.Context, pgurl string, generator generator.SnapshotGenerator) (*SnapshotTableFinder, error) {
29+
conn, err := pglib.NewConnPool(ctx, pgurl)
30+
if err != nil {
31+
return nil, err
32+
}
33+
return &SnapshotTableFinder{
34+
wrapped: generator,
35+
conn: conn,
36+
}, nil
37+
}
38+
39+
func (s *SnapshotTableFinder) CreateSnapshot(ctx context.Context, ss *snapshot.Snapshot) error {
40+
if slices.Contains(ss.TableNames, wildcard) {
41+
var err error
42+
ss.TableNames, err = s.discoverAllSchemaTables(ctx, ss.SchemaName)
43+
if err != nil {
44+
return err
45+
}
46+
}
47+
return s.wrapped.CreateSnapshot(ctx, ss)
48+
}
49+
50+
func (s *SnapshotTableFinder) Close() error {
51+
return s.conn.Close(context.Background())
52+
}
53+
54+
func (s *SnapshotTableFinder) discoverAllSchemaTables(ctx context.Context, schema string) ([]string, error) {
55+
const query = "SELECT tablename FROM pg_tables WHERE schemaname=$1"
56+
rows, err := s.conn.Query(ctx, query, schema)
57+
if err != nil {
58+
return nil, fmt.Errorf("discovering all tables for schema %s: %w", schema, err)
59+
}
60+
defer rows.Close()
61+
62+
tableNames := []string{}
63+
for rows.Next() {
64+
var tableName string
65+
if err := rows.Scan(&tableName); err != nil {
66+
return nil, fmt.Errorf("scanning table name: %w", err)
67+
}
68+
tableNames = append(tableNames, tableName)
69+
}
70+
71+
if err := rows.Err(); err != nil {
72+
return nil, err
73+
}
74+
75+
return tableNames, nil
76+
}

0 commit comments

Comments
 (0)