Skip to content

Commit

Permalink
Detect DB change and clean all shapes.
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-dp committed Sep 16, 2024
1 parent 85bb8b6 commit 2bc5a0c
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 44 deletions.
13 changes: 9 additions & 4 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ defmodule Electric.ConnectionManager do
def handle_continue(:start_connection_pool, state) do
case start_connection_pool(state.connection_opts, state.pool_opts) do
{:ok, pid} ->
Electric.Timeline.check(get_pg_timeline(pid), state.timeline_opts)
Electric.Timeline.check({get_pg_id(pid), get_pg_timeline(pid)}, state.timeline_opts)

pg_version = query_pg_major_version(pid)

Expand Down Expand Up @@ -390,10 +390,15 @@ defmodule Electric.ConnectionManager do
Keyword.put(connection_opts, :socket_options, tcp_opts)
end

defp get_pg_id(conn) do
case Postgrex.query!(conn, "SELECT system_identifier FROM pg_control_system()", []) do
%Postgrex.Result{rows: [[system_identifier]]} -> system_identifier
end
end

defp get_pg_timeline(conn) do
case Postgrex.query(conn, "SELECT timeline_id FROM pg_control_checkpoint()", []) do
{:ok, %Postgrex.Result{rows: [[timeline_id]]}} -> timeline_id
{:error, _reason} -> nil
case Postgrex.query!(conn, "SELECT timeline_id FROM pg_control_checkpoint()", []) do
%Postgrex.Result{rows: [[timeline_id]]} -> timeline_id
end
end

Expand Down
58 changes: 33 additions & 25 deletions packages/sync-service/lib/electric/timeline.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
defmodule Electric.Timeline do
@moduledoc """
Genserver that tracks the Postgres timeline ID.
Module exporting functions for handling Postgres timelines.
Verifies the Postgres ID and its timeline.
"""
require Logger
alias Electric.PersistentKV

@type timeline :: integer() | nil
@type pg_id :: non_neg_integer()
@type timeline_id :: integer()
@type timeline :: {pg_id(), timeline_id()} | nil

@timeline_key "timeline_id"

@doc """
Checks the provided `pg_timeline` against Electric's timeline.
Checks that we're connected to the same Postgres DB as before and on the same timeline.
TO this end, it checks the provided `pg_id` against the persisted PG ID.
If the PG IDs match, it also checks the provided `pg_timeline` against the persisted timeline.
Normally, Postgres and Electric are on the same timeline and nothing must be done.
If the timelines differ, that indicates that a Point In Time Recovery (PITR) has occurred and all shapes must be cleaned.
If we fail to fetch timeline information, we also clean all shapes for safety as we can't be sure that Postgres and Electric are on the same timeline.
Expand All @@ -22,41 +26,45 @@ defmodule Electric.Timeline do
verify_timeline(pg_timeline, electric_timeline, opts)
end

# Handles the different cases of timeline comparison
@spec verify_timeline(timeline(), timeline(), keyword()) :: :ok
defp verify_timeline(nil, _, opts) do
Logger.warning("Unknown Postgres timeline; rotating shapes.")
clean_all_shapes(opts)
store_timeline(nil, opts)
end

defp verify_timeline(timeline_id, timeline_id, _opts) do
Logger.info("Connected to Postgres timeline #{timeline_id}")
defp verify_timeline({pg_id, timeline_id} = timeline, timeline, _) do
Logger.info("Connected to Postgres #{pg_id} and timeline #{timeline_id}")
:ok
end

defp verify_timeline(pg_timeline_id, nil, opts) do
defp verify_timeline({pg_id, timeline_id} = timeline, nil, opts) do
Logger.info("No previous timeline detected.")
Logger.info("Connected to Postgres timeline #{pg_timeline_id}")
# Store new timeline
store_timeline(pg_timeline_id, opts)
Logger.info("Connected to Postgres #{pg_id} and timeline #{timeline_id}")
store_timeline(timeline, opts)
end

defp verify_timeline({pg_id, _} = timeline, {electric_pg_id, _}, opts)
when pg_id != electric_pg_id do
Logger.warning(
"Detected different Postgres DB, with ID: #{pg_id}. Old Postgres DB had ID #{electric_pg_id}. Cleaning all shapes."
)

clean_all_shapes_and_store_timeline(timeline, opts)
end

defp verify_timeline({_, timeline_id} = timeline, _, opts) do
Logger.warning("Detected PITR to timeline #{timeline_id}; cleaning all shapes.")
clean_all_shapes_and_store_timeline(timeline, opts)
end

defp verify_timeline(pg_timeline_id, _, opts) do
Logger.info("Detected PITR to timeline #{pg_timeline_id}; rotating shapes.")
defp clean_all_shapes_and_store_timeline(timeline, opts) do
clean_all_shapes(opts)
# Store new timeline only after all shapes have been cleaned
store_timeline(pg_timeline_id, opts)
store_timeline(timeline, opts)
end

# Loads the timeline ID from persistent storage
# Loads the PG ID and timeline ID from persistent storage
@spec load_timeline(keyword()) :: timeline()
def load_timeline(opts) do
kv = make_serialized_kv(opts)

case PersistentKV.get(kv, @timeline_key) do
{:ok, timeline_id} ->
timeline_id
{:ok, [pg_id, timeline_id]} ->
{pg_id, timeline_id}

{:error, :not_found} ->
nil
Expand All @@ -67,9 +75,9 @@ defmodule Electric.Timeline do
end
end

defp store_timeline(timeline_id, opts) do
def store_timeline({pg_id, timeline_id}, opts) do
kv = make_serialized_kv(opts)
:ok = PersistentKV.set(kv, @timeline_key, timeline_id)
:ok = PersistentKV.set(kv, @timeline_key, [pg_id, timeline_id])
end

defp make_serialized_kv(opts) do
Expand Down
59 changes: 44 additions & 15 deletions packages/sync-service/test/electric/timeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,56 +12,85 @@ defmodule Electric.TimelineTest do
%{kv: Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir)}
end

test "returns nil when no timeline ID is available", %{kv: kv} do
test "returns nil when no timeline is available", %{kv: kv} do
assert Timeline.load_timeline(persistent_kv: kv) == nil
end
end

describe "store_timeline/2" do
@moduletag :tmp_dir

setup context do
%{opts: [persistent_kv: Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir)]}
end

test "stores the timeline", %{opts: opts} do
timeline = {1, 2}
Timeline.store_timeline(timeline, opts)
assert ^timeline = Timeline.load_timeline(opts)
end
end

describe "check/2" do
@moduletag :tmp_dir

setup context do
timeline = context[:electric_timeline]
kv = Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir)
opts = [persistent_kv: kv, shape_cache: {ShapeCache, []}]

if timeline != nil do
Timeline.store_timeline(timeline, opts)
end

{:ok, [timeline: timeline, opts: opts]}
end

@tag electric_timeline: nil
test "stores the Postgres timeline if Electric has no timeline yet", %{opts: opts} do
timeline = 5
test "stores the timeline if Electric has no timeline yet", %{opts: opts} do
assert Timeline.load_timeline(opts) == nil

timeline = {2, 5}

assert :ok = Timeline.check(timeline, opts)
assert ^timeline = Timeline.load_timeline(opts)
end

@tag electric_timeline: 3
@tag electric_timeline: {1, 2}
test "proceeds without changes if Postgres' timeline matches Electric's timeline", %{
timeline: timeline,
opts: opts
} do
expect(ShapeCache, :clean_all_shapes, 0, fn _ -> :ok end)
assert ^timeline = Timeline.load_timeline(opts)
assert :ok = Timeline.check(timeline, opts)
assert ^timeline = Timeline.load_timeline(opts)
end

@tag electric_timeline: 3
test "cleans all shapes if Postgres' timeline does not match Electric's timeline", %{
@tag electric_timeline: {1, 3}
test "cleans all shapes on Point In Time Recovery (PITR)", %{
timeline: timeline,
opts: opts
} do
ShapeCache
|> expect(:clean_all_shapes, fn _ -> :ok end)
expect(ShapeCache, :clean_all_shapes, 1, fn _ -> :ok end)
assert ^timeline = Timeline.load_timeline(opts)

pg_timeline = 2
pg_timeline = {1, 2}
assert :ok = Timeline.check(pg_timeline, opts)

assert ^pg_timeline = Timeline.load_timeline(opts)
end

@tag electric_timeline: 3
test "cleans all shapes if Postgres' timeline is unknown", %{opts: opts} do
ShapeCache
|> expect(:clean_all_shapes, fn _ -> :ok end)
# TODO: add log output checks

assert :ok = Timeline.check(nil, opts)
assert Timeline.load_timeline(opts) == nil
@tag electric_timeline: {1, 3}
test "cleans all shapes when Postgres DB changed", %{timeline: timeline, opts: opts} do
expect(ShapeCache, :clean_all_shapes, 1, fn _ -> :ok end)
assert ^timeline = Timeline.load_timeline(opts)

pg_timeline = {2, 3}
assert :ok = Timeline.check(pg_timeline, opts)
assert ^pg_timeline = Timeline.load_timeline(opts)
end
end
end

0 comments on commit 2bc5a0c

Please sign in to comment.