diff --git a/.changeset/lovely-rules-argue.md b/.changeset/lovely-rules-argue.md new file mode 100644 index 0000000000..e90055ca50 --- /dev/null +++ b/.changeset/lovely-rules-argue.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Add OpenTelemetry spans for HTTP request handling and replication message processing. diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index 42b0528591..7881717602 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -1,10 +1,16 @@ defmodule Electric.Plug.ServeShapePlug do - require Logger + use Plug.Builder + + alias OpenTelemetry.SemanticConventions, as: SC + alias Electric.Shapes alias Electric.Schema alias Electric.Replication.LogOffset + alias Electric.Telemetry.OpenTelemetry alias Plug.Conn - use Plug.Builder + + require Logger + require SC.Trace # Aliasing for pattern matching @before_all_offset LogOffset.before_all() @@ -118,7 +124,8 @@ defmodule Electric.Plug.ServeShapePlug do plug :load_shape_info plug :put_schema_header - # We're starting listening as soon as possible to not miss stuff that was added since we've asked for last offset + # We're starting listening as soon as possible to not miss stuff that was added since we've + # asked for last offset plug :listen_for_new_changes plug :determine_log_chunk_offset plug :generate_etag @@ -145,8 +152,10 @@ defmodule Electric.Plug.ServeShapePlug do end defp load_shape_info(%Conn{} = conn, _) do - shape_info = get_or_create_shape_id(conn.assigns) - handle_shape_info(conn, shape_info) + OpenTelemetry.with_span("serve_shape_plug.load_shape_info", open_telemetry_attrs(conn), fn -> + shape_info = get_or_create_shape_id(conn.assigns) + handle_shape_info(conn, shape_info) + end) end # No shape_id is provided so we can get the existing one for this shape @@ -290,8 +299,8 @@ defmodule Electric.Plug.ServeShapePlug do end end - defp put_resp_cache_headers(%Conn{} = conn, _) do - if conn.assigns.live do + defp put_resp_cache_headers(%Conn{assigns: %{config: config, live: live}} = conn, _) do + if live do put_resp_header( conn, "cache-control", @@ -301,7 +310,7 @@ defmodule Electric.Plug.ServeShapePlug do put_resp_header( conn, "cache-control", - "max-age=#{conn.assigns.config[:max_age]}, stale-while-revalidate=#{conn.assigns.config[:stale_age]}" + "max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}" ) end end @@ -324,29 +333,37 @@ defmodule Electric.Plug.ServeShapePlug do } = conn, _ ) do - case Shapes.get_snapshot(conn.assigns.config, shape_id) do - {:ok, {offset, snapshot}} -> - log = - Shapes.get_log_stream(conn.assigns.config, shape_id, - since: offset, - up_to: chunk_end_offset - ) - - [snapshot, log, maybe_up_to_date(conn)] - |> Stream.concat() - |> to_json_stream() - |> Stream.chunk_every(500) - |> send_stream(conn, 200) - - {:error, reason} -> - Logger.warning("Could not serve a snapshot because of #{inspect(reason)}") - - send_resp( - conn, - 500, - Jason.encode_to_iodata!(%{error: "Failed creating or fetching the snapshot"}) - ) - end + OpenTelemetry.with_span( + "serve_shape_plug.serve_log_or_snapshot", + open_telemetry_attrs(conn), + fn -> + case Shapes.get_snapshot(conn.assigns.config, shape_id) do + {:ok, {offset, snapshot}} -> + OpenTelemetry.with_span("serve_shape_plug.get_log_stream", [], fn -> + log = + Shapes.get_log_stream(conn.assigns.config, shape_id, + since: offset, + up_to: chunk_end_offset + ) + + [snapshot, log, maybe_up_to_date(conn)] + |> Stream.concat() + |> to_json_stream() + |> Stream.chunk_every(500) + |> send_stream(conn, 200) + end) + + {:error, reason} -> + Logger.warning("Could not serve a snapshot because of #{inspect(reason)}") + + send_resp( + conn, + 500, + Jason.encode_to_iodata!(%{error: "Failed creating or fetching the snapshot"}) + ) + end + end + ) end # Otherwise, serve log since that offset @@ -360,18 +377,27 @@ defmodule Electric.Plug.ServeShapePlug do } = conn, _ ) do - log = - Shapes.get_log_stream(conn.assigns.config, shape_id, since: offset, up_to: chunk_end_offset) + OpenTelemetry.with_span( + "serve_shape_plug.get_log_stream_and_maybe_hold", + open_telemetry_attrs(conn), + fn -> + log = + Shapes.get_log_stream(conn.assigns.config, shape_id, + since: offset, + up_to: chunk_end_offset + ) - if Enum.take(log, 1) == [] and conn.assigns.live do - hold_until_change(conn, shape_id) - else - [log, maybe_up_to_date(conn)] - |> Stream.concat() - |> to_json_stream() - |> Stream.chunk_every(500) - |> send_stream(conn, 200) - end + if Enum.take(log, 1) == [] and conn.assigns.live do + hold_until_change(conn, shape_id) + else + [log, maybe_up_to_date(conn)] + |> Stream.concat() + |> to_json_stream() + |> Stream.chunk_every(500) + |> send_stream(conn, 200) + end + end + ) end @json_list_start "[" @@ -388,18 +414,26 @@ defmodule Electric.Plug.ServeShapePlug do defp send_stream(stream, conn, status) do conn = Conn.send_chunked(conn, status) - Enum.reduce_while(stream, conn, fn chunk, conn -> - case Conn.chunk(conn, chunk) do - {:ok, conn} -> - {:cont, conn} - - {:error, "closed"} -> - {:halt, conn} + OpenTelemetry.add_span_attributes(%{SC.Trace.http_status_code() => status}) - {:error, reason} -> - Logger.error("Error while streaming response: #{inspect(reason)}") - {:halt, conn} - end + Enum.reduce_while(stream, conn, fn chunk, conn -> + OpenTelemetry.with_span( + "serve_shape_plug.serve_log_or_snapshot.chunk", + [chunk_size: IO.iodata_length(chunk)], + fn -> + case Conn.chunk(conn, chunk) do + {:ok, conn} -> + {:cont, conn} + + {:error, "closed"} -> + {:halt, conn} + + {:error, reason} -> + Logger.error("Error while streaming response: #{inspect(reason)}") + {:halt, conn} + end + end + ) end) end @@ -455,4 +489,58 @@ defmodule Electric.Plug.ServeShapePlug do long_poll_timeout -> send_resp(conn, 204, ["[", @up_to_date, "]"]) end end + + defp open_telemetry_attrs(%Plug.Conn{} = conn) do + %{ + :"shape.id" => Map.get(conn.query_params, "shape_id"), + :"conn.assigns" => Map.delete(conn.assigns, :config), + :"http.req_headers" => conn.req_headers, + :"http.resp_headers" => conn.resp_headers, + :"http.query_string" => conn.query_string, + :"http.query_params" => query_params_attr(conn.query_params), + SC.Trace.http_url() => + %URI{ + scheme: to_string(conn.scheme), + host: conn.host, + port: conn.port, + path: conn.request_path, + query: conn.query_string + } + |> to_string(), + SC.Trace.http_client_ip() => client_ip(conn), + SC.Trace.http_scheme() => conn.scheme, + SC.Trace.net_peer_name() => conn.host, + SC.Trace.net_peer_port() => conn.port, + SC.Trace.http_target() => conn.request_path, + SC.Trace.http_method() => conn.method, + SC.Trace.http_status_code() => conn.status, + SC.Trace.net_transport() => :"IP.TCP", + SC.Trace.http_user_agent() => user_agent(conn) + } + end + + defp client_ip(%Plug.Conn{remote_ip: remote_ip} = conn) do + case Plug.Conn.get_req_header(conn, "x-forwarded-for") do + [] -> + remote_ip + |> :inet_parse.ntoa() + |> to_string() + + [ip_address | _] -> + ip_address + end + end + + defp user_agent(%Plug.Conn{} = conn) do + case Plug.Conn.get_req_header(conn, "user-agent") do + [] -> "" + [head | _] -> head + end + end + + defp query_params_attr(map) do + map + |> Enum.map(fn {key, val} -> key <> "=" <> val end) + |> Enum.sort() + end end diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index a08553f63c..2ad8ca6c1e 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -9,6 +9,7 @@ defmodule Electric.Postgres.ReplicationClient do alias Electric.Postgres.ReplicationClient.Collector alias Electric.Postgres.ReplicationClient.ConnectionSetup alias Electric.Replication.Changes.Relation + alias Electric.Telemetry.OpenTelemetry require Logger @@ -190,8 +191,30 @@ defmodule Electric.Postgres.ReplicationClient do <<@repl_msg_x_log_data, _wal_start::64, wal_end::64, _clock::64, rest::binary>>, %State{} = state ) do - rest - |> Decoder.decode() + OpenTelemetry.with_span( + "replication_client.process_x_log_data", + [msg_size: byte_size(rest)], + fn -> process_x_log_data(rest, wal_end, state) end + ) + end + + def handle_data(<<@repl_msg_primary_keepalive, wal_end::64, _clock::64, reply>>, state) do + Logger.debug(fn -> + "Primary Keepalive: wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)}) reply=#{reply}" + end) + + messages = + case reply do + 1 -> [encode_standby_status_update(state)] + 0 -> [] + end + + {:noreply, messages, state} + end + + defp process_x_log_data(data, wal_end, %State{} = state) do + data + |> decode_message() # # Useful for debugging: # |> tap(fn %struct{} = msg -> # message_type = struct |> to_string() |> String.split(".") |> List.last() @@ -209,7 +232,13 @@ defmodule Electric.Postgres.ReplicationClient do {%Relation{} = rel, %Collector{} = txn_collector} -> {m, f, args} = state.relation_received - apply(m, f, [rel | args]) + + OpenTelemetry.with_span( + "replication_client.relation_received", + ["rel.id": rel.id, "rel.schema": rel.schema, "rel.table": rel.table], + fn -> apply(m, f, [rel | args]) end + ) + {:noreply, %{state | txn_collector: txn_collector}} {txn, %Collector{} = txn_collector} -> @@ -224,7 +253,12 @@ defmodule Electric.Postgres.ReplicationClient do # backends will require different timeouts and the timeout will need to # accomodate varying number of shape consumers. The default of 5_000 ms # should work for our file-based storage backends, for now. - case apply(m, f, [txn | args]) do + OpenTelemetry.with_span( + "replication_client.transaction_received", + [num_changes: length(txn.changes), num_relations: MapSet.size(txn.affected_relations)], + fn -> apply(m, f, [txn | args]) end + ) + |> case do :ok -> # We currently process incoming replication messages sequentially, persisting each # new transaction into the shape log store. So, when the applied function @@ -244,18 +278,12 @@ defmodule Electric.Postgres.ReplicationClient do end end - def handle_data(<<@repl_msg_primary_keepalive, wal_end::64, _clock::64, reply>>, state) do - Logger.debug(fn -> - "Primary Keepalive: wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)}) reply=#{reply}" - end) - - messages = - case reply do - 1 -> [encode_standby_status_update(state)] - 0 -> [] - end - - {:noreply, messages, state} + defp decode_message(data) do + OpenTelemetry.with_span( + "replication_client.decode_message", + [msg_size: byte_size(data)], + fn -> Decoder.decode(data) end + ) end defp encode_standby_status_update(state) do diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage.ex b/packages/sync-service/lib/electric/shape_cache/file_storage.ex index e93ef5381a..06d131aef1 100644 --- a/packages/sync-service/lib/electric/shape_cache/file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/file_storage.ex @@ -122,17 +122,21 @@ defmodule Electric.ShapeCache.FileStorage do @impl Electric.ShapeCache.Storage def make_new_snapshot!(data_stream, %FS{} = opts) do - OpenTelemetry.with_span("storage.make_new_snapshot", [storage_impl: "mixed_disk"], fn -> - data_stream - |> Stream.map(&[&1, ?\n]) - # Use the 4 byte marker (ASCII "end of transmission") to indicate the end of the snapshot, - # so that concurrent readers can detect that the snapshot has been completed. - |> Stream.concat([<<4::utf8>>]) - |> Stream.into(File.stream!(shape_snapshot_path(opts), [:append, :delayed_write])) - |> Stream.run() - - CubDB.put(opts.db, @snapshot_meta_key, LogOffset.first()) - end) + OpenTelemetry.with_span( + "storage.make_new_snapshot", + [storage_impl: "mixed_disk", "shape.id": opts.shape_id], + fn -> + data_stream + |> Stream.map(&[&1, ?\n]) + # Use the 4 byte marker (ASCII "end of transmission") to indicate the end of the snapshot, + # so that concurrent readers can detect that the snapshot has been completed. + |> Stream.concat([<<4::utf8>>]) + |> Stream.into(File.stream!(shape_snapshot_path(opts), [:append, :delayed_write])) + |> Stream.run() + + CubDB.put(opts.db, @snapshot_meta_key, LogOffset.first()) + end + ) end @impl Electric.ShapeCache.Storage diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index a953a208c3..39c63f1ad8 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -173,19 +173,23 @@ defmodule Electric.ShapeCache.InMemoryStorage do @impl Electric.ShapeCache.Storage def make_new_snapshot!(data_stream, %MS{} = opts) do - OpenTelemetry.with_span("storage.make_new_snapshot", [storage_impl: "in_memory"], fn -> - table = opts.snapshot_table - - data_stream - |> Stream.with_index(1) - |> Stream.map(fn {log_item, index} -> {snapshot_key(index), log_item} end) - |> Stream.chunk_every(500) - |> Stream.each(fn chunk -> :ets.insert(table, chunk) end) - |> Stream.run() - - :ets.insert(table, {snapshot_end(), 0}) - :ok - end) + OpenTelemetry.with_span( + "storage.make_new_snapshot", + [storage_impl: "in_memory", "shape.id": opts.shape_id], + fn -> + table = opts.snapshot_table + + data_stream + |> Stream.with_index(1) + |> Stream.map(fn {log_item, index} -> {snapshot_key(index), log_item} end) + |> Stream.chunk_every(500) + |> Stream.each(fn chunk -> :ets.insert(table, chunk) end) + |> Stream.run() + + :ets.insert(table, {snapshot_end(), 0}) + :ok + end + ) end @impl Electric.ShapeCache.Storage diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 05cae10339..024f1ea978 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -43,7 +43,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do OpenTelemetry.with_span( "shape_cache.create_snapshot_task", - [], + shape_attrs(shape_id, shape), fn -> try do Utils.apply_fn_or_mfa(prepare_tables_fn_or_mfa, [pool, affected_tables]) @@ -80,28 +80,36 @@ defmodule Electric.Shapes.Consumer.Snapshotter do Postgrex.transaction( db_pool, fn conn -> - OpenTelemetry.with_span("shape_cache.query_in_readonly_txn", [], fn -> - Postgrex.query!(conn, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY", []) + OpenTelemetry.with_span( + "shape_cache.query_in_readonly_txn", + shape_attrs(shape_id, shape), + fn -> + Postgrex.query!(conn, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY", []) - %{rows: [[xmin]]} = - Postgrex.query!(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())", []) + %{rows: [[xmin]]} = + Postgrex.query!(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())", []) - GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin}) + GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin}) - # Enforce display settings *before* querying initial data to maintain consistent - # formatting between snapshot and live log entries. - Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, [])) + # Enforce display settings *before* querying initial data to maintain consistent + # formatting between snapshot and live log entries. + Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, [])) - stream = Querying.stream_initial_data(conn, shape) + stream = Querying.stream_initial_data(conn, shape) - GenServer.cast(parent, {:snapshot_started, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) - # could pass the shape and then make_new_snapshot! can pass it to row_to_snapshot_item - # that way it has the relation, but it is still missing the pk_cols - Storage.make_new_snapshot!(stream, storage) - end) + # could pass the shape and then make_new_snapshot! can pass it to row_to_snapshot_item + # that way it has the relation, but it is still missing the pk_cols + Storage.make_new_snapshot!(stream, storage) + end + ) end, timeout: :infinity ) end + + defp shape_attrs(shape_id, shape) do + ["shape.id": shape_id, "shape.root_table": shape.root_table, "shape.where": shape.where] + end end diff --git a/packages/sync-service/lib/electric/telemetry/open_telemetry.ex b/packages/sync-service/lib/electric/telemetry/open_telemetry.ex index 30674ce66c..a37b847bbf 100644 --- a/packages/sync-service/lib/electric/telemetry/open_telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry/open_telemetry.ex @@ -44,8 +44,10 @@ defmodule Electric.Telemetry.OpenTelemetry do @doc """ Create a span that starts at the current point in time and ends when `fun` returns. - Calling this function inside an another span establishes a parent-child relationship between - the two, as long as both calls happens within the same Elixir process. See `async_fun/4` for + Returns the result of calling the function `fun`. + + Calling this function inside another span establishes a parent-child relationship between + the two, as long as both calls happen within the same Elixir process. See `async_fun/4` for interprocess progragation of span context. """ @spec with_span(span_name(), span_attrs(), (-> t)) :: t when t: term