From 007db7ff553b166a51de33ac4404c7e67d69a1e0 Mon Sep 17 00:00:00 2001 From: "E.FU" Date: Sun, 17 May 2026 15:53:33 +0800 Subject: [PATCH 1/3] feat: PromEx plugin module + optional dep (Task 21, batch A1) --- lib/ccxt_ocx/prom_ex/plugin.ex | 252 +++++++++++++++++++++++++++++++++ lib/ccxt_ocx/telemetry.ex | 8 ++ mix.exs | 11 +- mix.lock | 7 + 4 files changed, 276 insertions(+), 2 deletions(-) create mode 100644 lib/ccxt_ocx/prom_ex/plugin.ex diff --git a/lib/ccxt_ocx/prom_ex/plugin.ex b/lib/ccxt_ocx/prom_ex/plugin.ex new file mode 100644 index 0000000..ff3cbaa --- /dev/null +++ b/lib/ccxt_ocx/prom_ex/plugin.ex @@ -0,0 +1,252 @@ +defmodule CcxtOcx.PromEx.Plugin do + @moduledoc """ + Ship-with-the-library [PromEx](https://hex.pm/packages/prom_ex) plugin for + the `[:ccxt_ocx, ...]` telemetry event family. + + Consumers add `{:prom_ex, "~> 1.11"}` to their app's deps, then reference + this module in their PromEx config: + + defmodule MyApp.PromEx do + use PromEx, otp_app: :my_app + + @impl true + def plugins do + [ + PromEx.Plugins.Application, + PromEx.Plugins.Beam, + # Optional :pool / :poll_rate opts enable periodic pool memory snapshots. + {CcxtOcx.PromEx.Plugin, pool: MyApp.CcxtPool, poll_rate: 10_000} + ] + end + end + + PromEx is declared as an optional dependency of `:ccxt_ocx`, so this module + is only loaded in consumer projects that also have `:prom_ex` in their deps. 
+ + ## Provided metrics + + ### Event-based (always on) + + Backed by `[:ccxt_ocx, :runtime, :memory]` — emitted from + `CcxtOcx.Runtime.memory/1`, runtime init/terminate, and + `CcxtOcx.RuntimePool.memory/1`: + + * `ccxt_ocx_runtime_memory_malloc_size_bytes` (last_value) + * `ccxt_ocx_runtime_memory_used_size_bytes` (last_value) + * `ccxt_ocx_runtime_memory_obj_count` (last_value) + + Backed by `[:ccxt_ocx, :rest, :stop | :exception]` — reserved for + Phase 2 `defunified` (Task 7); the metric definitions exist now so + dashboards stay stable when emission lights up: + + * `ccxt_ocx_rest_duration_milliseconds` (distribution) + * `ccxt_ocx_rest_total` (counter) + * `ccxt_ocx_rest_exceptions_total` (counter) + + Backed by `[:ccxt_ocx, :ws, :tick]` — reserved for Phase 3 + `defstreaming` (Task 11): + + * `ccxt_ocx_ws_ticks_total` (counter) + + ### Polling (opt-in) + + When the plugin is configured with a `:pool` option, a periodic + timer calls `CcxtOcx.RuntimePool.memory/1` every `:poll_rate` + milliseconds (default 5_000). Each call emits a fresh + `[:ccxt_ocx, :runtime, :memory]` event captured by the event metrics + above. No additional metric definitions — polling here is purely a + driver, not a separate metric stream. + + ## Tag stability + + All tag values are normalized through `tag_values/1` callbacks so that + Prometheus labels stay consistent across emission sites. Missing + metadata keys become `"none"`, runtime PIDs are inspected (`#PID<…>`), + and exceptions are normalized to `:kind` (`:error | :exit | :throw`). 
+ """ + + use PromEx.Plugin + + alias CcxtOcx.Telemetry, as: T + + @impl true + def event_metrics(_opts) do + [ + runtime_event_metrics(), + rest_event_metrics(), + ws_event_metrics() + ] + end + + @impl true + def polling_metrics(opts) do + case Keyword.fetch(opts, :pool) do + {:ok, pool} -> + poll_rate = Keyword.get(opts, :poll_rate, 5_000) + + [ + Polling.build( + :ccxt_ocx_runtime_pool_memory_polling_metrics, + poll_rate, + {CcxtOcx.RuntimePool, :memory, [pool]}, + [] + ) + ] + + :error -> + [] + end + end + + ## ------------------------------------------------------------------ + ## Event-group builders + ## ------------------------------------------------------------------ + + @spec runtime_event_metrics() :: Event.t() + defp runtime_event_metrics do + memory_event = T.__runtime_memory__() + memory_tags = [:server, :pool, :phase] + memory_tag_values = &normalize_memory_metadata/1 + + Event.build(:ccxt_ocx_runtime_event_metrics, [ + last_value( + [:ccxt_ocx, :runtime, :memory, :malloc_size, :bytes], + event_name: memory_event, + measurement: :malloc_size, + description: "QuickJS malloc size for a ccxt_ocx runtime.", + tags: memory_tags, + tag_values: memory_tag_values, + unit: :byte + ), + last_value( + [:ccxt_ocx, :runtime, :memory, :used, :bytes], + event_name: memory_event, + measurement: :memory_used_size, + description: "QuickJS used memory for a ccxt_ocx runtime.", + tags: memory_tags, + tag_values: memory_tag_values, + unit: :byte + ), + last_value( + [:ccxt_ocx, :runtime, :memory, :obj_count], + event_name: memory_event, + measurement: :obj_count, + description: "QuickJS live object count for a ccxt_ocx runtime.", + tags: memory_tags, + tag_values: memory_tag_values + ) + ]) + end + + @spec rest_event_metrics() :: Event.t() + defp rest_event_metrics do + stop_event = T.__rest_stop__() + exception_event = T.__rest_exception__() + rest_tags = [:exchange, :method] + rest_tag_values = &normalize_rest_metadata/1 + exception_tags = [:exchange, :method, 
:kind] + exception_tag_values = &normalize_rest_exception_metadata/1 + + Event.build(:ccxt_ocx_rest_event_metrics, [ + distribution( + [:ccxt_ocx, :rest, :duration, :milliseconds], + event_name: stop_event, + measurement: :duration, + description: "CCXT REST call duration.", + tags: rest_tags, + tag_values: rest_tag_values, + unit: {:native, :millisecond}, + reporter_options: [buckets: [10, 50, 100, 250, 500, 1000, 5000]] + ), + counter( + [:ccxt_ocx, :rest, :total], + event_name: stop_event, + description: "CCXT REST call count.", + tags: rest_tags, + tag_values: rest_tag_values + ), + counter( + [:ccxt_ocx, :rest, :exceptions, :total], + event_name: exception_event, + description: "CCXT REST call exception count.", + tags: exception_tags, + tag_values: exception_tag_values + ) + ]) + end + + @spec ws_event_metrics() :: Event.t() + defp ws_event_metrics do + tick_event = T.__ws_tick__() + + Event.build(:ccxt_ocx_ws_event_metrics, [ + counter( + [:ccxt_ocx, :ws, :ticks, :total], + event_name: tick_event, + description: "CCXT WebSocket inbound message count.", + tags: [:exchange, :stream, :type], + tag_values: &normalize_ws_tick_metadata/1 + ) + ]) + end + + ## ------------------------------------------------------------------ + ## Tag-value normalizers + ## ------------------------------------------------------------------ + + @spec normalize_memory_metadata(:telemetry.event_metadata()) :: %{ + server: String.t(), + pool: String.t(), + phase: String.t() + } + defp normalize_memory_metadata(metadata) do + %{ + server: stringify(Map.get(metadata, :server)), + pool: stringify(Map.get(metadata, :pool)), + phase: stringify(Map.get(metadata, :phase)) + } + end + + @spec normalize_rest_metadata(:telemetry.event_metadata()) :: %{ + exchange: String.t(), + method: String.t() + } + defp normalize_rest_metadata(metadata) do + %{ + exchange: stringify(Map.get(metadata, :exchange)), + method: stringify(Map.get(metadata, :method)) + } + end + + @spec 
normalize_rest_exception_metadata(:telemetry.event_metadata()) :: %{ + exchange: String.t(), + method: String.t(), + kind: String.t() + } + defp normalize_rest_exception_metadata(metadata) do + %{ + exchange: stringify(Map.get(metadata, :exchange)), + method: stringify(Map.get(metadata, :method)), + kind: stringify(Map.get(metadata, :kind)) + } + end + + @spec normalize_ws_tick_metadata(:telemetry.event_metadata()) :: %{ + exchange: String.t(), + stream: String.t(), + type: String.t() + } + defp normalize_ws_tick_metadata(metadata) do + %{ + exchange: stringify(Map.get(metadata, :exchange)), + stream: stringify(Map.get(metadata, :stream)), + type: stringify(Map.get(metadata, :type)) + } + end + + @spec stringify(term()) :: String.t() + defp stringify(nil), do: "none" + defp stringify(value) when is_binary(value), do: value + defp stringify(value) when is_atom(value), do: Atom.to_string(value) + defp stringify(value), do: inspect(value) +end diff --git a/lib/ccxt_ocx/telemetry.ex b/lib/ccxt_ocx/telemetry.ex index c39a0c2..0db7d69 100644 --- a/lib/ccxt_ocx/telemetry.ex +++ b/lib/ccxt_ocx/telemetry.ex @@ -57,6 +57,14 @@ defmodule CcxtOcx.Telemetry do :telemetry.detach(handler_id) Or use the higher-level wrappers from this module when emitting from inside CcxtOcx. + + ## PromEx + + See `CcxtOcx.PromEx.Plugin` for a ship-with-the-library + [PromEx](https://hex.pm/packages/prom_ex) plugin that maps every event + in this module to Prometheus metrics with zero glue. Consumers add + `{:prom_ex, "~> 1.11"}` to their own deps and reference the plugin in + their PromEx config. """ @prefix [:ccxt_ocx] diff --git a/mix.exs b/mix.exs index f574a3d..da9f294 100644 --- a/mix.exs +++ b/mix.exs @@ -47,6 +47,11 @@ defmodule CcxtOcx.MixProject do # Observability (Task 14 — telemetry events + future memory monitor) {:telemetry, "~> 1.3"}, + # Optional PromEx plugin (Task 21 — CcxtOcx.PromEx.Plugin). 
+ # Consumers add prom_ex to their own deps; we only need it present + # at compile time so `use PromEx.Plugin` expands cleanly. + {:prom_ex, "~> 1.11", optional: true}, + # JSON {:jason, "~> 1.4.5"}, @@ -65,9 +70,11 @@ defmodule CcxtOcx.MixProject do {:ex_ast, "~> 0.12.0", only: [:dev, :test], runtime: false}, {:reach, "~> 2.3.4", only: [:dev, :test], runtime: false}, - # Tidewave (non-Phoenix) + # Tidewave (non-Phoenix). Bandit is also kept in :test so PromEx's + # transitively-optional `:plug` dep is available when the optional + # `:prom_ex` compiles under :test (see `CcxtOcx.PromEx.Plugin`). {:tidewave, "~> 0.5.6", only: :dev}, - {:bandit, "~> 1.11.1", only: :dev} + {:bandit, "~> 1.11.1", only: [:dev, :test]} ] end diff --git a/mix.lock b/mix.lock index b5c04ae..4525dfd 100644 --- a/mix.lock +++ b/mix.lock @@ -31,17 +31,24 @@ "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "npm": {:hex, :npm, "0.7.3", "4e8e6ec092eb628c4afa7d720119eeb045e110c7d7e4b138599d86509ae3c267", [:mix], [{:hex_solver, "~> 0.2", [hex: :hex_solver, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:npm_semver, "~> 0.1.0", [hex: :npm_semver, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "167d5c55b31306819c5f620d8ac1f8107a82d54c31165600c5a73d03b381dd78"}, "npm_semver": {:hex, :npm_semver, "0.1.0", "3ab2c2a151d8c87c364209b2ca1a4fd2ab98507ed61afbd1ea12c1826e67200a", [:mix], [{:hex_solver, "~> 0.2", [hex: :hex_solver, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "77afbc4c523c19a572325190bc4c968ec027e1c6ef8538bcddacf835966072fa"}, + "octo_fetch": {:hex, :octo_fetch, "0.5.0", "f50701568b9fc752656367f82cc134d5fbefff37c5a0e8ddfcceb02ceee3f5fc", [:mix], 
[{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "6226cc3c14ca948ee9f25fb0446322e5c288e215da9beba7899b6b5f4cd3ccb0"}, "oxc": {:hex, :oxc, "0.12.1", "46791003e69a8a1370704598c3369db457439728331190893611a4095b11d7d4", [:mix], [{:rustler, "~> 0.36", [hex: :rustler, repo: "hexpm", optional: true]}, {:rustler_precompiled, "~> 0.8", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}], "hexpm", "cc37a5f9248d4065b7f871c9cb3352b473cabaeabedf24947a7100f51b154251"}, + "peep": {:hex, :peep, "3.5.0", "9f6ead7b0f2c684494200c8fc02e7e62e8c459afe861b29bd859e4c96f402ed8", [:mix], [{:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:plug, "~> 1.16", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5a73a99c6e60062415efeb7e536a663387146463a3d3df1417da31fd665ac210"}, "plug": {:hex, :plug, "1.19.2", "e4950525b22c6789dfb38a3f95d47171ba159da3fc5a33be9643b43d5e8adb98", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b6fce20a56af5e60fa5dfecf3f907bb98ec981be43c79a3809a499bc3d133de0"}, "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, + "prom_ex": {:hex, :prom_ex, "1.11.0", "1f6d67f2dead92224cb4f59beb3e4d319257c5728d9638b4a5e8ceb51a4f9c7e", [:mix], [{:absinthe, ">= 1.7.0", [hex: :absinthe, repo: "hexpm", optional: true]}, {:broadway, ">= 1.1.0", [hex: :broadway, repo: "hexpm", optional: true]}, {:ecto, ">= 3.11.0", [hex: :ecto, repo: "hexpm", 
optional: true]}, {:finch, "~> 0.18", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, ">= 2.10.0", [hex: :oban, repo: "hexpm", optional: true]}, {:octo_fetch, "~> 0.4", [hex: :octo_fetch, repo: "hexpm", optional: false]}, {:peep, "~> 3.0", [hex: :peep, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.7.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, ">= 0.20.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, ">= 1.16.0", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, ">= 2.6.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, ">= 1.0.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.2", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 1.1", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "76b074bc3730f0802978a7eb5c7091a65473eaaf07e99ec9e933138dcc327805"}, "quickbeam": {:hex, :quickbeam, "0.10.12", "68334f4435a0e02f6254bc5e99ddecd06a6e0c758ea3e87890cba1e8293978d2", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:mint_web_socket, "~> 1.0", [hex: :mint_web_socket, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:npm, "~> 0.7.1", [hex: :npm, repo: "hexpm", optional: true]}, {:oxc, "~> 0.12.1", [hex: :oxc, repo: "hexpm", optional: false]}, {:zigler, "~> 0.15.2", [hex: :zigler, repo: "hexpm", optional: true]}, {:zigler_precompiled, "~> 0.1.4", [hex: :zigler_precompiled, repo: "hexpm", optional: false]}], "hexpm", "46d2c1f45b2226cd4292125bfcbd3256dc9cf0581365a284677a4bd5e13c75ff"}, "reach": {:hex, :reach, "2.3.4", "b1bdfbe445b2d4edeebbde2df48e0f4ccbb1d33e4479f0fa2bf15435f43e2d30", [:mix], [{:boxart, "~> 0.3.3", [hex: 
:boxart, repo: "hexpm", optional: true]}, {:ex_ast, "~> 0.12.0", [hex: :ex_ast, repo: "hexpm", optional: false]}, {:ex_dna, "~> 1.5", [hex: :ex_dna, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:libgraph, "~> 0.16.0", [hex: :libgraph, repo: "hexpm", optional: false]}, {:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: true]}, {:makeup_js, "~> 0.1", [hex: :makeup_js, repo: "hexpm", optional: true]}, {:quickbeam, "~> 0.10", [hex: :quickbeam, repo: "hexpm", optional: true]}], "hexpm", "c5fb84633c44c5ab1a10438cecafc81f3bc23a22fd8eb724dbdeb3d594d9bb6c"}, "req": {:hex, :req, "0.5.17", "0096ddd5b0ed6f576a03dde4b158a0c727215b15d2795e59e0916c6971066ede", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0b8bc6ffdfebbc07968e59d3ff96d52f2202d0536f10fef4dc11dc02a2a43e39"}, "rustler_precompiled": {:hex, :rustler_precompiled, "0.9.0", "3a052eda09f3d2436364645cc1f13279cf95db310eb0c17b0d8f25484b233aa0", [:mix], [{:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "471d97315bd3bf7b64623418b3693eedd8e47de3d1cb79a0ac8f9da7d770d94c"}, "sobelow": {:hex, :sobelow, "0.14.1", "2f81e8632f15574cba2402bcddff5497b413c01e6f094bc0ab94e83c2f74db81", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8fac9a2bd90fdc4b15d6fca6e1608efb7f7c600fa75800813b794ee9364c87f2"}, "sourceror": {:hex, :sourceror, "1.12.0", 
"da354c5f35aad3cc1132f5d5b0d8437d865e2661c263260480bab51b5eedb437", [:mix], [], "hexpm", "755703683bd014ebcd5de9acc24b68fb874a660a568d1d63f8f98cd8a6ef9cd0"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "styler": {:hex, :styler, "1.11.0", "35010d970689a23c2bcc8e97bd8bf7d20e3561d60c49be84654df5c37d051a9c", [:mix], [], "hexpm", "70f36165d0cf238a32b7a456fdef6a9c72e77e657d7ac4a0ace33aeba3f2b8c0"}, "telemetry": {:hex, :telemetry, "1.4.2", "a0cb522801dffb1c49fe6e30561badffc7b6d0e180db1300df759faa22062855", [:rebar3], [], "hexpm", "928f6495066506077862c0d1646609eed891a4326bee3126ba54b60af61febb1"}, + "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, + "telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.2.1", "c9755987d7b959b557084e6990990cb96a50d6482c683fb9622a63837f3cd3d8", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5e2c599da4983c4f88a33e9571f1458bf98b0cf6ba930f1dc3a6e8cf45d5afb6"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.3.0", "d5c46420126b5ac2d72bc6580fb4f537d35e851cc0f8dbd571acf6d6e10f5ec7", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "51f18bed7128544a50f75897db9974436ea9bfba560420b646af27a9a9b35211"}, "thousand_island": {:hex, :thousand_island, "1.4.3", "2158209580f633be38d43ec4e3ce0a01079592b9657afff9080d5d8ca149a3af", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: 
"hexpm", optional: false]}], "hexpm", "6e4ce09b0fd761a58594d02814d40f77daff460c48a7354a15ab353bb998ea0b"}, "tidewave": {:hex, :tidewave, "0.5.6", "91f35540b5599640443f1d3a1c6166bf506e202840261a6344e384e8813c1f64", [:mix], [{:circular_buffer, "~> 0.4 or ~> 1.0", [hex: :circular_buffer, repo: "hexpm", optional: false]}, {:igniter, "~> 0.6", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:phoenix_live_reload, ">= 1.6.1", [hex: :phoenix_live_reload, repo: "hexpm", optional: true]}, {:plug, "~> 1.17", [hex: :plug, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "dc82d52b8b6ffc04680544b17cd340c7d4166bb0d63999eb960850526866b533"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, From 2bb470214c7db6afb1d1152795e37c576f419786 Mon Sep 17 00:00:00 2001 From: "E.FU" Date: Sun, 17 May 2026 16:13:12 +0800 Subject: [PATCH 2/3] feat: PromEx plugin tests + docs + rmap (Task 21, batch A2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes Task 21. Tests cover plugin shape (3 Event groups, 1 conditional Polling group), polling gating, custom poll rates, and live-emission tag normalization. README gains an "Observability — PromEx" section with the consumer config snippet. `:telemetry_metrics` added to dialyzer plt_add_apps so the optional :prom_ex dep type-checks cleanly. Plugin coverage: 96.15% (12/12 tests pass). Harness clean: format, credo (TODO-tagged debt only in untouched files), dialyzer (0 warnings), doctor (100/100/100). 
--- CHANGELOG.md | 11 ++ README.md | 45 +++++++ ROADMAP.md | 1 + mix.exs | 5 +- roadmap/data.json | 36 ++++++ roadmap/tasks.toml | 27 ++++ test/ccxt_ocx/prom_ex/plugin_test.exs | 172 ++++++++++++++++++++++++++ 7 files changed, 296 insertions(+), 1 deletion(-) create mode 100644 test/ccxt_ocx/prom_ex/plugin_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index e15bb08..c245972 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,17 @@ All notable changes to `ccxt_ocx` are recorded here. The format follows ### Phase 5: Production Hardening +#### Task 21: PromEx plugin (`CcxtOcx.PromEx.Plugin`) +**Completed** | [D:3/B:7/U:7 → Eff:2.33] 🎯 + +Ship-with-the-library PromEx plugin that maps every `[:ccxt_ocx, ...]` event to Prometheus metrics with zero glue. + +- New `CcxtOcx.PromEx.Plugin` — `event_metrics/1` covers runtime memory (live), REST (Phase 2-reserved), and WS tick (Phase 3-reserved). `polling_metrics/1` is opt-in via `pool:` / `poll_rate:` opts and drives `CcxtOcx.RuntimePool.memory/1` on a timer. +- Tag normalizers default missing metadata keys to `"none"`, stringify atoms, and inspect PIDs — Prometheus labels stay stable across emission sites (Runtime, RuntimePool, future macro-generated callers). +- `{:prom_ex, "~> 1.11", optional: true}` — pulled in for compile but not forced on consumers. `:bandit` extended to `:test` so `PromEx.Plug` compiles. `:telemetry_metrics` added to dialyzer `plt_add_apps`. +- "Observability — PromEx" section in README with consumer config snippet. +- Tests cover plugin shape, polling gating, custom poll rates, and live-emission tag normalization. 
+ #### Task 14: Telemetry events **Completed** | [D:3/B:7/U:7 → Eff:2.33] 🎯 diff --git a/README.md b/README.md index 9c20ade..9b9a7f1 100644 --- a/README.md +++ b/README.md @@ -35,9 +35,54 @@ Telemetry events are emitted under the `[:ccxt_ocx]` prefix: - `[:ccxt_ocx, :runtime, :memory]` — QuickJS memory stats from any runtime or pool worker (see `CcxtOcx.Runtime.memory/1` and `CcxtOcx.RuntimePool.memory/1`). +- `[:ccxt_ocx, :rest, :start | :stop | :exception]` — reserved for Phase 2 + `defunified` (Task 7); metric definitions exist now so dashboards stay + stable when emission lights up. +- `[:ccxt_ocx, :ws, :tick]` — reserved for Phase 3 `defstreaming` (Task 11). Full event contract and handler examples live in `CcxtOcx.Telemetry`. +### PromEx (Prometheus / Grafana) + +`ccxt_ocx` ships a first-class [PromEx](https://hex.pm/packages/prom_ex) +plugin that maps every event above to Prometheus metrics with zero glue. +PromEx is an **optional dependency** — consumers add it to their own app: + +```elixir +# mix.exs (consumer app) +{:prom_ex, "~> 1.11"} +``` + +Then reference the plugin in PromEx config: + +```elixir +defmodule MyApp.PromEx do + use PromEx, otp_app: :my_app + + @impl true + def plugins do + [ + PromEx.Plugins.Application, + PromEx.Plugins.Beam, + # Optional :pool / :poll_rate opts enable periodic pool memory snapshots. + {CcxtOcx.PromEx.Plugin, pool: MyApp.CcxtPool, poll_rate: 10_000} + ] + end +end +``` + +Provided metrics: + +- `ccxt_ocx_runtime_memory_malloc_size_bytes` / `_used_size_bytes` / `_obj_count` + (last_value, tags: `[:server, :pool, :phase]`) +- `ccxt_ocx_rest_duration_milliseconds` (distribution, buckets: + 10/50/100/250/500/1000/5000 ms) +- `ccxt_ocx_rest_total` (counter) +- `ccxt_ocx_rest_exceptions_total` (counter) +- `ccxt_ocx_ws_ticks_total` (counter) + +See `CcxtOcx.PromEx.Plugin` for tag-stability details and polling config. 
+ ## Live Exploration with Tidewave While building the macro layer, the fastest way to understand real behavior is to drive the system live: diff --git a/ROADMAP.md b/ROADMAP.md index 6e94acb..8bedb85 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -100,6 +100,7 @@ | Task 16 | ⬜ | 🎁 **production** · ApiToolkit integration [D:4/B:6/U:6 → Eff:1.5] 🚀 | | Task 16b | ⬜ | 🎁 **production** · Hoist rate-limit + nonce state into Elixir [D:7/B:9/U:8 → Eff:1.21] 📋 | | Task 17 | ⬜ | 🎁 **production** · Sandboxed deployment shape [D:5/B:8/U:5 → Eff:1.3] 📋 | +| Task 21 | ✅ | 🎁 **production** · PromEx plugin (`CcxtOcx.PromEx.Plugin`) [D:3/B:7/U:7 → Eff:2.33] 🎯 | --- diff --git a/mix.exs b/mix.exs index da9f294..5cff8ab 100644 --- a/mix.exs +++ b/mix.exs @@ -12,7 +12,10 @@ defmodule CcxtOcx.MixProject do aliases: aliases(), dialyzer: [ plt_add_deps: :apps_direct, - plt_add_apps: [:mix], + # :telemetry_metrics is transitively provided by :prom_ex (optional dep) + # — surface it for the CcxtOcx.PromEx.Plugin metric DSL (last_value/2, + # distribution/2, counter/2). 
+ plt_add_apps: [:mix, :telemetry_metrics], plt_local_path: "priv/plts", plt_core_path: "priv/plts", ignore_warnings: ".dialyzer_ignore.exs" diff --git a/roadmap/data.json b/roadmap/data.json index 7f241c7..4b83cbb 100644 --- a/roadmap/data.json +++ b/roadmap/data.json @@ -677,6 +677,42 @@ "scored_at": "2026-05-16", "cross_repo": [] }, + { + "id": "21", + "phase": 5, + "bundle": "production", + "status": "done", + "title": "PromEx plugin (`CcxtOcx.PromEx.Plugin`)", + "scores": { + "d": 3, + "b": 7, + "u": 7 + }, + "eff": 2.33, + "markers": [], + "depends_on": [ + "14" + ], + "acceptance_criteria": [ + "lib/ccxt_ocx/prom_ex/plugin.ex exposes event_metrics/1 + polling_metrics/1", + "Metric definitions cover runtime memory (live), REST (Phase 2-reserved), WS tick (Phase 3-reserved)", + "Tag normalizers default missing metadata keys to \"none\" for stable Prometheus labels", + "`{:prom_ex, optional: true}` in mix.exs; `:plug` reachable in :test for PromEx.Plug compile", + "test/ccxt_ocx/prom_ex/plugin_test.exs verifies shape + live-emission smoke", + "Plugin module coverage at or above 80% (standard tier)", + "README has an 'Observability — PromEx' section with the consumer config snippet" + ], + "out_of_scope": [ + "Grafana dashboard JSON", + "Consumer-side example harness app", + "Production scrape integration tests" + ], + "body": "First-class [PromEx](https://hex.pm/packages/prom_ex) plugin mapping every `[:ccxt_ocx, ...]` event in `CcxtOcx.Telemetry` to Prometheus metrics with zero glue. Consumers add `{:prom_ex, \"~> 1.11\"}` to their own deps and reference `CcxtOcx.PromEx.Plugin` in their PromEx config; the dep is declared `optional: true` here so consumers who don't want PromEx aren't forced into it.\n\nProvides last_value memory metrics (live today via Task 14 emissions), reserved REST distribution/counter metrics (Phase 2 `defunified` lights up emission), and a reserved WS tick counter (Phase 3 `defstreaming`). 
Tag normalizers default missing metadata keys to `\"none\"` so Prometheus labels stay stable across emission sites.\n", + "started_at": "2026-05-17", + "done_at": "2026-05-17", + "scored_at": "2026-05-17", + "cross_repo": [] + }, { "id": "N0", "phase": 7, diff --git a/roadmap/tasks.toml b/roadmap/tasks.toml index 260dff8..12dd4d6 100644 --- a/roadmap/tasks.toml +++ b/roadmap/tasks.toml @@ -574,6 +574,33 @@ body = """ **Progress (2026-05-17):** Initial `docs/tidewave_examples.md` created and expanded with real patterns from live Tidewave sessions (Deribit option chains, define+call pattern, telemetry watching, complex surface pressure-testing). Referenced from README.md and CLAUDE.md. """ +[[task]] +id = "21" +phase = 5 +bundle = "production" +status = "done" +title = "PromEx plugin (`CcxtOcx.PromEx.Plugin`)" +scores = { d = 3, b = 7, u = 7 } +scored_at = "2026-05-17" +started_at = "2026-05-17" +done_at = "2026-05-17" +depends_on = ["14"] +body = """ +First-class [PromEx](https://hex.pm/packages/prom_ex) plugin mapping every `[:ccxt_ocx, ...]` event in `CcxtOcx.Telemetry` to Prometheus metrics with zero glue. Consumers add `{:prom_ex, "~> 1.11"}` to their own deps and reference `CcxtOcx.PromEx.Plugin` in their PromEx config; the dep is declared `optional: true` here so consumers who don't want PromEx aren't forced into it. + +Provides last_value memory metrics (live today via Task 14 emissions), reserved REST distribution/counter metrics (Phase 2 `defunified` lights up emission), and a reserved WS tick counter (Phase 3 `defstreaming`). Tag normalizers default missing metadata keys to `"none"` so Prometheus labels stay stable across emission sites. 
+""" +acceptance_criteria = [ + "lib/ccxt_ocx/prom_ex/plugin.ex exposes event_metrics/1 + polling_metrics/1", + "Metric definitions cover runtime memory (live), REST (Phase 2-reserved), WS tick (Phase 3-reserved)", + "Tag normalizers default missing metadata keys to \"none\" for stable Prometheus labels", + "`{:prom_ex, optional: true}` in mix.exs; `:plug` reachable in :test for PromEx.Plug compile", + "test/ccxt_ocx/prom_ex/plugin_test.exs verifies shape + live-emission smoke", + "Plugin module coverage at or above 80% (standard tier)", + "README has an 'Observability — PromEx' section with the consumer config snippet" +] +out_of_scope = ["Grafana dashboard JSON", "Consumer-side example harness app", "Production scrape integration tests"] + # Phase 7 — Native-Elixir Migration ─────────────────────────────────────── [[task]] diff --git a/test/ccxt_ocx/prom_ex/plugin_test.exs b/test/ccxt_ocx/prom_ex/plugin_test.exs new file mode 100644 index 0000000..9ace4a0 --- /dev/null +++ b/test/ccxt_ocx/prom_ex/plugin_test.exs @@ -0,0 +1,172 @@ +defmodule CcxtOcx.PromEx.PluginTest do + use ExUnit.Case, async: true + + alias CcxtOcx.PromEx.Plugin + alias PromEx.MetricTypes.Event + alias PromEx.MetricTypes.Polling + alias Telemetry.Metrics.Counter + alias Telemetry.Metrics.Distribution + alias Telemetry.Metrics.LastValue + + describe "event_metrics/1" do + test "returns three Event groups covering runtime, rest, ws families" do + groups = Plugin.event_metrics([]) + + assert length(groups) == 3 + assert Enum.all?(groups, &match?(%Event{}, &1)) + + names = Enum.map(groups, & &1.group_name) + assert :ccxt_ocx_runtime_event_metrics in names + assert :ccxt_ocx_rest_event_metrics in names + assert :ccxt_ocx_ws_event_metrics in names + end + + test "runtime memory group exposes three last_value metrics on [:ccxt_ocx, :runtime, :memory]" do + group = group_by_name(:ccxt_ocx_runtime_event_metrics) + + assert length(group.metrics) == 3 + assert Enum.all?(group.metrics, 
&match?(%LastValue{}, &1)) + assert Enum.all?(group.metrics, &(&1.event_name == [:ccxt_ocx, :runtime, :memory])) + + measurements = group.metrics |> Enum.map(& &1.measurement) |> Enum.sort() + assert measurements == [:malloc_size, :memory_used_size, :obj_count] + + assert Enum.all?(group.metrics, &(&1.tags == [:server, :pool, :phase])) + end + + test "rest group exposes a duration distribution + total counter + exception counter" do + group = group_by_name(:ccxt_ocx_rest_event_metrics) + + assert length(group.metrics) == 3 + + [distribution] = Enum.filter(group.metrics, &match?(%Distribution{}, &1)) + assert distribution.event_name == [:ccxt_ocx, :rest, :stop] + # `unit: {:native, :millisecond}` wraps the measurement atom in a converter fn + # via `Telemetry.Metrics.maybe_convert_measurement/2` — assert the shape, not the atom. + assert is_function(distribution.measurement, 1) + assert distribution.tags == [:exchange, :method] + assert distribution.reporter_options[:buckets] == [10, 50, 100, 250, 500, 1000, 5000] + + counters = Enum.filter(group.metrics, &match?(%Counter{}, &1)) + assert length(counters) == 2 + + stop_counter = Enum.find(counters, &(&1.event_name == [:ccxt_ocx, :rest, :stop])) + exception_counter = Enum.find(counters, &(&1.event_name == [:ccxt_ocx, :rest, :exception])) + + assert stop_counter.tags == [:exchange, :method] + assert exception_counter.tags == [:exchange, :method, :kind] + end + + test "ws group exposes a tick counter on [:ccxt_ocx, :ws, :tick]" do + group = group_by_name(:ccxt_ocx_ws_event_metrics) + + [tick] = group.metrics + assert match?(%Counter{}, tick) + assert tick.event_name == [:ccxt_ocx, :ws, :tick] + assert tick.tags == [:exchange, :stream, :type] + end + end + + describe "polling_metrics/1" do + test "returns [] when no :pool opt is provided" do + assert Plugin.polling_metrics([]) == [] + end + + test "returns a single Polling group with the configured MFA when :pool is set" do + [group] = Plugin.polling_metrics(pool: 
:test_pool) + + assert %Polling{} = group + assert group.group_name == :ccxt_ocx_runtime_pool_memory_polling_metrics + assert group.poll_rate == 5_000 + assert group.measurements_mfa == {CcxtOcx.RuntimePool, :memory, [:test_pool]} + assert group.metrics == [] + end + + test "honors a custom :poll_rate" do + [group] = Plugin.polling_metrics(pool: :test_pool, poll_rate: 15_000) + assert group.poll_rate == 15_000 + end + end + + describe "tag normalization (live emission smoke)" do + setup do + handler = make_ref() + test_pid = self() + + :telemetry.attach_many( + handler, + [[:ccxt_ocx, :runtime, :memory]], + fn event, meas, meta, _ -> send(test_pid, {:saw, event, meas, meta}) end, + %{} + ) + + on_exit(fn -> :telemetry.detach(handler) end) + :ok + end + + test "memory metric tag_values fills missing :pool and :phase with \"none\"" do + # Simulate a CcxtOcx.Runtime.memory/1 emission shape (server-only metadata). + meta = %{server: self()} + CcxtOcx.Telemetry.execute([:runtime, :memory], %{malloc_size: 1, memory_used_size: 1, obj_count: 1}, meta) + + assert_receive {:saw, [:ccxt_ocx, :runtime, :memory], _meas, ^meta} + + [malloc_metric | _] = group_by_name(:ccxt_ocx_runtime_event_metrics).metrics + normalized = malloc_metric.tag_values.(meta) + + assert is_binary(normalized.server) + assert String.starts_with?(normalized.server, "#PID<") + assert normalized.pool == "none" + assert normalized.phase == "none" + end + + test "memory metric tag_values stringifies atom :phase and atom :pool" do + meta = %{server: self(), pool: :my_pool, phase: :init} + [malloc_metric | _] = group_by_name(:ccxt_ocx_runtime_event_metrics).metrics + normalized = malloc_metric.tag_values.(meta) + + assert normalized.pool == "my_pool" + assert normalized.phase == "init" + end + + test "rest metric tag_values defaults missing :exchange / :method to \"none\"" do + [distribution] = + Enum.filter( + group_by_name(:ccxt_ocx_rest_event_metrics).metrics, + &match?(%Distribution{}, &1) + ) + + 
normalized = distribution.tag_values.(%{}) + assert normalized == %{exchange: "none", method: "none"} + end + + test "rest exception metric tag_values stringifies :kind and passes binaries through unchanged" do + exception_counter = + Enum.find( + group_by_name(:ccxt_ocx_rest_event_metrics).metrics, + &(match?(%Counter{}, &1) and &1.event_name == [:ccxt_ocx, :rest, :exception]) + ) + + normalized = + exception_counter.tag_values.(%{exchange: "binance", method: :fetch_ticker, kind: :error}) + + # Binary :exchange stays unchanged (exercises the is_binary branch of stringify/1). + assert normalized == %{exchange: "binance", method: "fetch_ticker", kind: "error"} + end + + test "ws tick metric tag_values normalizes all three keys" do + [tick] = group_by_name(:ccxt_ocx_ws_event_metrics).metrics + + assert tick.tag_values.(%{exchange: :binance, stream: "trades", type: :update}) == + %{exchange: "binance", stream: "trades", type: "update"} + + assert tick.tag_values.(%{}) == %{exchange: "none", stream: "none", type: "none"} + end + end + + ## Helpers + + defp group_by_name(name) do + Enum.find(Plugin.event_metrics([]), &(&1.group_name == name)) + end +end From 7ae93dbca719ca931095d84743de950fa71db92e Mon Sep 17 00:00:00 2001 From: "E.FU" Date: Sun, 17 May 2026 16:28:24 +0800 Subject: [PATCH 3/3] fix: guard optional PromEx plugin + correct used_size metric name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wraps CcxtOcx.PromEx.Plugin in `if Code.ensure_loaded?(PromEx.Plugin)` so downstream projects that don't depend on :prom_ex still compile ccxt_ocx cleanly — the module simply isn't defined for them. The existing `optional: true` flag in mix.exs only stops Mix from fetching the dep; it does NOT stop `use PromEx.Plugin` from failing to expand when the consumer recompiles ccxt_ocx without :prom_ex on the path. 
Also renames one metric path segment `:used` → `:used_size` so the exported series matches the moduledoc + README claim of `ccxt_ocx_runtime_memory_used_size_bytes` (was emitting `_used_bytes`). Both flagged on PR #7 by Codex (P1) and Copilot. --- lib/ccxt_ocx/prom_ex/plugin.ex | 449 +++++++++++++++++---------------- mix.exs | 6 +- 2 files changed, 231 insertions(+), 224 deletions(-) diff --git a/lib/ccxt_ocx/prom_ex/plugin.ex b/lib/ccxt_ocx/prom_ex/plugin.ex index ff3cbaa..8a3e12e 100644 --- a/lib/ccxt_ocx/prom_ex/plugin.ex +++ b/lib/ccxt_ocx/prom_ex/plugin.ex @@ -1,252 +1,257 @@ -defmodule CcxtOcx.PromEx.Plugin do - @moduledoc """ - Ship-with-the-library [PromEx](https://hex.pm/packages/prom_ex) plugin for - the `[:ccxt_ocx, ...]` telemetry event family. +if Code.ensure_loaded?(PromEx.Plugin) do + defmodule CcxtOcx.PromEx.Plugin do + @moduledoc """ + Ship-with-the-library [PromEx](https://hex.pm/packages/prom_ex) plugin for + the `[:ccxt_ocx, ...]` telemetry event family. + + Consumers add `{:prom_ex, "~> 1.11"}` to their app's deps, then reference + this module in their PromEx config: + + defmodule MyApp.PromEx do + use PromEx, otp_app: :my_app + + @impl true + def plugins do + [ + PromEx.Plugins.Application, + PromEx.Plugins.Beam, + # Optional :pool / :poll_rate opts enable periodic pool memory snapshots. + {CcxtOcx.PromEx.Plugin, pool: MyApp.CcxtPool, poll_rate: 10_000} + ] + end + end - Consumers add `{:prom_ex, "~> 1.11"}` to their app's deps, then reference - this module in their PromEx config: + PromEx is declared as an **optional** dependency of `:ccxt_ocx`. This + module is wrapped in `Code.ensure_loaded?(PromEx.Plugin)` so that + consumer projects without `:prom_ex` in their own deps can still + compile `:ccxt_ocx` — the module simply isn't defined when PromEx is + absent. 
- defmodule MyApp.PromEx do - use PromEx, otp_app: :my_app + ## Provided metrics - @impl true - def plugins do - [ - PromEx.Plugins.Application, - PromEx.Plugins.Beam, - # Optional :pool / :poll_rate opts enable periodic pool memory snapshots. - {CcxtOcx.PromEx.Plugin, pool: MyApp.CcxtPool, poll_rate: 10_000} - ] - end - end + ### Event-based (always on) - PromEx is declared as an optional dependency of `:ccxt_ocx`, so this module - is only loaded in consumer projects that also have `:prom_ex` in their deps. + Backed by `[:ccxt_ocx, :runtime, :memory]` — emitted from + `CcxtOcx.Runtime.memory/1`, runtime init/terminate, and + `CcxtOcx.RuntimePool.memory/1`: - ## Provided metrics + * `ccxt_ocx_runtime_memory_malloc_size_bytes` (last_value) + * `ccxt_ocx_runtime_memory_used_size_bytes` (last_value) + * `ccxt_ocx_runtime_memory_obj_count` (last_value) - ### Event-based (always on) + Backed by `[:ccxt_ocx, :rest, :stop | :exception]` — reserved for + Phase 2 `defunified` (Task 7); the metric definitions exist now so + dashboards stay stable when emission lights up: - Backed by `[:ccxt_ocx, :runtime, :memory]` — emitted from - `CcxtOcx.Runtime.memory/1`, runtime init/terminate, and - `CcxtOcx.RuntimePool.memory/1`: + * `ccxt_ocx_rest_duration_milliseconds` (distribution) + * `ccxt_ocx_rest_total` (counter) + * `ccxt_ocx_rest_exceptions_total` (counter) - * `ccxt_ocx_runtime_memory_malloc_size_bytes` (last_value) - * `ccxt_ocx_runtime_memory_used_size_bytes` (last_value) - * `ccxt_ocx_runtime_memory_obj_count` (last_value) + Backed by `[:ccxt_ocx, :ws, :tick]` — reserved for Phase 3 + `defstreaming` (Task 11): - Backed by `[:ccxt_ocx, :rest, :stop | :exception]` — reserved for - Phase 2 `defunified` (Task 7); the metric definitions exist now so - dashboards stay stable when emission lights up: + * `ccxt_ocx_ws_ticks_total` (counter) - * `ccxt_ocx_rest_duration_milliseconds` (distribution) - * `ccxt_ocx_rest_total` (counter) - * `ccxt_ocx_rest_exceptions_total` (counter) 
+ ### Polling (opt-in) - Backed by `[:ccxt_ocx, :ws, :tick]` — reserved for Phase 3 - `defstreaming` (Task 11): + When the plugin is configured with `pool: <pool_name>`, a periodic + timer calls `CcxtOcx.RuntimePool.memory/1` every `:poll_rate` + milliseconds (default 5_000). Each call emits a fresh + `[:ccxt_ocx, :runtime, :memory]` event captured by the event metrics + above. No additional metric definitions — polling here is purely a + driver, not a separate metric stream. - * `ccxt_ocx_ws_ticks_total` (counter) + ## Tag stability - ### Polling (opt-in) + All tag values are normalized through `tag_values/1` callbacks so that + Prometheus labels stay consistent across emission sites. Missing + metadata keys become `"none"`, runtime PIDs are inspected (`#PID<…>`), + and exceptions are normalized to `:kind` (`:error | :exit | :throw`). + """ - When the plugin is configured with `pool: `, a periodic - timer calls `CcxtOcx.RuntimePool.memory/1` every `:poll_rate` - milliseconds (default 5_000). Each call emits a fresh - `[:ccxt_ocx, :runtime, :memory]` event captured by the event metrics - above. No additional metric definitions — polling here is purely a - driver, not a separate metric stream. + use PromEx.Plugin - ## Tag stability + alias CcxtOcx.Telemetry, as: T - All tag values are normalized through `tag_values/1` callbacks so that - Prometheus labels stay consistent across emission sites. Missing - metadata keys become `"none"`, runtime PIDs are inspected (`#PID<…>`), - and exceptions are normalized to `:kind` (`:error | :exit | :throw`). 
- """ + @impl true + def event_metrics(_opts) do + [ + runtime_event_metrics(), + rest_event_metrics(), + ws_event_metrics() + ] + end - use PromEx.Plugin + @impl true + def polling_metrics(opts) do + case Keyword.fetch(opts, :pool) do + {:ok, pool} -> + poll_rate = Keyword.get(opts, :poll_rate, 5_000) - alias CcxtOcx.Telemetry, as: T + [ + Polling.build( + :ccxt_ocx_runtime_pool_memory_polling_metrics, + poll_rate, + {CcxtOcx.RuntimePool, :memory, [pool]}, + [] + ) + ] - @impl true - def event_metrics(_opts) do - [ - runtime_event_metrics(), - rest_event_metrics(), - ws_event_metrics() - ] - end + :error -> + [] + end + end - @impl true - def polling_metrics(opts) do - case Keyword.fetch(opts, :pool) do - {:ok, pool} -> - poll_rate = Keyword.get(opts, :poll_rate, 5_000) - - [ - Polling.build( - :ccxt_ocx_runtime_pool_memory_polling_metrics, - poll_rate, - {CcxtOcx.RuntimePool, :memory, [pool]}, - [] - ) - ] - - :error -> - [] + ## ------------------------------------------------------------------ + ## Event-group builders + ## ------------------------------------------------------------------ + + @spec runtime_event_metrics() :: Event.t() + defp runtime_event_metrics do + memory_event = T.__runtime_memory__() + memory_tags = [:server, :pool, :phase] + memory_tag_values = &normalize_memory_metadata/1 + + Event.build(:ccxt_ocx_runtime_event_metrics, [ + last_value( + [:ccxt_ocx, :runtime, :memory, :malloc_size, :bytes], + event_name: memory_event, + measurement: :malloc_size, + description: "QuickJS malloc size for a ccxt_ocx runtime.", + tags: memory_tags, + tag_values: memory_tag_values, + unit: :byte + ), + last_value( + [:ccxt_ocx, :runtime, :memory, :used_size, :bytes], + event_name: memory_event, + measurement: :memory_used_size, + description: "QuickJS used memory for a ccxt_ocx runtime.", + tags: memory_tags, + tag_values: memory_tag_values, + unit: :byte + ), + last_value( + [:ccxt_ocx, :runtime, :memory, :obj_count], + event_name: memory_event, + 
measurement: :obj_count, + description: "QuickJS live object count for a ccxt_ocx runtime.", + tags: memory_tags, + tag_values: memory_tag_values + ) + ]) end - end - ## ------------------------------------------------------------------ - ## Event-group builders - ## ------------------------------------------------------------------ - - @spec runtime_event_metrics() :: Event.t() - defp runtime_event_metrics do - memory_event = T.__runtime_memory__() - memory_tags = [:server, :pool, :phase] - memory_tag_values = &normalize_memory_metadata/1 - - Event.build(:ccxt_ocx_runtime_event_metrics, [ - last_value( - [:ccxt_ocx, :runtime, :memory, :malloc_size, :bytes], - event_name: memory_event, - measurement: :malloc_size, - description: "QuickJS malloc size for a ccxt_ocx runtime.", - tags: memory_tags, - tag_values: memory_tag_values, - unit: :byte - ), - last_value( - [:ccxt_ocx, :runtime, :memory, :used, :bytes], - event_name: memory_event, - measurement: :memory_used_size, - description: "QuickJS used memory for a ccxt_ocx runtime.", - tags: memory_tags, - tag_values: memory_tag_values, - unit: :byte - ), - last_value( - [:ccxt_ocx, :runtime, :memory, :obj_count], - event_name: memory_event, - measurement: :obj_count, - description: "QuickJS live object count for a ccxt_ocx runtime.", - tags: memory_tags, - tag_values: memory_tag_values - ) - ]) - end + @spec rest_event_metrics() :: Event.t() + defp rest_event_metrics do + stop_event = T.__rest_stop__() + exception_event = T.__rest_exception__() + rest_tags = [:exchange, :method] + rest_tag_values = &normalize_rest_metadata/1 + exception_tags = [:exchange, :method, :kind] + exception_tag_values = &normalize_rest_exception_metadata/1 + + Event.build(:ccxt_ocx_rest_event_metrics, [ + distribution( + [:ccxt_ocx, :rest, :duration, :milliseconds], + event_name: stop_event, + measurement: :duration, + description: "CCXT REST call duration.", + tags: rest_tags, + tag_values: rest_tag_values, + unit: {:native, :millisecond}, + 
reporter_options: [buckets: [10, 50, 100, 250, 500, 1000, 5000]] + ), + counter( + [:ccxt_ocx, :rest, :total], + event_name: stop_event, + description: "CCXT REST call count.", + tags: rest_tags, + tag_values: rest_tag_values + ), + counter( + [:ccxt_ocx, :rest, :exceptions, :total], + event_name: exception_event, + description: "CCXT REST call exception count.", + tags: exception_tags, + tag_values: exception_tag_values + ) + ]) + end - @spec rest_event_metrics() :: Event.t() - defp rest_event_metrics do - stop_event = T.__rest_stop__() - exception_event = T.__rest_exception__() - rest_tags = [:exchange, :method] - rest_tag_values = &normalize_rest_metadata/1 - exception_tags = [:exchange, :method, :kind] - exception_tag_values = &normalize_rest_exception_metadata/1 - - Event.build(:ccxt_ocx_rest_event_metrics, [ - distribution( - [:ccxt_ocx, :rest, :duration, :milliseconds], - event_name: stop_event, - measurement: :duration, - description: "CCXT REST call duration.", - tags: rest_tags, - tag_values: rest_tag_values, - unit: {:native, :millisecond}, - reporter_options: [buckets: [10, 50, 100, 250, 500, 1000, 5000]] - ), - counter( - [:ccxt_ocx, :rest, :total], - event_name: stop_event, - description: "CCXT REST call count.", - tags: rest_tags, - tag_values: rest_tag_values - ), - counter( - [:ccxt_ocx, :rest, :exceptions, :total], - event_name: exception_event, - description: "CCXT REST call exception count.", - tags: exception_tags, - tag_values: exception_tag_values - ) - ]) - end + @spec ws_event_metrics() :: Event.t() + defp ws_event_metrics do + tick_event = T.__ws_tick__() + + Event.build(:ccxt_ocx_ws_event_metrics, [ + counter( + [:ccxt_ocx, :ws, :ticks, :total], + event_name: tick_event, + description: "CCXT WebSocket inbound message count.", + tags: [:exchange, :stream, :type], + tag_values: &normalize_ws_tick_metadata/1 + ) + ]) + end - @spec ws_event_metrics() :: Event.t() - defp ws_event_metrics do - tick_event = T.__ws_tick__() - - 
Event.build(:ccxt_ocx_ws_event_metrics, [ - counter( - [:ccxt_ocx, :ws, :ticks, :total], - event_name: tick_event, - description: "CCXT WebSocket inbound message count.", - tags: [:exchange, :stream, :type], - tag_values: &normalize_ws_tick_metadata/1 - ) - ]) - end + ## ------------------------------------------------------------------ + ## Tag-value normalizers + ## ------------------------------------------------------------------ + + @spec normalize_memory_metadata(:telemetry.event_metadata()) :: %{ + server: String.t(), + pool: String.t(), + phase: String.t() + } + defp normalize_memory_metadata(metadata) do + %{ + server: stringify(Map.get(metadata, :server)), + pool: stringify(Map.get(metadata, :pool)), + phase: stringify(Map.get(metadata, :phase)) + } + end - ## ------------------------------------------------------------------ - ## Tag-value normalizers - ## ------------------------------------------------------------------ - - @spec normalize_memory_metadata(:telemetry.event_metadata()) :: %{ - server: String.t(), - pool: String.t(), - phase: String.t() - } - defp normalize_memory_metadata(metadata) do - %{ - server: stringify(Map.get(metadata, :server)), - pool: stringify(Map.get(metadata, :pool)), - phase: stringify(Map.get(metadata, :phase)) - } - end + @spec normalize_rest_metadata(:telemetry.event_metadata()) :: %{ + exchange: String.t(), + method: String.t() + } + defp normalize_rest_metadata(metadata) do + %{ + exchange: stringify(Map.get(metadata, :exchange)), + method: stringify(Map.get(metadata, :method)) + } + end - @spec normalize_rest_metadata(:telemetry.event_metadata()) :: %{ - exchange: String.t(), - method: String.t() - } - defp normalize_rest_metadata(metadata) do - %{ - exchange: stringify(Map.get(metadata, :exchange)), - method: stringify(Map.get(metadata, :method)) - } - end + @spec normalize_rest_exception_metadata(:telemetry.event_metadata()) :: %{ + exchange: String.t(), + method: String.t(), + kind: String.t() + } + defp 
normalize_rest_exception_metadata(metadata) do + %{ + exchange: stringify(Map.get(metadata, :exchange)), + method: stringify(Map.get(metadata, :method)), + kind: stringify(Map.get(metadata, :kind)) + } + end - @spec normalize_rest_exception_metadata(:telemetry.event_metadata()) :: %{ - exchange: String.t(), - method: String.t(), - kind: String.t() - } - defp normalize_rest_exception_metadata(metadata) do - %{ - exchange: stringify(Map.get(metadata, :exchange)), - method: stringify(Map.get(metadata, :method)), - kind: stringify(Map.get(metadata, :kind)) - } - end + @spec normalize_ws_tick_metadata(:telemetry.event_metadata()) :: %{ + exchange: String.t(), + stream: String.t(), + type: String.t() + } + defp normalize_ws_tick_metadata(metadata) do + %{ + exchange: stringify(Map.get(metadata, :exchange)), + stream: stringify(Map.get(metadata, :stream)), + type: stringify(Map.get(metadata, :type)) + } + end - @spec normalize_ws_tick_metadata(:telemetry.event_metadata()) :: %{ - exchange: String.t(), - stream: String.t(), - type: String.t() - } - defp normalize_ws_tick_metadata(metadata) do - %{ - exchange: stringify(Map.get(metadata, :exchange)), - stream: stringify(Map.get(metadata, :stream)), - type: stringify(Map.get(metadata, :type)) - } + @spec stringify(term()) :: String.t() + defp stringify(nil), do: "none" + defp stringify(value) when is_binary(value), do: value + defp stringify(value) when is_atom(value), do: Atom.to_string(value) + defp stringify(value), do: inspect(value) end - - @spec stringify(term()) :: String.t() - defp stringify(nil), do: "none" - defp stringify(value) when is_binary(value), do: value - defp stringify(value) when is_atom(value), do: Atom.to_string(value) - defp stringify(value), do: inspect(value) end diff --git a/mix.exs b/mix.exs index 5cff8ab..88d4826 100644 --- a/mix.exs +++ b/mix.exs @@ -51,8 +51,10 @@ defmodule CcxtOcx.MixProject do {:telemetry, "~> 1.3"}, # Optional PromEx plugin (Task 21 — CcxtOcx.PromEx.Plugin). 
- # Consumers add prom_ex to their own deps; we only need it present - # at compile time so `use PromEx.Plugin` expands cleanly. + # Consumers add prom_ex to their own deps. The plugin module + # itself is wrapped in `Code.ensure_loaded?(PromEx.Plugin)` so + # downstream projects that don't depend on prom_ex still compile + # ccxt_ocx cleanly — the module simply isn't defined for them. {:prom_ex, "~> 1.11", optional: true}, # JSON