chore: Add OpenTelemetry spans to ServeShapePlug and ReplicationClient (#1695)

Part of #1664.
alco committed Sep 14, 2024
1 parent 8ecd2b3 commit 66ee2ae
Showing 7 changed files with 249 additions and 110 deletions.
5 changes: 5 additions & 0 deletions .changeset/lovely-rules-argue.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Add OpenTelemetry spans for HTTP request handling and replication message processing.
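
The diffs below all go through a with_span/3 helper on Electric.Telemetry.OpenTelemetry, whose implementation is not shown in this commit. As a rough sketch only (module name and details are assumptions, not Electric's actual code), such a wrapper could delegate to the opentelemetry_api macros like this:

# Sketch, not the actual Electric.Telemetry.OpenTelemetry module: an assumed
# shape for the with_span/3 helper used throughout this commit, built on the
# opentelemetry_api package.
defmodule MyApp.Telemetry.OpenTelemetry do
  require OpenTelemetry.Tracer

  # Run `fun` inside a new span named `name`, tagged with `attributes`.
  def with_span(name, attributes, fun) when is_function(fun, 0) do
    OpenTelemetry.Tracer.with_span name, %{attributes: Map.new(attributes)} do
      fun.()
    end
  end

  # Attach extra attributes to whichever span is currently active.
  def add_span_attributes(attributes) do
    OpenTelemetry.Tracer.set_attributes(attributes)
  end
end

With a helper of that shape, wrapping any piece of work in a span is a one-liner: MyApp.Telemetry.OpenTelemetry.with_span("some.operation", [key: "value"], fn -> do_work() end).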
194 changes: 141 additions & 53 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
@@ -1,10 +1,16 @@
defmodule Electric.Plug.ServeShapePlug do
require Logger
use Plug.Builder

alias OpenTelemetry.SemanticConventions, as: SC

alias Electric.Shapes
alias Electric.Schema
alias Electric.Replication.LogOffset
alias Electric.Telemetry.OpenTelemetry
alias Plug.Conn
use Plug.Builder

require Logger
require SC.Trace

# Aliasing for pattern matching
@before_all_offset LogOffset.before_all()
@@ -118,7 +124,8 @@ defmodule Electric.Plug.ServeShapePlug do
plug :load_shape_info
plug :put_schema_header

# We're starting listening as soon as possible to not miss stuff that was added since we've asked for last offset
# We're starting listening as soon as possible to not miss stuff that was added since we've
# asked for last offset
plug :listen_for_new_changes
plug :determine_log_chunk_offset
plug :generate_etag
@@ -145,8 +152,10 @@
end

defp load_shape_info(%Conn{} = conn, _) do
shape_info = get_or_create_shape_id(conn.assigns)
handle_shape_info(conn, shape_info)
OpenTelemetry.with_span("serve_shape_plug.load_shape_info", open_telemetry_attrs(conn), fn ->
shape_info = get_or_create_shape_id(conn.assigns)
handle_shape_info(conn, shape_info)
end)
end

# No shape_id is provided so we can get the existing one for this shape
@@ -290,8 +299,8 @@
end
end

defp put_resp_cache_headers(%Conn{} = conn, _) do
if conn.assigns.live do
defp put_resp_cache_headers(%Conn{assigns: %{config: config, live: live}} = conn, _) do
if live do
put_resp_header(
conn,
"cache-control",
@@ -301,7 +310,7 @@
put_resp_header(
conn,
"cache-control",
"max-age=#{conn.assigns.config[:max_age]}, stale-while-revalidate=#{conn.assigns.config[:stale_age]}"
"max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}"
)
end
end
@@ -324,29 +333,37 @@
} = conn,
_
) do
case Shapes.get_snapshot(conn.assigns.config, shape_id) do
{:ok, {offset, snapshot}} ->
log =
Shapes.get_log_stream(conn.assigns.config, shape_id,
since: offset,
up_to: chunk_end_offset
)

[snapshot, log, maybe_up_to_date(conn)]
|> Stream.concat()
|> to_json_stream()
|> Stream.chunk_every(500)
|> send_stream(conn, 200)

{:error, reason} ->
Logger.warning("Could not serve a snapshot because of #{inspect(reason)}")

send_resp(
conn,
500,
Jason.encode_to_iodata!(%{error: "Failed creating or fetching the snapshot"})
)
end
OpenTelemetry.with_span(
"serve_shape_plug.serve_log_or_snapshot",
open_telemetry_attrs(conn),
fn ->
case Shapes.get_snapshot(conn.assigns.config, shape_id) do
{:ok, {offset, snapshot}} ->
OpenTelemetry.with_span("serve_shape_plug.get_log_stream", [], fn ->
log =
Shapes.get_log_stream(conn.assigns.config, shape_id,
since: offset,
up_to: chunk_end_offset
)

[snapshot, log, maybe_up_to_date(conn)]
|> Stream.concat()
|> to_json_stream()
|> Stream.chunk_every(500)
|> send_stream(conn, 200)
end)

{:error, reason} ->
Logger.warning("Could not serve a snapshot because of #{inspect(reason)}")

send_resp(
conn,
500,
Jason.encode_to_iodata!(%{error: "Failed creating or fetching the snapshot"})
)
end
end
)
end

# Otherwise, serve log since that offset
@@ -360,18 +377,27 @@
} = conn,
_
) do
log =
Shapes.get_log_stream(conn.assigns.config, shape_id, since: offset, up_to: chunk_end_offset)
OpenTelemetry.with_span(
"serve_shape_plug.get_log_stream_and_maybe_hold",
open_telemetry_attrs(conn),
fn ->
log =
Shapes.get_log_stream(conn.assigns.config, shape_id,
since: offset,
up_to: chunk_end_offset
)

if Enum.take(log, 1) == [] and conn.assigns.live do
hold_until_change(conn, shape_id)
else
[log, maybe_up_to_date(conn)]
|> Stream.concat()
|> to_json_stream()
|> Stream.chunk_every(500)
|> send_stream(conn, 200)
end
if Enum.take(log, 1) == [] and conn.assigns.live do
hold_until_change(conn, shape_id)
else
[log, maybe_up_to_date(conn)]
|> Stream.concat()
|> to_json_stream()
|> Stream.chunk_every(500)
|> send_stream(conn, 200)
end
end
)
end

@json_list_start "["
@@ -388,18 +414,26 @@
defp send_stream(stream, conn, status) do
conn = Conn.send_chunked(conn, status)

Enum.reduce_while(stream, conn, fn chunk, conn ->
case Conn.chunk(conn, chunk) do
{:ok, conn} ->
{:cont, conn}

{:error, "closed"} ->
{:halt, conn}
OpenTelemetry.add_span_attributes(%{SC.Trace.http_status_code() => status})

{:error, reason} ->
Logger.error("Error while streaming response: #{inspect(reason)}")
{:halt, conn}
end
Enum.reduce_while(stream, conn, fn chunk, conn ->
OpenTelemetry.with_span(
"serve_shape_plug.serve_log_or_snapshot.chunk",
[chunk_size: IO.iodata_length(chunk)],
fn ->
case Conn.chunk(conn, chunk) do
{:ok, conn} ->
{:cont, conn}

{:error, "closed"} ->
{:halt, conn}

{:error, reason} ->
Logger.error("Error while streaming response: #{inspect(reason)}")
{:halt, conn}
end
end
)
end)
end

@@ -455,4 +489,58 @@
long_poll_timeout -> send_resp(conn, 204, ["[", @up_to_date, "]"])
end
end

defp open_telemetry_attrs(%Plug.Conn{} = conn) do
%{
:"shape.id" => Map.get(conn.query_params, "shape_id"),
:"conn.assigns" => Map.delete(conn.assigns, :config),
:"http.req_headers" => conn.req_headers,
:"http.resp_headers" => conn.resp_headers,
:"http.query_string" => conn.query_string,
:"http.query_params" => query_params_attr(conn.query_params),
SC.Trace.http_url() =>
%URI{
scheme: to_string(conn.scheme),
host: conn.host,
port: conn.port,
path: conn.request_path,
query: conn.query_string
}
|> to_string(),
SC.Trace.http_client_ip() => client_ip(conn),
SC.Trace.http_scheme() => conn.scheme,
SC.Trace.net_peer_name() => conn.host,
SC.Trace.net_peer_port() => conn.port,
SC.Trace.http_target() => conn.request_path,
SC.Trace.http_method() => conn.method,
SC.Trace.http_status_code() => conn.status,
SC.Trace.net_transport() => :"IP.TCP",
SC.Trace.http_user_agent() => user_agent(conn)
}
end

defp client_ip(%Plug.Conn{remote_ip: remote_ip} = conn) do
case Plug.Conn.get_req_header(conn, "x-forwarded-for") do
[] ->
remote_ip
|> :inet_parse.ntoa()
|> to_string()

[ip_address | _] ->
ip_address
end
end

defp user_agent(%Plug.Conn{} = conn) do
case Plug.Conn.get_req_header(conn, "user-agent") do
[] -> ""
[head | _] -> head
end
end

defp query_params_attr(map) do
map
|> Enum.map(fn {key, val} -> key <> "=" <> val end)
|> Enum.sort()
end
end
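
A note on the SC.Trace calls in open_telemetry_attrs/1 above: the opentelemetry_semantic_conventions package exposes the standard OpenTelemetry attribute names as macros, which is why the module is required rather than merely aliased. A small illustration (return values per the HTTP semantic conventions; exact coverage depends on the package version):

# Illustration only: what the SC.Trace macros used above expand to,
# assuming the opentelemetry_semantic_conventions package.
alias OpenTelemetry.SemanticConventions, as: SC
require SC.Trace

SC.Trace.http_method()      #=> :"http.method"
SC.Trace.http_status_code() #=> :"http.status_code"
SC.Trace.http_url()         #=> :"http.url"
SC.Trace.net_peer_name()    #=> :"net.peer.name"

Using the macros instead of hand-written atoms keeps the attribute keys aligned with the OpenTelemetry specification, so traces from this service correlate cleanly with other instrumented systems.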
60 changes: 44 additions & 16 deletions packages/sync-service/lib/electric/postgres/replication_client.ex
@@ -9,6 +9,7 @@ defmodule Electric.Postgres.ReplicationClient do
alias Electric.Postgres.ReplicationClient.Collector
alias Electric.Postgres.ReplicationClient.ConnectionSetup
alias Electric.Replication.Changes.Relation
alias Electric.Telemetry.OpenTelemetry

require Logger

@@ -190,8 +191,30 @@
<<@repl_msg_x_log_data, _wal_start::64, wal_end::64, _clock::64, rest::binary>>,
%State{} = state
) do
rest
|> Decoder.decode()
OpenTelemetry.with_span(
"replication_client.process_x_log_data",
[msg_size: byte_size(rest)],
fn -> process_x_log_data(rest, wal_end, state) end
)
end

def handle_data(<<@repl_msg_primary_keepalive, wal_end::64, _clock::64, reply>>, state) do
Logger.debug(fn ->
"Primary Keepalive: wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)}) reply=#{reply}"
end)

messages =
case reply do
1 -> [encode_standby_status_update(state)]
0 -> []
end

{:noreply, messages, state}
end

defp process_x_log_data(data, wal_end, %State{} = state) do
data
|> decode_message()
# # Useful for debugging:
# |> tap(fn %struct{} = msg ->
# message_type = struct |> to_string() |> String.split(".") |> List.last()
@@ -209,7 +232,13 @@

{%Relation{} = rel, %Collector{} = txn_collector} ->
{m, f, args} = state.relation_received
apply(m, f, [rel | args])

OpenTelemetry.with_span(
"replication_client.relation_received",
["rel.id": rel.id, "rel.schema": rel.schema, "rel.table": rel.table],
fn -> apply(m, f, [rel | args]) end
)

{:noreply, %{state | txn_collector: txn_collector}}

{txn, %Collector{} = txn_collector} ->
@@ -224,7 +253,12 @@
# backends will require different timeouts and the timeout will need to
# accommodate a varying number of shape consumers. The default of 5_000 ms
# should work for our file-based storage backends, for now.
case apply(m, f, [txn | args]) do
OpenTelemetry.with_span(
"replication_client.transaction_received",
[num_changes: length(txn.changes), num_relations: MapSet.size(txn.affected_relations)],
fn -> apply(m, f, [txn | args]) end
)
|> case do
:ok ->
# We currently process incoming replication messages sequentially, persisting each
# new transaction into the shape log store. So, when the applied function
@@ -244,18 +278,12 @@
end
end

def handle_data(<<@repl_msg_primary_keepalive, wal_end::64, _clock::64, reply>>, state) do
Logger.debug(fn ->
"Primary Keepalive: wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)}) reply=#{reply}"
end)

messages =
case reply do
1 -> [encode_standby_status_update(state)]
0 -> []
end

{:noreply, messages, state}
defp decode_message(data) do
OpenTelemetry.with_span(
"replication_client.decode_message",
[msg_size: byte_size(data)],
fn -> Decoder.decode(data) end
)
end

defp encode_standby_status_update(state) do
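
A side note on the binary patterns in handle_data/2 above: on the Postgres streaming-replication wire protocol, a Primary Keepalive message is a single tag byte (?k) followed by the server's current WAL end, a timestamp, and a reply-requested flag, which matches the module attribute name used in the diff. A minimal sketch of decoding one (the sample payload is fabricated):

# Sketch: decoding a Primary Keepalive message from the Postgres
# streaming-replication protocol. The payload below is made up.
repl_msg_primary_keepalive = ?k

msg = <<?k, 42_000_000::64, 0::64, 1>>

<<^repl_msg_primary_keepalive, wal_end::64, _clock::64, reply>> = msg

wal_end #=> 42000000
reply   #=> 1, i.e. the server asks for a standby status update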
26 changes: 15 additions & 11 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
@@ -122,17 +122,21 @@ defmodule Electric.ShapeCache.FileStorage do

@impl Electric.ShapeCache.Storage
def make_new_snapshot!(data_stream, %FS{} = opts) do
OpenTelemetry.with_span("storage.make_new_snapshot", [storage_impl: "mixed_disk"], fn ->
data_stream
|> Stream.map(&[&1, ?\n])
# Use the 4 byte marker (ASCII "end of transmission") to indicate the end of the snapshot,
# so that concurrent readers can detect that the snapshot has been completed.
|> Stream.concat([<<4::utf8>>])
|> Stream.into(File.stream!(shape_snapshot_path(opts), [:append, :delayed_write]))
|> Stream.run()

CubDB.put(opts.db, @snapshot_meta_key, LogOffset.first())
end)
OpenTelemetry.with_span(
"storage.make_new_snapshot",
[storage_impl: "mixed_disk", "shape.id": opts.shape_id],
fn ->
data_stream
|> Stream.map(&[&1, ?\n])
# Use the 4 byte marker (ASCII "end of transmission") to indicate the end of the snapshot,
# so that concurrent readers can detect that the snapshot has been completed.
|> Stream.concat([<<4::utf8>>])
|> Stream.into(File.stream!(shape_snapshot_path(opts), [:append, :delayed_write]))
|> Stream.run()

CubDB.put(opts.db, @snapshot_meta_key, LogOffset.first())
end
)
end

@impl Electric.ShapeCache.Storage
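
The end-of-transmission marker written in make_new_snapshot!/2 above implies a matching check on the read side, which this commit does not touch. A hypothetical reader-side helper (module name and chunk size are assumptions) might look like:

# Hypothetical helper, not part of this commit: returns true once the
# snapshot writer has appended the end-of-transmission byte (codepoint 4).
defmodule SnapshotReader do
  def complete?(path) do
    path
    |> File.stream!([], 4096)
    |> Enum.any?(&String.contains?(&1, <<4>>))
  end
end

Because the marker is a single byte, it can never straddle a chunk boundary, so scanning fixed-size chunks is safe.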
[Diffs for the remaining 3 changed files not loaded]
