Skip to content

Commit 15765a1

Browse files
committed
handle plugins without opts
1 parent d1d4d56 commit 15765a1

File tree

7 files changed

+81
-18
lines changed

7 files changed

+81
-18
lines changed

example/inspect_calls/lib/handler.ex

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ defmodule Handler do
88
message
99
|> PostgresReplication.Protocol.parse()
1010
|> PostgresReplication.Decoder.decode_message()
11-
|> IO.inspect()
1211

1312
:noreply
1413
end

example/inspect_calls/lib/inspect_calls.ex

+1-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ defmodule InspectCalls do
3434
query = "INSERT INTO random_values (value) VALUES ('Random Text 1') RETURNING id"
3535

3636
%{rows: [[id]]} =
37-
Postgrex.query!(Process.whereis(WalReplication.DB1), query, []) |> IO.inspect()
38-
37+
Postgrex.query!(Process.whereis(WalReplication.DB1), query, [])
3938
id
4039
end
4140

example/wal_replication/lib/wal_replication/handler.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ defmodule WalReplication.Handler do
185185
query =
186186
"DELETE FROM #{schema}.#{table} WHERE #{conditions}"
187187

188-
Postgrex.query!(pid, query, values) |> IO.inspect()
188+
Postgrex.query!(pid, query, values)
189189

190190
:update ->
191191
column_names = Enum.map(state.values, &elem(&1, 0))

lib/postgres_replication.ex

+8-2
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,12 @@ defmodule PostgresReplication do
177177
%__MODULE__{
178178
replication_slot_name: replication_slot_name,
179179
publication_name: publication_name,
180+
output_plugin: output_plugin,
180181
output_plugin_options: output_plugin_options
181182
} = state
182183

183184
Logger.info(
184-
"Starting stream replication for slot #{replication_slot_name} using plugins options: #{inspect(output_plugin_options)}"
185+
"Starting stream replication for slot #{replication_slot_name} using the #{output_plugin} plugin with the options: #{inspect(output_plugin_options)}"
185186
)
186187

187188
output_plugin_options =
@@ -191,9 +192,14 @@ defmodule PostgresReplication do
191192
{k, v} -> "#{k} '#{v}'"
192193
end)
193194
|> String.trim()
195+
|> then(fn
196+
"" -> ""
197+
config -> " (#{config})"
198+
end)
194199

195200
query =
196-
"START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (#{output_plugin_options})"
201+
"START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0#{output_plugin_options}"
202+
|> IO.inspect()
197203

198204
{:stream, query, [], %{state | step: :streaming}}
199205
end

lib/postgres_replication/decoder.ex renamed to lib/postgres_replication/plugin/pgoutput/decoder.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule PostgresReplication.Decoder do
1+
defmodule PostgresReplication.Plugin.Pgoutput.Decoder do
22
@moduledoc """
33
Functions for decoding different types of logical replication messages.
44
"""
@@ -149,7 +149,7 @@ defmodule PostgresReplication.Decoder do
149149

150150
alias PostgresReplication.Protocol.Write
151151

152-
alias PostgresReplication.OidDatabase
152+
alias PostgresReplication.Plugin.Pgoutput.OidDatabase
153153

154154
@doc """
155155
Parses logical replication messages from Postgres

lib/postgres_replication/oid_database.ex renamed to lib/postgres_replication/plugin/pgoutput/oid_database.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# AND typtype = 'b' -- Only basic types
1616
# AND typisdefined -- Ignore undefined types
1717
# credo:disable-for-this-file
18-
defmodule PostgresReplication.OidDatabase do
18+
defmodule PostgresReplication.Plugin.Pgoutput.OidDatabase do
1919
@moduledoc "This module maps a numeric PostgreSQL type ID to a descriptive string."
2020

2121
@doc """

test/postgres_replication_test.exs

+68-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule PostgresReplicationTest do
22
use ExUnit.Case
3+
alias PostgresReplication.Plugin.Pgoutput.Decoder
34

45
test "handles database connection and receives WAL changes" do
56
opts = %PostgresReplication{
@@ -27,14 +28,14 @@ defmodule PostgresReplicationTest do
2728
[]
2829
)
2930

30-
assert_receive %PostgresReplication.Decoder.Messages.Begin{}
31-
assert_receive %PostgresReplication.Decoder.Messages.Relation{}
31+
assert_receive %Decoder.Messages.Begin{}
32+
assert_receive %Decoder.Messages.Relation{}
3233

33-
assert_receive %PostgresReplication.Decoder.Messages.Insert{
34+
assert_receive %Decoder.Messages.Insert{
3435
tuple_data: {_, "Random Text 1"}
3536
}
3637

37-
assert_receive %PostgresReplication.Decoder.Messages.Commit{}
38+
assert_receive %Decoder.Messages.Commit{}
3839
end
3940

4041
test "handles database connection, receives WAL changes and you can set plugin options" do
@@ -69,22 +70,80 @@ defmodule PostgresReplicationTest do
6970
[]
7071
)
7172

72-
assert_receive %PostgresReplication.Decoder.Messages.Begin{}
73-
assert_receive %PostgresReplication.Decoder.Messages.Relation{}
73+
assert_receive %Decoder.Messages.Begin{}
74+
assert_receive %Decoder.Messages.Relation{}
7475

75-
assert_receive %PostgresReplication.Decoder.Messages.Insert{
76+
assert_receive %Decoder.Messages.Insert{
7677
tuple_data: {_, "Random Text 1"}
7778
}
7879

79-
assert_receive %PostgresReplication.Decoder.Messages.Commit{}
80+
assert_receive %Decoder.Messages.Commit{}
81+
end
82+
83+
test "handles database connection, receives WAL changes and can use plugins without options" do
84+
opts = %PostgresReplication{
85+
connection_opts: [
86+
hostname: "localhost",
87+
username: "postgres",
88+
password: "postgres",
89+
database: "postgres",
90+
parameters: [
91+
application_name: "PostgresReplication"
92+
]
93+
],
94+
table: :all,
95+
opts: [name: __MODULE__],
96+
publication_name: "test_publication",
97+
output_plugin: "test_decoding",
98+
output_plugin_options: [],
99+
handler_module: PostgresReplicationTest.TestDecodingHandler,
100+
metadata: %{pid: self()}
101+
}
102+
103+
{:ok, conn} = Postgrex.start_link(opts.connection_opts)
104+
PostgresReplication.start_link(opts)
105+
106+
Postgrex.query!(
107+
conn,
108+
"INSERT INTO random_values (value) VALUES ('Random Text 1')",
109+
[]
110+
)
111+
112+
assert_receive :ok
113+
assert_receive :ok
114+
assert_receive :ok
115+
end
116+
117+
defmodule TestDecodingHandler do
118+
@behaviour PostgresReplication.Handler
119+
120+
@impl true
121+
def call(<<?w, _header::192, message::binary>>, %{metadata: %{pid: pid}}) do
122+
send(pid, :ok)
123+
:noreply
124+
end
125+
126+
# Handles keep alive messages
127+
def call(<<?k, wal_end::64, _clock::64, reply>>, _) do
128+
messages =
129+
case reply do
130+
1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
131+
0 -> []
132+
end
133+
134+
{:reply, messages}
135+
end
136+
137+
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
138+
defp current_time, do: System.os_time(:microsecond) - @epoch
80139
end
81140

82141
defmodule PgoutputHandler do
83142
@behaviour PostgresReplication.Handler
84143

85144
@impl true
86145
def call(<<?w, _header::192, message::binary>>, %{metadata: %{pid: pid}}) do
87-
message |> PostgresReplication.Decoder.decode_message() |> then(&send(pid, &1))
146+
message |> Decoder.decode_message() |> then(&send(pid, &1))
88147
:noreply
89148
end
90149

0 commit comments

Comments
 (0)