Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .iex.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# IEx auto-attach for ccxt_ocx maintainer dogfooding.
#
# Guarded so it only fires in an active Mix :dev shell — `mix test`, plain
# `iex`, and any prod release path stay untouched. Also guarded on
# `Code.ensure_loaded?/1` so a stripped release (where CcxtOcx.DevTelemetry
# might be filtered out) doesn't blow up the shell.

mix_dev? =
Code.ensure_loaded?(Mix) and function_exported?(Mix, :env, 0) and
try do
Mix.env() == :dev
rescue
ArgumentError -> false
end

if mix_dev? and Code.ensure_loaded?(CcxtOcx.DevTelemetry) do
CcxtOcx.DevTelemetry.watch()

IO.puts("""
📡 CcxtOcx.DevTelemetry attached.
Call `CcxtOcx.DevTelemetry.summary/0` anytime to see counts + last values.
`reset/0` clears, `detach/0` stops counting. See docs/tidewave_examples.md.
""")
end
1 change: 1 addition & 0 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
| Task 18 | ⬜ | 🎁 **dx** · ex_doc + llms.txt [D:2/B:5/U:5 → Eff:2.5] 🎯 |
| Task 19 | ⬜ | 🎁 **dx** · Descripex annotations on the public API [D:3/B:5/U:4 → Eff:1.5] 🚀 |
| Task 20 | 🔄 | 🎁 **dx** · Tidewave examples [D:2/B:4/U:5 → Eff:2.25] 🎯 |
| Task 22 | ✅ | 🎁 **dx** · Dev telemetry dogfooding loop (CcxtOcx.DevTelemetry) [D:2/B:5/U:8 → Eff:3.25] 🎯 |
<!-- TASKS:END -->

---
Expand Down
56 changes: 56 additions & 0 deletions docs/tidewave_examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,62 @@ CcxtOcx.Runtime.memory_usage(:rt) # same map, no telemetry side-effect
end, %{})
```

## Dev Telemetry Loop (Dogfooding)

`CcxtOcx.DevTelemetry` is the maintainer-side companion to `CcxtOcx.Telemetry`: one
call attaches a handler to every `[:ccxt_ocx, ...]` event family, pretty-prints each
emission to stdout, and keeps running counts + last-values in an `Agent` you can
`summary/0` at any time. Use it as the validation loop for new emission sites
(Task 7 `defunified`, Task 11 `defstreaming`, Task 15 memory monitor) before
downstream consumers (`CcxtOcx.PromEx.Plugin`, Grafana dashboards) bake in
assumptions about the contract.

```elixir
# Attach once per session — idempotent (re-attach detaches the prior handler
# and resets counts).
CcxtOcx.DevTelemetry.watch()

# Drive load. Every emission lands as a one-line print + an Agent bump.
{:ok, _} = CcxtOcx.Runtime.start_link(name: :rt)
CcxtOcx.Runtime.memory(:rt)
# => [memory] malloc=12.3M used=8.1M objs=14523 server=#PID<0.345.0> phase=manual

# Snapshot when you want to assert the shape.
CcxtOcx.DevTelemetry.summary()
# => %{
# runtime_memory: %{count: 2, last_measurements: %{...}, last_metadata: %{...}},
# rest_start: %{count: 0, last_measurements: nil, last_metadata: nil},
# rest_stop: %{count: 0, last_measurements: nil, last_metadata: nil},
# rest_exception: %{count: 0, last_measurements: nil, last_metadata: nil},
# ws_tick: %{count: 0, last_measurements: nil, last_metadata: nil}
# }

# Clear counters without re-attaching.
CcxtOcx.DevTelemetry.reset()

# Detach when you're done.
CcxtOcx.DevTelemetry.detach()
```

`watch/1` takes a few options for ad-hoc filtering:

```elixir
# Silence pretty-print, keep counters only (handy when driving high-frequency
# WS load and the prints would scroll past too fast to read).
CcxtOcx.DevTelemetry.watch(print: false)

# Restrict to a subset of event families.
CcxtOcx.DevTelemetry.watch(filter: [:runtime_memory, :ws_tick])

# Send pretty-print to a different IO device (e.g. a log file).
{:ok, log} = File.open("/tmp/ccxt_ocx.log", [:write])
CcxtOcx.DevTelemetry.watch(io_device: log)
```

In dev, `iex -S mix` (and therefore `iex -S mix tidewave`) auto-attaches
`DevTelemetry` via the project's `.iex.exs`, so you can drop straight into
`summary/0` without a manual `watch/0` call.

## Using Other Tidewave Tools Together

While you have a runtime running, combine it with Tidewave's other MCP tools:
Expand Down
257 changes: 257 additions & 0 deletions lib/ccxt_ocx/dev_telemetry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
defmodule CcxtOcx.DevTelemetry do
@moduledoc """
Maintainer-side dogfooding helpers for ccxt_ocx telemetry.

Attach to every `[:ccxt_ocx, ...]` event family in one call, pretty-print
each emission to a configurable IO device, keep running counts plus the
last-seen measurements/metadata in an `Agent`, dump the summary on demand.

For Tidewave / IEx sessions only — this module owns a named process and
prints to stdout by default, so it has no business in `start/0` /
`Application.start/2`. There is no supervisor entry; `watch/1` starts the
Agent lazily.

## Usage

iex> CcxtOcx.DevTelemetry.watch()
:ok
iex> CcxtOcx.Runtime.memory(:rt)
[memory] malloc=12.3M used=8.1M objs=14523 server=#PID<0.345.0> phase=manual
iex> CcxtOcx.DevTelemetry.summary()
%{
runtime_memory: %{count: 1, last_measurements: %{...}, last_metadata: %{...}},
rest_start: %{count: 0, last_measurements: nil, last_metadata: nil},
rest_stop: %{count: 0, last_measurements: nil, last_metadata: nil},
rest_exception: %{count: 0, last_measurements: nil, last_metadata: nil},
ws_tick: %{count: 0, last_measurements: nil, last_metadata: nil}
}

See `docs/tidewave_examples.md` § "Dev Telemetry Loop" for the
observe-while-driving cycle this module is designed for.

## Options for `watch/1`

* `:print` — print each emission to `:io_device`. Default: `true`.
* `:filter` — restrict to a subset of event keys (see `t:event_key/0`).
Default: all events.
* `:io_device` — where to write pretty-print output. Default: `:stdio`.

Idempotent — re-calling `watch/1` detaches the previous handler and
resets state before re-attaching with the new options.
"""

@handler_id "ccxt-ocx-dev-telemetry"

@events %{
runtime_memory: [:ccxt_ocx, :runtime, :memory],
rest_start: [:ccxt_ocx, :rest, :start],
rest_stop: [:ccxt_ocx, :rest, :stop],
rest_exception: [:ccxt_ocx, :rest, :exception],
ws_tick: [:ccxt_ocx, :ws, :tick]
}

@watch_schema NimbleOptions.new!(
print: [
type: :boolean,
default: true
],
filter: [
type: {:list, {:in, Map.keys(@events)}},
default: Map.keys(@events)
],
io_device: [
type: :any,
default: :stdio
]
)

@type event_key ::
:runtime_memory | :rest_start | :rest_stop | :rest_exception | :ws_tick

@type entry :: %{
count: non_neg_integer(),
last_measurements: map() | nil,
last_metadata: map() | nil
}

@type state :: %{event_key() => entry()}

@type opts :: [
print: boolean(),
filter: [event_key()],
io_device: IO.device()
]

@doc """
Attach a single handler to every `[:ccxt_ocx, ...]` event family.

Idempotent — re-calling detaches the previous handler and resets the
in-memory state before re-attaching.
"""
@spec watch(opts()) :: :ok
def watch(opts \\ []) do
opts = NimbleOptions.validate!(opts, @watch_schema)
print? = Keyword.fetch!(opts, :print)
io_device = Keyword.fetch!(opts, :io_device)
filter = Keyword.fetch!(opts, :filter)
events = Enum.map(filter, &Map.fetch!(@events, &1))

:ok = detach()
:ok = ensure_agent()
:ok = Agent.update(__MODULE__, fn _ -> initial_state() end)

:ok =
:telemetry.attach_many(
@handler_id,
events,
&__MODULE__.handle_event/4,
%{print?: print?, io_device: io_device}
)

:ok
end
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@doc """
Detach the dev-telemetry handler. Idempotent.

Leaves the Agent (and its accumulated counts) alone — call `reset/0` to
clear, or `summary/0` to inspect post-detach.
"""
@spec detach() :: :ok
def detach do
_ = :telemetry.detach(@handler_id)
:ok
end

@doc """
Current in-memory state.

Returns the initial all-zeros state if `watch/1` was never called.
"""
@spec summary() :: state()
def summary do
case Process.whereis(__MODULE__) do
nil -> initial_state()
_pid -> Agent.get(__MODULE__, & &1)
end
end

@doc """
Reset all counts and last-values to the initial state.

No-op if `watch/1` was never called.
"""
@spec reset() :: :ok
def reset do
case Process.whereis(__MODULE__) do
nil -> :ok
_pid -> Agent.update(__MODULE__, fn _ -> initial_state() end)
end
end

@doc false
@spec handle_event([atom()], map(), map(), map()) :: :ok
def handle_event(event, measurements, metadata, config) do
key = event_to_key(event)

if Process.whereis(__MODULE__) do
Agent.update(__MODULE__, fn state ->
entry = Map.fetch!(state, key)

new_entry = %{
entry
| count: entry.count + 1,
last_measurements: measurements,
last_metadata: metadata
}

Map.put(state, key, new_entry)
end)
end

if Map.get(config, :print?, false) do
io_device = Map.get(config, :io_device, :stdio)
IO.puts(io_device, format(key, measurements, metadata))
end

:ok
end

# ------------------------------------------------------------------
# private
# ------------------------------------------------------------------

@spec ensure_agent() :: :ok
defp ensure_agent do
case Agent.start(fn -> initial_state() end, name: __MODULE__) do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
end
end

@spec initial_state() :: state()
defp initial_state do
@events
|> Map.keys()
|> Map.new(fn key -> {key, %{count: 0, last_measurements: nil, last_metadata: nil}} end)
end

@spec event_to_key([atom()]) :: event_key()
defp event_to_key([:ccxt_ocx, :runtime, :memory]), do: :runtime_memory
defp event_to_key([:ccxt_ocx, :rest, :start]), do: :rest_start
defp event_to_key([:ccxt_ocx, :rest, :stop]), do: :rest_stop
defp event_to_key([:ccxt_ocx, :rest, :exception]), do: :rest_exception
defp event_to_key([:ccxt_ocx, :ws, :tick]), do: :ws_tick

@spec format(event_key(), map(), map()) :: String.t()
defp format(:runtime_memory, meas, meta) do
malloc = format_bytes(Map.get(meas, :malloc_size))
used = format_bytes(Map.get(meas, :memory_used_size))
objs = Map.get(meas, :obj_count, "?")
phase = Map.get(meta, :phase, :manual)

target =
cond do
Map.has_key?(meta, :server) -> "server=#{inspect(meta.server)}"
Map.has_key?(meta, :pool) -> "pool=#{inspect(meta.pool)}"
true -> "target=?"
end

"[memory] malloc=#{malloc} used=#{used} objs=#{objs} #{target} phase=#{phase}"
end

defp format(:rest_start, _meas, meta) do
"[rest:start] exchange=#{Map.get(meta, :exchange, "?")} method=#{Map.get(meta, :method, "?")}"
end

defp format(:rest_stop, meas, meta) do
duration = format_duration(Map.get(meas, :duration))

"[rest:stop] exchange=#{Map.get(meta, :exchange, "?")} method=#{Map.get(meta, :method, "?")} duration=#{duration}"
end

defp format(:rest_exception, _meas, meta) do
"[rest:exception] exchange=#{Map.get(meta, :exchange, "?")} method=#{Map.get(meta, :method, "?")} kind=#{Map.get(meta, :kind, "?")}"
end

defp format(:ws_tick, _meas, meta) do
"[ws:tick] exchange=#{Map.get(meta, :exchange, "?")} stream=#{Map.get(meta, :stream, "?")} type=#{Map.get(meta, :type, "?")}"
end

@spec format_bytes(integer() | nil) :: String.t()
defp format_bytes(nil), do: "?"

defp format_bytes(n) when is_integer(n) and n >= 1_000_000, do: "#{Float.round(n / 1_000_000, 1)}M"

defp format_bytes(n) when is_integer(n) and n >= 1_000, do: "#{Float.round(n / 1_000, 1)}K"

defp format_bytes(n) when is_integer(n), do: "#{n}B"

@spec format_duration(integer() | nil) :: String.t()
defp format_duration(nil), do: "?"

defp format_duration(n) when is_integer(n) do
us = System.convert_time_unit(n, :native, :microsecond)
"#{Float.round(us / 1000, 2)}ms"
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ defmodule CcxtOcx.MixProject do

# Observability (Task 14 — telemetry events + future memory monitor)
{:telemetry, "~> 1.3"},
{:nimble_options, "~> 1.1"},

# Optional PromEx plugin (Task 21 — CcxtOcx.PromEx.Plugin).
# Consumers add prom_ex to their own deps. The plugin module
Expand Down
Loading
Loading