From 46b7b9120beb648b78629fa75bab02a516e1c189 Mon Sep 17 00:00:00 2001 From: Daniel McAuley Date: Mon, 20 Apr 2026 21:33:35 -0700 Subject: [PATCH] Persist per-issue token usage --- SPEC.md | 32 ++- elixir/README.md | 12 + elixir/docs/token_accounting.md | 13 + elixir/lib/symphony_elixir/orchestrator.ex | 49 +++- .../lib/symphony_elixir/token_usage_ledger.ex | 232 +++++++++++++++++ elixir/lib/symphony_elixir_web/presenter.ex | 46 +++- elixir/test/support/test_support.exs | 3 + .../test/symphony_elixir/extensions_test.exs | 76 +++++- .../orchestrator_status_test.exs | 67 ++++- .../token_usage_ledger_test.exs | 246 ++++++++++++++++++ 10 files changed, 756 insertions(+), 20 deletions(-) create mode 100644 elixir/lib/symphony_elixir/token_usage_ledger.ex create mode 100644 elixir/test/symphony_elixir/token_usage_ledger_test.exs diff --git a/SPEC.md b/SPEC.md index f9e2b63a14..09999b1c90 100644 --- a/SPEC.md +++ b/SPEC.md @@ -1306,6 +1306,12 @@ should return: - `output_tokens` - `total_tokens` - `seconds_running` (aggregate runtime seconds as of snapshot time, including active sessions) +- `token_usage` (optional durable token summary across completed and active sessions) + - `input_tokens` + - `output_tokens` + - `total_tokens` + - `issue_count` + - `session_count` - `rate_limits` (latest coding-agent rate limit payload, if available) Recommended snapshot error modes: @@ -1336,6 +1342,10 @@ Token accounting rules: - Do not treat generic `usage` maps as cumulative totals unless the event type defines them that way. - Accumulate aggregate totals in orchestrator state. +- Implementations may also persist append-only token observations for durable per-issue + observability. If they do, summarize by taking the high-water cumulative totals per + `(issue_identifier, session_id)` and then summing those session totals; do not sum every observed + event. Runtime accounting: @@ -1445,6 +1455,13 @@ Minimum endpoints: "total_tokens": 7400, "seconds_running": 1834.2 }, + "token_usage": { + "input_tokens": 5000, + "output_tokens": 2400, + "total_tokens": 7400, + "issue_count": 2, + "session_count": 3 + }, "rate_limits": null } ``` @@ -1498,12 +1515,21 @@ Minimum endpoints: } ], "last_error": null, - "tracked": {} + "tracked": {}, + "token_usage": { + "input_tokens": 1200, + "output_tokens": 800, + "total_tokens": 2000, + "session_count": 1 + } } ``` - - If the issue is unknown to the current in-memory state, return `404` with an error response (for - example `{\"error\":{\"code\":\"issue_not_found\",\"message\":\"...\"}}`). + - If the issue is unknown to the current in-memory state but exists in a durable token ledger, an + implementation may return an inactive issue payload with token usage. + - If the issue is unknown to both the current in-memory state and durable observability state, + return `404` with an error response (for example + `{\"error\":{\"code\":\"issue_not_found\",\"message\":\"...\"}}`). - `POST /api/v1/refresh` - Queues an immediate tracker poll + reconciliation cycle (best-effort trigger; implementations diff --git a/elixir/README.md b/elixir/README.md index 603b4bb000..4fa97626f3 100644 --- a/elixir/README.md +++ b/elixir/README.md @@ -80,6 +80,12 @@ Optional flags: - `--logs-root` tells Symphony to write logs under a different directory (default: `./log`) - `--port` also starts the Phoenix observability service (default: disabled) +Symphony also writes durable Codex token usage observations to `token_usage.jsonl` next to the +configured log file. With the default log path, this is `./log/token_usage.jsonl`; with +`--logs-root`, it follows the same log root. The ledger stores cumulative high-water token totals +per issue/session so completed tickets can still be inspected after the in-memory dashboard state +has moved on. + The `WORKFLOW.md` file uses YAML front matter for configuration, plus a Markdown body used as the Codex session prompt. @@ -160,6 +166,12 @@ The observability UI now runs on a minimal Phoenix stack: - Bandit as the HTTP server - Phoenix dependency static assets for the LiveView client bootstrap +The JSON API includes durable token summaries from `token_usage.jsonl`: + +- `/api/v1/state` includes `token_usage` totals plus issue/session counts. +- `/api/v1/` can return `status: "inactive"` with `token_usage` for a completed + or otherwise inactive issue that is no longer present in the live running/retry state. + ## Project Layout - `lib/`: application code and Mix tasks diff --git a/elixir/docs/token_accounting.md b/elixir/docs/token_accounting.md index 2c6e107be2..dbcb6e1d77 100644 --- a/elixir/docs/token_accounting.md +++ b/elixir/docs/token_accounting.md @@ -285,11 +285,23 @@ That is a strong signal for Symphony: - use absolute totals as the main accounting surface - ignore last/delta values for totals +## Durable Per-Issue Ledger + +The Elixir reference implementation persists token observations to `token_usage.jsonl` next to the +configured log file. Each line is an append-only JSON object containing the issue identifier, Codex +session id, source event, final/non-final marker, and cumulative input/output/total token values. + +The ledger is summarized by taking the maximum observed totals per `(issue_identifier, session_id)` +and then summing those session high-water marks. This makes repeated live updates, retries, and +final snapshots safe to append without double-counting. The ledger remains observability data only: +it is not a billing surface and does not apply pricing or model-specific cost rules. + ## Recommended Symphony Documentation Contract If Symphony documents token reporting externally, the contract should be: - Live token totals come from Codex thread-scoped cumulative usage. +- Durable per-issue totals come from high-water cumulative totals per Codex session. - Incremental usage may also be emitted, but Symphony does not use it for totals. - Turn-completed usage is event-specific and should not be assumed to be a fresh additive increment. - Reporting is thread-based, and multiple turns can occur on one thread. @@ -300,5 +312,6 @@ If Symphony documents token reporting externally, the contract should be: - Fallback to `info.total_token_usage` - Ignore `last` for totals - Key totals by `thread_id` +- Persist high-water totals by `(issue_identifier, session_id)` - Do not classify generic `usage` by field name alone - Do not double-count turn-completed usage after live updates diff --git a/elixir/lib/symphony_elixir/orchestrator.ex b/elixir/lib/symphony_elixir/orchestrator.ex index 3cd814829b..5f070edfc5 100644 --- a/elixir/lib/symphony_elixir/orchestrator.ex +++ b/elixir/lib/symphony_elixir/orchestrator.ex @@ -7,7 +7,7 @@ defmodule SymphonyElixir.Orchestrator do require Logger import Bitwise, only: [<<<: 2] - alias SymphonyElixir.{AgentRunner, Config, StatusDashboard, Tracker, Workspace} + alias SymphonyElixir.{AgentRunner, Config, StatusDashboard, TokenUsageLedger, Tracker, Workspace} alias SymphonyElixir.Linear.Issue @continuation_retry_delay_ms 1_000 @@ -190,6 +190,7 @@ defmodule SymphonyElixir.Orchestrator do running_entry -> {updated_running_entry, token_delta} = integrate_codex_update(running_entry, update) + :ok = append_token_usage_observation(issue_id, updated_running_entry, update, false) state = state @@ -1275,6 +1276,7 @@ defmodule SymphonyElixir.Orchestrator do end defp record_session_completion_totals(state, running_entry) when is_map(running_entry) do + :ok = append_token_usage_observation(running_entry_issue_id(running_entry), running_entry, nil, true) runtime_seconds = running_seconds(running_entry.started_at, DateTime.utc_now()) codex_totals = @@ -1293,6 +1295,51 @@ defmodule SymphonyElixir.Orchestrator do defp record_session_completion_totals(state, _running_entry), do: state + defp append_token_usage_observation(issue_id, running_entry, update, final?) when is_map(running_entry) do + if token_usage_observation?(running_entry) do + TokenUsageLedger.append_observation(%{ + observed_at: DateTime.utc_now(), + final: final?, + issue_id: issue_id, + issue_identifier: Map.get(running_entry, :identifier), + session_id: Map.get(running_entry, :session_id), + worker_host: Map.get(running_entry, :worker_host), + workspace_path: Map.get(running_entry, :workspace_path), + turn_count: Map.get(running_entry, :turn_count, 0), + input_tokens: Map.get(running_entry, :codex_input_tokens, 0), + output_tokens: Map.get(running_entry, :codex_output_tokens, 0), + total_tokens: Map.get(running_entry, :codex_total_tokens, 0), + source_event: token_usage_source_event(update, final?) + }) + end + + :ok + end + + defp token_usage_observation?(running_entry) do + is_binary(Map.get(running_entry, :session_id)) and + Enum.any?( + [ + Map.get(running_entry, :codex_input_tokens, 0), + Map.get(running_entry, :codex_output_tokens, 0), + Map.get(running_entry, :codex_total_tokens, 0) + ], + &(&1 > 0) + ) + end + + defp token_usage_source_event(_update, true), do: :session_final + + defp token_usage_source_event(%{event: event}, _final?), do: event + + defp token_usage_source_event(_update, _final?), do: nil + + defp running_entry_issue_id(%{issue: %Issue{id: issue_id}}) when is_binary(issue_id), do: issue_id + + defp running_entry_issue_id(%{issue_id: issue_id}) when is_binary(issue_id), do: issue_id + + defp running_entry_issue_id(_running_entry), do: nil + defp refresh_runtime_config(%State{} = state) do config = Config.settings!() diff --git a/elixir/lib/symphony_elixir/token_usage_ledger.ex b/elixir/lib/symphony_elixir/token_usage_ledger.ex new file mode 100644 index 0000000000..95d05acc10 --- /dev/null +++ b/elixir/lib/symphony_elixir/token_usage_ledger.ex @@ -0,0 +1,232 @@ +defmodule SymphonyElixir.TokenUsageLedger do + @moduledoc """ + Durable JSONL ledger for per-issue Codex token usage observations. + """ + + require Logger + + alias SymphonyElixir.LogFile + + @schema_version 1 + @ledger_filename "token_usage.jsonl" + + @type token_summary :: %{ + input_tokens: non_neg_integer(), + output_tokens: non_neg_integer(), + total_tokens: non_neg_integer(), + issue_count: non_neg_integer(), + session_count: non_neg_integer() + } + + @type issue_summary :: %{ + issue_id: String.t() | nil, + issue_identifier: String.t(), + worker_host: String.t() | nil, + workspace_path: String.t() | nil, + input_tokens: non_neg_integer(), + output_tokens: non_neg_integer(), + total_tokens: non_neg_integer(), + session_count: non_neg_integer() + } + + @spec file_path() :: Path.t() + def file_path do + case Application.get_env(:symphony_elixir, :token_usage_ledger_file) do + path when is_binary(path) and path != "" -> + path + + _ -> + default_file_path() + end + end + + @spec append_observation(map(), keyword()) :: :ok + def append_observation(attrs, opts \\ []) when is_map(attrs) do + file = Keyword.get(opts, :file, file_path()) + record = observation_record(attrs) + line = Jason.encode!(record) <> "\n" + + with :ok <- File.mkdir_p(Path.dirname(file)), + :ok <- File.write(file, line, [:append]) do + :ok + else + {:error, reason} -> + Logger.warning("Failed to append token usage ledger record file=#{file} reason=#{inspect(reason)}") + :ok + end + rescue + error -> + Logger.warning("Failed to append token usage ledger record reason=#{Exception.message(error)}") + :ok + end + + @spec summary(keyword()) :: token_summary() + def summary(opts \\ []) do + high_water = opts |> read_records() |> high_water_records() + + %{ + input_tokens: sum_token(high_water, :input_tokens), + output_tokens: sum_token(high_water, :output_tokens), + total_tokens: sum_token(high_water, :total_tokens), + issue_count: high_water |> Enum.map(& &1.issue_identifier) |> Enum.uniq() |> length(), + session_count: length(high_water) + } + end + + @spec issue_summary(String.t(), keyword()) :: issue_summary() | nil + def issue_summary(issue_identifier, opts \\ []) when is_binary(issue_identifier) do + records = + opts + |> read_records() + |> Enum.filter(&(&1.issue_identifier == issue_identifier)) + + case high_water_records(records) do + [] -> + nil + + high_water -> + latest = Enum.max_by(records, & &1.observed_at, fn -> nil end) + + %{ + issue_id: latest && latest.issue_id, + issue_identifier: issue_identifier, + worker_host: latest && latest.worker_host, + workspace_path: latest && latest.workspace_path, + input_tokens: sum_token(high_water, :input_tokens), + output_tokens: sum_token(high_water, :output_tokens), + total_tokens: sum_token(high_water, :total_tokens), + session_count: length(high_water) + } + end + end + + @spec read_records(keyword()) :: [map()] + def read_records(opts \\ []) do + file = Keyword.get(opts, :file, file_path()) + + case File.read(file) do + {:ok, contents} -> + contents + |> String.split("\n", trim: true) + |> Enum.flat_map(&decode_record/1) + + {:error, _reason} -> + [] + end + end + + defp default_file_path do + log_file = Application.get_env(:symphony_elixir, :log_file, LogFile.default_log_file()) + Path.join(Path.dirname(log_file), @ledger_filename) + end + + defp observation_record(attrs) do + %{ + "schema_version" => @schema_version, + "observed_at" => observed_at(attrs), + "final" => boolean_value(value(attrs, :final), false), + "issue_id" => string_value(value(attrs, :issue_id)), + "issue_identifier" => string_value(value(attrs, :issue_identifier)), + "session_id" => string_value(value(attrs, :session_id)), + "worker_host" => string_value(value(attrs, :worker_host)), + "workspace_path" => string_value(value(attrs, :workspace_path)), + "turn_count" => integer_value(value(attrs, :turn_count), 0), + "input_tokens" => integer_value(value(attrs, :input_tokens), 0), + "output_tokens" => integer_value(value(attrs, :output_tokens), 0), + "total_tokens" => integer_value(value(attrs, :total_tokens), 0), + "source_event" => string_value(value(attrs, :source_event)) + } + end + + defp observed_at(attrs) do + case value(attrs, :observed_at) do + %DateTime{} = observed_at -> + observed_at |> DateTime.truncate(:second) |> DateTime.to_iso8601() + + observed_at when is_binary(observed_at) and observed_at != "" -> + observed_at + + _ -> + DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601() + end + end + + defp decode_record(line) do + case Jason.decode(line) do + {:ok, %{} = raw} -> + case normalize_record(raw) do + nil -> [] + record -> [record] + end + + _ -> + [] + end + end + + defp normalize_record(raw) do + with @schema_version <- integer_value(value(raw, :schema_version), 0), + issue_identifier when is_binary(issue_identifier) <- string_value(value(raw, :issue_identifier)), + session_id when is_binary(session_id) <- string_value(value(raw, :session_id)) do + %{ + observed_at: string_value(value(raw, :observed_at)) || "", + final: boolean_value(value(raw, :final), false), + issue_id: string_value(value(raw, :issue_id)), + issue_identifier: issue_identifier, + session_id: session_id, + worker_host: string_value(value(raw, :worker_host)), + workspace_path: string_value(value(raw, :workspace_path)), + turn_count: integer_value(value(raw, :turn_count), 0), + input_tokens: integer_value(value(raw, :input_tokens), 0), + output_tokens: integer_value(value(raw, :output_tokens), 0), + total_tokens: integer_value(value(raw, :total_tokens), 0), + source_event: string_value(value(raw, :source_event)) + } + else + _ -> nil + end + end + + defp high_water_records(records) do + records + |> Enum.group_by(&{&1.issue_identifier, &1.session_id}) + |> Enum.map(fn {_key, session_records} -> + Enum.max_by(session_records, & &1.total_tokens) + end) + end + + defp sum_token(records, key) do + Enum.reduce(records, 0, fn record, total -> + total + Map.get(record, key, 0) + end) + end + + defp value(map, key) when is_map(map) do + Map.get(map, key) || Map.get(map, Atom.to_string(key)) + end + + defp integer_value(value, _default) when is_integer(value), do: max(value, 0) + + defp integer_value(value, default) when is_binary(value) do + case Integer.parse(value) do + {parsed, ""} -> max(parsed, 0) + _ -> default + end + end + + defp integer_value(_value, default), do: default + + defp boolean_value(value, _default) when is_boolean(value), do: value + defp boolean_value(_value, default), do: default + + defp string_value(nil), do: nil + + defp string_value(value) when is_binary(value) do + trimmed = String.trim(value) + if trimmed == "", do: nil, else: trimmed + end + + defp string_value(value) when is_atom(value), do: Atom.to_string(value) + defp string_value(value) when is_integer(value), do: Integer.to_string(value) + defp string_value(_value), do: nil +end diff --git a/elixir/lib/symphony_elixir_web/presenter.ex b/elixir/lib/symphony_elixir_web/presenter.ex index 1063cf7a64..5c5042fc97 100644 --- a/elixir/lib/symphony_elixir_web/presenter.ex +++ b/elixir/lib/symphony_elixir_web/presenter.ex @@ -3,7 +3,7 @@ defmodule SymphonyElixirWeb.Presenter do Shared projections for the observability API and dashboard. """ - alias SymphonyElixir.{Config, Orchestrator, StatusDashboard} + alias SymphonyElixir.{Config, Orchestrator, StatusDashboard, TokenUsageLedger} @spec state_payload(GenServer.name(), timeout()) :: map() def state_payload(orchestrator, snapshot_timeout_ms) do @@ -20,6 +20,7 @@ defmodule SymphonyElixirWeb.Presenter do running: Enum.map(snapshot.running, &running_entry_payload/1), retrying: Enum.map(snapshot.retrying, &retry_entry_payload/1), codex_totals: snapshot.codex_totals, + token_usage: TokenUsageLedger.summary(), rate_limits: snapshot.rate_limits } @@ -37,11 +38,12 @@ defmodule SymphonyElixirWeb.Presenter do %{} = snapshot -> running = Enum.find(snapshot.running, &(&1.identifier == issue_identifier)) retry = Enum.find(snapshot.retrying, &(&1.identifier == issue_identifier)) + token_usage = TokenUsageLedger.issue_summary(issue_identifier) - if is_nil(running) and is_nil(retry) do + if is_nil(running) and is_nil(retry) and is_nil(token_usage) do {:error, :issue_not_found} else - {:ok, issue_payload_body(issue_identifier, running, retry)} + {:ok, issue_payload_body(issue_identifier, running, retry, token_usage)} end _ -> @@ -60,14 +62,14 @@ defmodule SymphonyElixirWeb.Presenter do end end - defp issue_payload_body(issue_identifier, running, retry) do + defp issue_payload_body(issue_identifier, running, retry, token_usage) do %{ issue_identifier: issue_identifier, - issue_id: issue_id_from_entries(running, retry), - status: issue_status(running, retry), + issue_id: issue_id_from_entries(running, retry, token_usage), + status: issue_status(running, retry, token_usage), workspace: %{ - path: workspace_path(issue_identifier, running, retry), - host: workspace_host(running, retry) + path: workspace_path(issue_identifier, running, retry, token_usage), + host: workspace_host(running, retry, token_usage) }, attempts: %{ restart_count: restart_count(retry), @@ -80,12 +82,13 @@ defmodule SymphonyElixirWeb.Presenter do }, recent_events: (running && recent_events_payload(running)) || [], last_error: retry && retry.error, - tracked: %{} + tracked: %{}, + token_usage: token_usage && token_usage_payload(token_usage) } end - defp issue_id_from_entries(running, retry), - do: (running && running.issue_id) || (retry && retry.issue_id) + defp issue_id_from_entries(running, retry, token_usage), + do: (running && running.issue_id) || (retry && retry.issue_id) || (token_usage && token_usage.issue_id) defp restart_count(retry), do: max(retry_attempt(retry) - 1, 0) defp retry_attempt(nil), do: 0 @@ -95,6 +98,9 @@ defmodule SymphonyElixirWeb.Presenter do defp issue_status(nil, _retry), do: "retrying" defp issue_status(_running, _retry), do: "running" + defp issue_status(nil, nil, %{}), do: "inactive" + defp issue_status(running, retry, _token_usage), do: issue_status(running, retry) + defp running_entry_payload(entry) do %{ issue_id: entry.issue_id, @@ -157,14 +163,26 @@ defmodule SymphonyElixirWeb.Presenter do } end - defp workspace_path(issue_identifier, running, retry) do + defp workspace_path(issue_identifier, running, retry, token_usage) do (running && Map.get(running, :workspace_path)) || (retry && Map.get(retry, :workspace_path)) || + (token_usage && Map.get(token_usage, :workspace_path)) || Path.join(Config.settings!().workspace.root, issue_identifier) end - defp workspace_host(running, retry) do - (running && Map.get(running, :worker_host)) || (retry && Map.get(retry, :worker_host)) + defp workspace_host(running, retry, token_usage) do + (running && Map.get(running, :worker_host)) || + (retry && Map.get(retry, :worker_host)) || + (token_usage && Map.get(token_usage, :worker_host)) + end + + defp token_usage_payload(token_usage) do + %{ + input_tokens: token_usage.input_tokens, + output_tokens: token_usage.output_tokens, + total_tokens: token_usage.total_tokens, + session_count: token_usage.session_count + } end defp recent_events_payload(running) do diff --git a/elixir/test/support/test_support.exs b/elixir/test/support/test_support.exs index 484c1cae72..71a20d7caf 100644 --- a/elixir/test/support/test_support.exs +++ b/elixir/test/support/test_support.exs @@ -33,8 +33,10 @@ defmodule SymphonyElixir.TestSupport do File.mkdir_p!(workflow_root) workflow_file = Path.join(workflow_root, "WORKFLOW.md") + token_usage_ledger_file = Path.join(workflow_root, "token_usage.jsonl") write_workflow_file!(workflow_file) Workflow.set_workflow_file_path(workflow_file) + Application.put_env(:symphony_elixir, :token_usage_ledger_file, token_usage_ledger_file) if Process.whereis(SymphonyElixir.WorkflowStore), do: SymphonyElixir.WorkflowStore.force_reload() stop_default_http_server() @@ -43,6 +45,7 @@ defmodule SymphonyElixir.TestSupport do Application.delete_env(:symphony_elixir, :server_port_override) Application.delete_env(:symphony_elixir, :memory_tracker_issues) Application.delete_env(:symphony_elixir, :memory_tracker_recipient) + Application.delete_env(:symphony_elixir, :token_usage_ledger_file) File.rm_rf(workflow_root) end) diff --git a/elixir/test/symphony_elixir/extensions_test.exs b/elixir/test/symphony_elixir/extensions_test.exs index d6309c9662..b1205cd094 100644 --- a/elixir/test/symphony_elixir/extensions_test.exs +++ b/elixir/test/symphony_elixir/extensions_test.exs @@ -376,6 +376,13 @@ defmodule SymphonyElixir.ExtensionsTest do "total_tokens" => 12, "seconds_running" => 42.5 }, + "token_usage" => %{ + "input_tokens" => 0, + "output_tokens" => 0, + "total_tokens" => 0, + "issue_count" => 0, + "session_count" => 0 + }, "rate_limits" => %{"primary" => %{"remaining" => 11}} } @@ -407,7 +414,8 @@ defmodule SymphonyElixir.ExtensionsTest do "logs" => %{"codex_session_logs" => []}, "recent_events" => [], "last_error" => nil, - "tracked" => %{} + "tracked" => %{}, + "token_usage" => nil } conn = get(build_conn(), "/api/v1/MT-RETRY") @@ -427,6 +435,72 @@ defmodule SymphonyElixir.ExtensionsTest do json_response(conn, 202) end + test "phoenix observability api serves inactive ledger-backed issue details" do + orchestrator_name = Module.concat(__MODULE__, :LedgerBackedIssueOrchestrator) + snapshot = static_snapshot() |> Map.put(:running, []) |> Map.put(:retrying, []) + + {:ok, _pid} = + StaticOrchestrator.start_link( + name: orchestrator_name, + snapshot: snapshot, + refresh: :unavailable + ) + + ledger_file = Application.fetch_env!(:symphony_elixir, :token_usage_ledger_file) + + SymphonyElixir.TokenUsageLedger.append_observation( + %{ + observed_at: "2026-04-20T10:00:00Z", + final: true, + issue_id: "issue-done", + issue_identifier: "MT-DONE", + session_id: "thread-done-turn-done", + worker_host: "worker-a", + workspace_path: "/tmp/MT-DONE", + turn_count: 3, + input_tokens: 40, + output_tokens: 6, + total_tokens: 46, + source_event: :session_final + }, + file: ledger_file + ) + + start_test_endpoint(orchestrator: orchestrator_name, snapshot_timeout_ms: 50) + + state_payload = json_response(get(build_conn(), "/api/v1/state"), 200) + + assert state_payload["token_usage"] == %{ + "input_tokens" => 40, + "output_tokens" => 6, + "total_tokens" => 46, + "issue_count" => 1, + "session_count" => 1 + } + + issue_payload = json_response(get(build_conn(), "/api/v1/MT-DONE"), 200) + + assert issue_payload == %{ + "issue_identifier" => "MT-DONE", + "issue_id" => "issue-done", + "status" => "inactive", + "workspace" => %{"path" => "/tmp/MT-DONE", "host" => "worker-a"}, + "attempts" => %{"restart_count" => 0, "current_retry_attempt" => 0}, + "running" => nil, + "retry" => nil, + "logs" => %{"codex_session_logs" => []}, + "recent_events" => [], + "last_error" => nil, + "tracked" => %{}, + "token_usage" => %{ + "input_tokens" => 40, + "output_tokens" => 6, + "total_tokens" => 46, + "session_count" => 1 + } + } + end + test "phoenix observability api preserves 405, 404, and unavailable behavior" do unavailable_orchestrator = Module.concat(__MODULE__, :UnavailableOrchestrator) start_test_endpoint(orchestrator: unavailable_orchestrator, snapshot_timeout_ms: 5) diff --git a/elixir/test/symphony_elixir/orchestrator_status_test.exs b/elixir/test/symphony_elixir/orchestrator_status_test.exs index 4326b80ce3..119b7fb9bf 100644 --- a/elixir/test/symphony_elixir/orchestrator_status_test.exs +++ b/elixir/test/symphony_elixir/orchestrator_status_test.exs @@ -190,6 +190,21 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert snapshot_entry.turn_count == 1 assert is_integer(snapshot_entry.runtime_seconds) + ledger_file = Application.fetch_env!(:symphony_elixir, :token_usage_ledger_file) + + assert [ + %{ + final: false, + issue_id: ^issue_id, + issue_identifier: "MT-201", + session_id: "thread-usage-turn-usage", + input_tokens: 12, + output_tokens: 4, + total_tokens: 16, + source_event: "notification" + } + ] = SymphonyElixir.TokenUsageLedger.read_records(file: ledger_file) + send(pid, {:DOWN, process_ref, :process, self(), :normal}) completed_state = :sys.get_state(pid) @@ -197,6 +212,20 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert completed_state.codex_totals.output_tokens == 4 assert completed_state.codex_totals.total_tokens == 16 assert is_integer(completed_state.codex_totals.seconds_running) + + assert [ + %{final: false}, + %{ + final: true, + issue_id: ^issue_id, + issue_identifier: "MT-201", + session_id: "thread-usage-turn-usage", + input_tokens: 12, + output_tokens: 4, + total_tokens: 16, + source_event: "session_final" + } + ] = SymphonyElixir.TokenUsageLedger.read_records(file: ledger_file) end test "orchestrator snapshot tracks turn completed usage when present" do @@ -605,6 +634,16 @@ defmodule SymphonyElixir.OrchestratorStatusTest do |> Map.put(:claimed, MapSet.put(initial_state.claimed, issue_id)) end) + send( + pid, + {:codex_worker_update, issue_id, + %{ + event: :session_started, + session_id: "thread-usage-turn-usage", + timestamp: DateTime.utc_now() + }} + ) + for usage <- [ %{"input_tokens" => 8, "output_tokens" => 3, "total_tokens" => 11}, %{"input_tokens" => 10, "output_tokens" => 4, "total_tokens" => 14} @@ -628,6 +667,16 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert snapshot_entry.codex_input_tokens == 10 assert snapshot_entry.codex_output_tokens == 4 assert snapshot_entry.codex_total_tokens == 14 + + ledger_file = Application.fetch_env!(:symphony_elixir, :token_usage_ledger_file) + + assert SymphonyElixir.TokenUsageLedger.summary(file: ledger_file) == %{ + input_tokens: 10, + output_tokens: 4, + total_tokens: 14, + issue_count: 1, + session_count: 1 + } end test "orchestrator token accounting ignores last_token_usage without cumulative totals" do @@ -932,6 +981,9 @@ defmodule SymphonyElixir.OrchestratorStatusTest do last_codex_message: nil, last_codex_timestamp: stale_activity_at, last_codex_event: :notification, + codex_input_tokens: 20, + codex_output_tokens: 3, + codex_total_tokens: 23, started_at: stale_activity_at } @@ -957,8 +1009,21 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert is_integer(due_at_ms) remaining_ms = due_at_ms - System.monotonic_time(:millisecond) - assert remaining_ms >= 9_500 + assert remaining_ms >= 8_000 assert remaining_ms <= 10_500 + + ledger_file = Application.fetch_env!(:symphony_elixir, :token_usage_ledger_file) + + assert [ + %{ + final: true, + issue_id: ^issue_id, + issue_identifier: "MT-STALL", + session_id: "thread-stall-turn-stall", + total_tokens: 23, + source_event: "session_final" + } + ] = SymphonyElixir.TokenUsageLedger.read_records(file: ledger_file) end test "status dashboard renders offline marker to terminal" do diff --git a/elixir/test/symphony_elixir/token_usage_ledger_test.exs b/elixir/test/symphony_elixir/token_usage_ledger_test.exs new file mode 100644 index 0000000000..7c5069262e --- /dev/null +++ b/elixir/test/symphony_elixir/token_usage_ledger_test.exs @@ -0,0 +1,246 @@ +defmodule SymphonyElixir.TokenUsageLedgerTest do + use ExUnit.Case + + import ExUnit.CaptureLog + + alias SymphonyElixir.TokenUsageLedger + + setup do + previous_ledger_file = Application.get_env(:symphony_elixir, :token_usage_ledger_file) + previous_log_file = Application.get_env(:symphony_elixir, :log_file) + + test_root = + Path.join( + System.tmp_dir!(), + "symphony-token-usage-ledger-#{System.unique_integer([:positive])}" + ) + + File.mkdir_p!(test_root) + ledger_file = Path.join(test_root, "token_usage.jsonl") + + on_exit(fn -> + restore_app_env(:token_usage_ledger_file, previous_ledger_file) + restore_app_env(:log_file, previous_log_file) + File.rm_rf(test_root) + end) + + {:ok, ledger_file: ledger_file, test_root: test_root} + end + + test "default ledger path is next to the configured log file", %{test_root: test_root} do + Application.delete_env(:symphony_elixir, :token_usage_ledger_file) + Application.put_env(:symphony_elixir, :log_file, Path.join(test_root, "log/symphony.log")) + + assert TokenUsageLedger.file_path() == Path.join(test_root, "log/token_usage.jsonl") + + Application.put_env(:symphony_elixir, :token_usage_ledger_file, Path.join(test_root, "custom.jsonl")) + assert TokenUsageLedger.file_path() == Path.join(test_root, "custom.jsonl") + end + + test "appends valid JSONL records", %{ledger_file: ledger_file} do + assert :ok = + TokenUsageLedger.append_observation( + %{ + observed_at: ~U[2026-04-20 10:00:00Z], + final: false, + issue_id: "issue-1", + issue_identifier: "MT-1", + session_id: "thread-1-turn-1", + worker_host: "worker-a", + workspace_path: "/tmp/MT-1", + turn_count: 1, + input_tokens: 10, + output_tokens: 5, + total_tokens: 15, + source_event: :notification + }, + file: ledger_file + ) + + assert [ + %{ + "schema_version" => 1, + "observed_at" => "2026-04-20T10:00:00Z", + "final" => false, + "issue_id" => "issue-1", + "issue_identifier" => "MT-1", + "session_id" => "thread-1-turn-1", + "worker_host" => "worker-a", + "workspace_path" => "/tmp/MT-1", + "turn_count" => 1, + "input_tokens" => 10, + "output_tokens" => 5, + "total_tokens" => 15, + "source_event" => "notification" + } + ] = + ledger_file + |> File.read!() + |> String.split("\n", trim: true) + |> Enum.map(&Jason.decode!/1) + end + + test "summarizes max token totals per issue and session", %{ledger_file: ledger_file} do + append!(ledger_file, "MT-1", "session-a", 100, 20, 120) + append!(ledger_file, "MT-1", "session-a", 90, 10, 100) + append!(ledger_file, "MT-1", "session-b", 30, 5, 35) + append!(ledger_file, "MT-2", "session-c", 7, 3, 10) + + assert TokenUsageLedger.summary(file: ledger_file) == %{ + input_tokens: 137, + output_tokens: 28, + total_tokens: 165, + issue_count: 2, + session_count: 3 + } + + assert TokenUsageLedger.issue_summary("MT-1", file: ledger_file) == %{ + issue_id: "issue-MT-1", + issue_identifier: "MT-1", + worker_host: nil, + workspace_path: "/tmp/MT-1", + input_tokens: 130, + output_tokens: 25, + total_tokens: 155, + session_count: 2 + } + end + + test "ignores malformed and unreadable ledger records", %{ledger_file: ledger_file} do + File.write!( + ledger_file, + [ + "not json", + "[]", + Jason.encode!(%{"schema_version" => 2, "issue_identifier" => "MT-0"}), + Jason.encode!(%{"schema_version" => 1, "issue_identifier" => "MT-0"}), + "" + ] + |> Enum.join("\n") + ) + + append!(ledger_file, "MT-3", "session-d", 1, 2, 3) + + assert TokenUsageLedger.summary(file: ledger_file) == %{ + input_tokens: 1, + output_tokens: 2, + total_tokens: 3, + issue_count: 1, + session_count: 1 + } + + missing_file = Path.join(Path.dirname(ledger_file), "missing.jsonl") + assert TokenUsageLedger.read_records(file: missing_file) == [] + assert TokenUsageLedger.issue_summary("MT-MISSING", file: ledger_file) == nil + end + + test "normalizes invalid token field values", %{ledger_file: ledger_file} do + File.write!( + ledger_file, + Jason.encode!(%{ + "schema_version" => 1, + "issue_identifier" => "MT-0", + "session_id" => "session-zero", + "final" => "not-a-bool", + "input_tokens" => "bad", + "output_tokens" => -2, + "total_tokens" => 0 + }) + ) + + assert [ + %{ + final: false, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0 + } + ] = TokenUsageLedger.read_records(file: ledger_file) + end + + test "reads configured ledger path and normalizes string token values", %{ledger_file: ledger_file} do + Application.put_env(:symphony_elixir, :token_usage_ledger_file, ledger_file) + + File.write!( + ledger_file, + Jason.encode!(%{ + "schema_version" => 1, + "issue_identifier" => "MT-STRING", + "session_id" => "session-string", + "input_tokens" => "4", + "output_tokens" => "1", + "total_tokens" => "5", + "source_event" => [] + }) + ) + + assert [ + %{ + issue_identifier: "MT-STRING", + session_id: "session-string", + input_tokens: 4, + output_tokens: 1, + total_tokens: 5, + source_event: nil + } + ] = TokenUsageLedger.read_records() + end + + test "write failures are logged but do not crash", %{test_root: test_root} do + log = + capture_log(fn -> + assert :ok = + TokenUsageLedger.append_observation( + %{ + issue_identifier: "MT-ERR", + session_id: "session-error", + input_tokens: 1, + output_tokens: 0, + total_tokens: 1 + }, + file: test_root + ) + end) + + assert log =~ "Failed to append token usage ledger record" + + log = + capture_log(fn -> + assert :ok = + TokenUsageLedger.append_observation( + %{ + issue_id: 123, + issue_identifier: "MT-ERR", + session_id: "session-error", + source_event: 456, + input_tokens: 1, + output_tokens: 0, + total_tokens: 1 + }, + file: nil + ) + end) + + assert log =~ "Failed to append token usage ledger record" + end + + defp append!(ledger_file, issue_identifier, session_id, input_tokens, output_tokens, total_tokens) do + TokenUsageLedger.append_observation( + %{ + observed_at: "2026-04-20T10:00:00Z", + final: false, + issue_id: "issue-#{issue_identifier}", + issue_identifier: issue_identifier, + session_id: session_id, + workspace_path: "/tmp/#{issue_identifier}", + input_tokens: input_tokens, + output_tokens: output_tokens, + total_tokens: total_tokens + }, + file: ledger_file + ) + end + + defp restore_app_env(key, nil), do: Application.delete_env(:symphony_elixir, key) + defp restore_app_env(key, value), do: Application.put_env(:symphony_elixir, key, value) +end