Skip to content

Commit d1d4d56

Browse files
committed
provide plugin options in a more flexible way
1 parent 1e7f1e4 commit d1d4d56

File tree

8 files changed

+82
-21
lines changed

8 files changed

+82
-21
lines changed

lib/postgres_replication.ex

+15-7
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ defmodule PostgresReplication do
1111
* `publication_name` - The name of the publication to create. If not provided, it will use the schema and table name.
1212
* `replication_slot_name` - The name of the replication slot to create. If not provided, it will use the schema and table name.
1313
* `output_plugin` - The output plugin to use. Default is `pgoutput`.
14-
* `proto_version` - The protocol version to use. Default is `1`.
14+
* `output_plugin_options` - The options to pass to the output plugin If you use :publication_name as the value of the key, the lib will automatically set the generated or set value of `publication_name`. Default is [proto_version: "1", publication_names: :publication_name].
1515
* `handler_module` - The module that will handle the data received from the replication stream.
1616
* `target_pid` - The PID of the parent process that will receive the data.
1717
@@ -40,7 +40,7 @@ defmodule PostgresReplication do
4040
publication_name: String.t(),
4141
replication_slot_name: String.t(),
4242
output_plugin: String.t(),
43-
proto_version: integer(),
43+
output_plugin_options: Keyword.t(),
4444
handler_module: Handler.t(),
4545
metadata: map()
4646
}
@@ -52,7 +52,7 @@ defmodule PostgresReplication do
5252
publication_name: nil,
5353
replication_slot_name: nil,
5454
output_plugin: "pgoutput",
55-
proto_version: 1,
55+
output_plugin_options: [proto_version: "1", publication_names: :publication_name],
5656
handler_module: nil,
5757
metadata: %{}
5858

@@ -175,17 +175,25 @@ defmodule PostgresReplication do
175175
%__MODULE__{step: :start_replication_slot} = state
176176
) do
177177
%__MODULE__{
178-
proto_version: proto_version,
179178
replication_slot_name: replication_slot_name,
180-
publication_name: publication_name
179+
publication_name: publication_name,
180+
output_plugin_options: output_plugin_options
181181
} = state
182182

183183
Logger.info(
184-
"Starting stream replication for slot #{replication_slot_name} using publication #{publication_name} and protocol version #{proto_version}"
184+
"Starting stream replication for slot #{replication_slot_name} using plugins options: #{inspect(output_plugin_options)}"
185185
)
186186

187+
output_plugin_options =
188+
output_plugin_options
189+
|> Enum.map_join(", ", fn
190+
{k, :publication_name} -> "#{k} '#{publication_name}'"
191+
{k, v} -> "#{k} '#{v}'"
192+
end)
193+
|> String.trim()
194+
187195
query =
188-
"START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}')"
196+
"START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (#{output_plugin_options})"
189197

190198
{:stream, query, [], %{state | step: :streaming}}
191199
end

lib/postgres_replication/decoder.ex

+4-6
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,15 @@ defmodule PostgresReplication.Decoder do
136136
alias Messages.{
137137
Begin,
138138
Commit,
139+
Delete,
140+
Insert,
139141
Origin,
140142
Relation,
141143
Relation.Column,
142-
Insert,
143-
Update,
144-
Delete,
145144
Truncate,
146145
Type,
147-
Unsupported
146+
Unsupported,
147+
Update
148148
}
149149

150150
alias PostgresReplication.Protocol.Write
@@ -187,7 +187,6 @@ defmodule PostgresReplication.Decoder do
187187
}
188188
end
189189

190-
# TODO: Verify this is correct with real data from Postgres
191190
defp decode_message_impl(<<"O", lsn::binary-8, name::binary>>) do
192191
%Origin{
193192
origin_commit_lsn: decode_lsn(lsn),
@@ -201,7 +200,6 @@ defmodule PostgresReplication.Decoder do
201200
| [name | [<<replica_identity::binary-1, _number_of_columns::integer-16, columns::binary>>]]
202201
] = String.split(rest, <<0>>, parts: 3)
203202

204-
# TODO: Handle case where pg_catalog is blank, we should still return the schema as pg_catalog
205203
friendly_replica_identity =
206204
case replica_identity do
207205
"d" -> :default

lib/postgres_replication/handler.ex

+3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
defmodule PostgresReplication.Handler do
2+
@moduledoc """
3+
A behaviour module for handling logical replication messages.
4+
"""
25
@type t :: module()
36
@doc """
47
The `call/2` callback is called by the `PostgresReplication` module to send messages to the parent process. It also sends back to the server connection a message in return if the user wants to.

lib/postgres_replication/oid_database.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# WHERE typnamespace = (SELECT pgn.oid FROM pg_namespace pgn WHERE nspname = 'pg_catalog') -- Take only builting Postgres types with stable OID (extension types are not guaranteed to be stable)
1515
# AND typtype = 'b' -- Only basic types
1616
# AND typisdefined -- Ignore undefined types
17-
17+
# credo:disable-for-this-file
1818
defmodule PostgresReplication.OidDatabase do
1919
@moduledoc "This module maps a numeric PostgreSQL type ID to a descriptive string."
2020

lib/postgres_replication/protocol.ex

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
defmodule PostgresReplication.Protocol do
2-
alias PostgresReplication.Protocol.Write
2+
@moduledoc """
3+
Functions for parsing different types of WAL messages.
4+
"""
35
alias PostgresReplication.Protocol.KeepAlive
6+
alias PostgresReplication.Protocol.Write
47

58
defguard is_write(value) when binary_part(value, 0, 1) == <<?w>>
69
defguard is_keep_alive(value) when binary_part(value, 0, 1) == <<?k>>
@@ -97,9 +100,9 @@ defmodule PostgresReplication.Protocol do
97100
@doc """
98101
Message to send the server to not do any operation since the server can wait
99102
"""
100-
@spec hold() :: []
101-
def hold(), do: []
103+
@spec hold :: []
104+
def hold, do: []
102105

103106
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
104-
def current_time(), do: System.os_time(:microsecond) - @epoch
107+
def current_time, do: System.os_time(:microsecond) - @epoch
105108
end

mix.exs

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ defmodule PostgresReplication.MixProject do
2121
# Run "mix help deps" to learn about dependencies.
2222
defp deps do
2323
[
24-
{:postgrex, "~> 0.19.0"}
24+
{:postgrex, "~> 0.19.0"},
25+
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
26+
{:sobelow, "~> 0.13.0", only: [:dev, :test], runtime: false}
2527
]
2628
end
2729
end

mix.lock

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
%{
2+
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
3+
"credo": {:hex, :credo, "1.7.11", "d3e805f7ddf6c9c854fd36f089649d7cf6ba74c42bc3795d587814e3c9847102", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "56826b4306843253a66e47ae45e98e7d284ee1f95d53d1612bb483f88a8cf219"},
24
"db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"},
35
"decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"},
6+
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
7+
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
48
"postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"},
9+
"sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"},
510
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
611
}

test/postgres_replication_test.exs

+44-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,49 @@ defmodule PostgresReplicationTest do
1313
]
1414
],
1515
table: :all,
16-
opts: [name: __MODULE__, auto_reconnect: true],
16+
opts: [name: __MODULE__],
17+
handler_module: PostgresReplicationTest.PgoutputHandler,
18+
metadata: %{pid: self()}
19+
}
20+
21+
{:ok, conn} = Postgrex.start_link(opts.connection_opts)
22+
PostgresReplication.start_link(opts)
23+
24+
Postgrex.query!(
25+
conn,
26+
"INSERT INTO random_values (value) VALUES ('Random Text 1')",
27+
[]
28+
)
29+
30+
assert_receive %PostgresReplication.Decoder.Messages.Begin{}
31+
assert_receive %PostgresReplication.Decoder.Messages.Relation{}
32+
33+
assert_receive %PostgresReplication.Decoder.Messages.Insert{
34+
tuple_data: {_, "Random Text 1"}
35+
}
36+
37+
assert_receive %PostgresReplication.Decoder.Messages.Commit{}
38+
end
39+
40+
test "handles database connection, receives WAL changes and you can set plugin options" do
41+
opts = %PostgresReplication{
42+
connection_opts: [
43+
hostname: "localhost",
44+
username: "postgres",
45+
password: "postgres",
46+
database: "postgres",
47+
parameters: [
48+
application_name: "PostgresReplication"
49+
]
50+
],
51+
table: :all,
52+
opts: [name: __MODULE__],
53+
publication_name: "test_publication",
54+
output_plugin: "pgoutput",
55+
output_plugin_options: [
56+
proto_version: "2",
57+
publication_names: :publication_name
58+
],
1759
handler_module: PostgresReplicationTest.PgoutputHandler,
1860
metadata: %{pid: self()}
1961
}
@@ -58,6 +100,6 @@ defmodule PostgresReplicationTest do
58100
end
59101

60102
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
61-
defp current_time(), do: System.os_time(:microsecond) - @epoch
103+
defp current_time, do: System.os_time(:microsecond) - @epoch
62104
end
63105
end

0 commit comments

Comments
 (0)