diff --git a/SPEC.md b/SPEC.md index 0c5d5308d0..8389e30c9d 100644 --- a/SPEC.md +++ b/SPEC.md @@ -1309,6 +1309,14 @@ SHOULD return: - `output_tokens` - `total_tokens` - `seconds_running` (aggregate runtime seconds as of snapshot time, including active sessions) +- `lifetime_totals` (optional durable totals across restarts) + - `input_tokens` + - `output_tokens` + - `total_tokens` + - `runtime_seconds` + - `sessions` + - `runs` +- `recent_sessions` (optional durable list of recently completed Codex sessions) - `rate_limits` (latest coding-agent rate limit payload, if available) RECOMMENDED snapshot error modes: @@ -1339,6 +1347,9 @@ Token accounting rules: - Do not treat generic `usage` maps as cumulative totals unless the event type defines them that way. - Accumulate aggregate totals in orchestrator state. +- Implementations MAY persist run and session statistics to embedded local storage for durable + observability across process restarts. +- Durable stores SHOULD tolerate missing/deleted databases by recreating empty history. Runtime accounting: @@ -1452,6 +1463,29 @@ Minimum endpoints: "total_tokens": 7400, "seconds_running": 1834.2 }, + "lifetime_totals": { + "input_tokens": 12000, + "output_tokens": 4200, + "total_tokens": 16200, + "runtime_seconds": 4810, + "sessions": 8, + "runs": 3 + }, + "recent_sessions": [ + { + "issue_id": "abc123", + "identifier": "MT-649", + "started_at": "2026-02-24T20:10:12Z", + "completed_at": "2026-02-24T20:14:59Z", + "turns": 2, + "input_tokens": 1200, + "output_tokens": 800, + "total_tokens": 2000, + "runtime_seconds": 287, + "final_state": "Done", + "model": "gpt-5.5" + } + ], "rate_limits": null } ``` diff --git a/elixir/README.md b/elixir/README.md index 42d24a6911..1855efd21a 100644 --- a/elixir/README.md +++ b/elixir/README.md @@ -115,6 +115,11 @@ Optional flags: - `--logs-root` tells Symphony to write logs under a different directory (default: `./log`) - `--port` also starts the Phoenix observability service (default: disabled) +Symphony writes durable run and Codex session statistics to +`~/.local/share/symphony/stats.db`. The SQLite database stores per-run token/runtime aggregates and +the last completed Codex sessions. Deleting the database resets the dashboard history without +affecting orchestration. + The `WORKFLOW.md` file uses YAML front matter for configuration, plus a Markdown body used as the Codex session prompt. @@ -221,6 +226,13 @@ The observability UI now runs on a minimal Phoenix stack: - Bandit as the HTTP server - Phoenix dependency static assets for the LiveView client bootstrap +The dashboard and `/api/v1/state` include durable statistics from `stats.db`: + +- `lifetime_totals` reports input/output/total tokens, runtime seconds, sessions, and run count + across Symphony restarts. +- `recent_sessions` lists the last 20 completed Codex sessions with issue, token, runtime, turn, + final state, and model fields. + ## Project Layout - `lib/`: application code and Mix tasks diff --git a/elixir/lib/symphony_elixir/orchestrator.ex b/elixir/lib/symphony_elixir/orchestrator.ex index 19806ebae2..db58432c16 100644 --- a/elixir/lib/symphony_elixir/orchestrator.ex +++ b/elixir/lib/symphony_elixir/orchestrator.ex @@ -7,7 +7,7 @@ defmodule SymphonyElixir.Orchestrator do require Logger import Bitwise, only: [<<<: 2] - alias SymphonyElixir.{AgentRunner, Config, StatusDashboard, Tracker, Workspace} + alias SymphonyElixir.{AgentRunner, Config, Stats, StatusDashboard, Tracker, Workspace} alias SymphonyElixir.GitHub.Adapter, as: GitHubAdapter alias SymphonyElixir.Linear.Issue @@ -34,13 +34,17 @@ defmodule SymphonyElixir.Orchestrator do :poll_check_in_progress, :tick_timer_ref, :tick_token, + :stats_run_id, + :stats_started_at, running: %{}, completed: MapSet.new(), claimed: MapSet.new(), blocked: %{}, retry_attempts: %{}, codex_totals: nil, - codex_rate_limits: nil + codex_rate_limits: nil, + peak_concurrent_agents: 0, + sessions_launched: 0 ] end @@ -52,11 +56,15 @@ defmodule SymphonyElixir.Orchestrator do @impl true def init(_opts) do + Process.flag(:trap_exit, true) now_ms = System.monotonic_time(:millisecond) + now = DateTime.utc_now() config = Config.settings!() case validate_tracker_startup(config.tracker) do :ok -> + stats_run_id = Stats.start_run(%{started_at: now}) + state = %State{ poll_interval_ms: config.polling.interval_ms, max_concurrent_agents: config.agent.max_concurrent_agents, @@ -64,6 +72,8 @@ defmodule SymphonyElixir.Orchestrator do poll_check_in_progress: false, tick_timer_ref: nil, tick_token: nil, + stats_run_id: stats_run_id, + stats_started_at: now, codex_totals: @empty_codex_totals, codex_rate_limits: nil } @@ -101,6 +111,8 @@ defmodule SymphonyElixir.Orchestrator do def handle_info({:tick, _tick_token}, state), do: {:noreply, state} + def handle_info({:EXIT, _pid, reason}, state), do: {:stop, reason, state} + def handle_info(:tick, state) do state = refresh_runtime_config(state) @@ -181,6 +193,7 @@ defmodule SymphonyElixir.Orchestrator do state |> apply_codex_token_delta(token_delta) |> apply_codex_rate_limits(update) + |> persist_stats_run() notify_dashboard() {:noreply, %{state | running: Map.put(running, issue_id, updated_running_entry)}} @@ -253,6 +266,21 @@ defmodule SymphonyElixir.Orchestrator do }) end + @impl true + def terminate(reason, %State{} = state) do + now = DateTime.utc_now() + + state = + Enum.reduce(state.running, state, fn {_issue_id, running_entry}, state_acc -> + record_session_completion_totals(state_acc, running_entry, now, "stopped") + end) + + finish_stats_run(state, now, reason) + :ok + end + + def terminate(_reason, _state), do: :ok + defp maybe_dispatch(%State{} = state) do state = state @@ -979,16 +1007,21 @@ defmodule SymphonyElixir.Orchestrator do codex_last_reported_total_tokens: 0, turn_count: 0, retry_attempt: normalize_retry_attempt(attempt), + model: Stats.model_from_command(Config.settings!().codex.command), started_at: DateTime.utc_now() }) - %{ + updated_state = %{ state | running: running, claimed: MapSet.put(state.claimed, issue.id), - retry_attempts: Map.delete(state.retry_attempts, issue.id) + retry_attempts: Map.delete(state.retry_attempts, issue.id), + peak_concurrent_agents: max(state.peak_concurrent_agents, map_size(running)), + sessions_launched: state.sessions_launched + 1 } + persist_stats_run(updated_state) + {:error, reason} -> Logger.error("Unable to spawn agent for #{issue_context(issue)}: #{inspect(reason)}") next_attempt = if is_integer(attempt), do: attempt + 1, else: nil @@ -1627,7 +1660,22 @@ defmodule SymphonyElixir.Orchestrator do end defp record_session_completion_totals(state, running_entry) when is_map(running_entry) do - runtime_seconds = running_seconds(running_entry.started_at, DateTime.utc_now()) + record_session_completion_totals(state, running_entry, DateTime.utc_now(), nil) + end + + defp record_session_completion_totals(state, _running_entry), do: state + + defp record_session_completion_totals(state, running_entry, completed_at, final_state_override) + when is_map(running_entry) do + runtime_seconds = running_seconds(running_entry.started_at, completed_at) + + record_stats_session( + state, + running_entry, + completed_at, + runtime_seconds, + final_state_override + ) codex_totals = apply_token_delta( @@ -1641,9 +1689,83 @@ defmodule SymphonyElixir.Orchestrator do ) %{state | codex_totals: codex_totals} + |> persist_stats_run() + end + + defp record_stats_session(state, running_entry, completed_at, runtime_seconds, final_state_override) do + if stats_session_recordable?(running_entry) do + Stats.record_session(%{ + run_id: state.stats_run_id, + issue_id: running_entry_issue_id(running_entry), + identifier: Map.get(running_entry, :identifier), + started_at: Map.get(running_entry, :started_at), + completed_at: completed_at, + turns: Map.get(running_entry, :turn_count, 0), + input_tokens: Map.get(running_entry, :codex_input_tokens, 0), + output_tokens: Map.get(running_entry, :codex_output_tokens, 0), + total_tokens: Map.get(running_entry, :codex_total_tokens, 0), + runtime_seconds: runtime_seconds, + final_state: final_state_override || running_entry_final_state(running_entry), + model: Map.get(running_entry, :model) + }) + end end - defp record_session_completion_totals(state, _running_entry), do: state + defp stats_session_recordable?(running_entry) do + match?(%DateTime{}, Map.get(running_entry, :started_at)) and + (is_binary(Map.get(running_entry, :session_id)) or Map.get(running_entry, :turn_count, 0) > 0 or + Enum.any?( + [ + Map.get(running_entry, :codex_input_tokens, 0), + Map.get(running_entry, :codex_output_tokens, 0), + Map.get(running_entry, :codex_total_tokens, 0) + ], + &(&1 > 0) + )) + end + + defp running_entry_issue_id(%{issue: %Issue{id: issue_id}}) when is_binary(issue_id), do: issue_id + defp running_entry_issue_id(%{issue_id: issue_id}) when is_binary(issue_id), do: issue_id + defp running_entry_issue_id(_running_entry), do: nil + + defp running_entry_final_state(%{issue: %Issue{state: state}}) when is_binary(state), do: state + defp running_entry_final_state(%{state: state}) when is_binary(state), do: state + defp running_entry_final_state(_running_entry), do: nil + + defp persist_stats_run(%State{} = state) do + Stats.update_run(state.stats_run_id, stats_run_attrs(state)) + state + end + + defp finish_stats_run(%State{} = state, %DateTime{} = stopped_at, reason) do + Stats.finish_run( + state.stats_run_id, + stats_run_attrs(state, %{ + stopped_at: stopped_at, + restart_reason: restart_reason(reason) + }) + ) + end + + defp stats_run_attrs(%State{} = state, extra \\ %{}) do + codex_totals = state.codex_totals || @empty_codex_totals + + %{ + started_at: state.stats_started_at, + peak_concurrent_agents: state.peak_concurrent_agents, + sessions_launched: state.sessions_launched, + input_tokens: Map.get(codex_totals, :input_tokens, 0), + output_tokens: Map.get(codex_totals, :output_tokens, 0), + total_tokens: Map.get(codex_totals, :total_tokens, 0), + runtime_seconds: Map.get(codex_totals, :seconds_running, 0) + } + |> Map.merge(extra) + end + + defp restart_reason(:normal), do: "normal" + defp restart_reason(:shutdown), do: "shutdown" + defp restart_reason({:shutdown, reason}), do: "shutdown: #{inspect(reason)}" + defp restart_reason(reason), do: inspect(reason) defp refresh_runtime_config(%State{} = state) do config = Config.settings!() diff --git a/elixir/lib/symphony_elixir/stats.ex b/elixir/lib/symphony_elixir/stats.ex new file mode 100644 index 0000000000..943f156710 --- /dev/null +++ b/elixir/lib/symphony_elixir/stats.ex @@ -0,0 +1,266 @@ +defmodule SymphonyElixir.Stats do + @moduledoc """ + Durable Symphony run and Codex session statistics. + """ + + require Logger + + alias SymphonyElixir.Stats.SQLite + + @default_limit 20 + @empty_lifetime_totals %{ + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + runtime_seconds: 0, + sessions: 0, + runs: 0 + } + + @type lifetime_totals :: %{ + input_tokens: non_neg_integer(), + output_tokens: non_neg_integer(), + total_tokens: non_neg_integer(), + runtime_seconds: non_neg_integer(), + sessions: non_neg_integer(), + runs: non_neg_integer() + } + + @type recent_session :: %{ + id: integer(), + run_id: integer() | nil, + issue_id: String.t() | nil, + identifier: String.t() | nil, + started_at: String.t() | nil, + completed_at: String.t() | nil, + turns: non_neg_integer(), + input_tokens: non_neg_integer(), + output_tokens: non_neg_integer(), + total_tokens: non_neg_integer(), + runtime_seconds: non_neg_integer(), + final_state: String.t() | nil, + model: String.t() | nil + } + + @spec db_path() :: Path.t() + def db_path do + case Application.get_env(:symphony_elixir, :stats_db_file) do + path when is_binary(path) and path != "" -> Path.expand(path) + _ -> default_db_path() + end + end + + @spec default_db_path() :: Path.t() + def default_db_path do + data_home = + case System.get_env("XDG_DATA_HOME") do + path when is_binary(path) and path != "" -> Path.expand(path) + _ -> Path.expand("~/.local/share") + end + + Path.join([data_home, "symphony", "stats.db"]) + end + + @spec start_run(map(), keyword()) :: integer() | nil + def start_run(attrs \\ %{}, opts \\ []) when is_map(attrs) do + case adapter(opts).start_run(path(opts), normalize_run_attrs(attrs)) do + {:ok, id} when is_integer(id) -> + id + + {:error, reason} -> + log_failure("start stats run", reason) + nil + end + end + + @spec update_run(integer() | nil, map()) :: :ok + def update_run(run_id, attrs), do: update_run(run_id, attrs, []) + + @spec update_run(integer() | nil, map(), keyword()) :: :ok + def update_run(nil, _attrs, _opts), do: :ok + + def update_run(run_id, attrs, opts) when is_integer(run_id) and is_map(attrs) do + case adapter(opts).update_run(path(opts), run_id, normalize_run_attrs(attrs)) do + :ok -> + :ok + + {:error, reason} -> + log_failure("update stats run", reason) + :ok + end + end + + @spec finish_run(integer() | nil, map(), keyword()) :: :ok + def finish_run(run_id, attrs, opts \\ []) do + update_run(run_id, attrs, opts) + end + + @spec record_session(map(), keyword()) :: :ok + def record_session(attrs, opts \\ []) when is_map(attrs) do + case adapter(opts).record_session(path(opts), normalize_session_attrs(attrs)) do + {:ok, _id} -> + :ok + + {:error, reason} -> + log_failure("record stats session", reason) + :ok + end + end + + @spec lifetime_totals(keyword()) :: lifetime_totals() + def lifetime_totals(opts \\ []) do + case adapter(opts).lifetime_totals(path(opts)) do + {:ok, totals} -> + normalize_lifetime_totals(totals) + + {:error, reason} -> + log_failure("read lifetime stats", reason) + @empty_lifetime_totals + end + end + + @spec recent_sessions(non_neg_integer(), keyword()) :: [recent_session()] + def recent_sessions(limit \\ @default_limit, opts \\ []) do + limit = normalize_limit(limit) + + case adapter(opts).recent_sessions(path(opts), limit) do + {:ok, sessions} -> + Enum.map(sessions, &normalize_recent_session/1) + + {:error, reason} -> + log_failure("read recent stats sessions", reason) + [] + end + end + + @spec model_from_command(String.t() | nil) :: String.t() | nil + def model_from_command(command) when is_binary(command) do + model_from_flag(command) || model_from_config(command) + end + + def model_from_command(_command), do: nil + + defp adapter(opts), do: Keyword.get(opts, :adapter, SQLite) + defp path(opts), do: Keyword.get(opts, :db_path, db_path()) + + defp normalize_run_attrs(attrs) do + %{ + started_at: iso8601(value(attrs, :started_at)), + stopped_at: iso8601(value(attrs, :stopped_at)), + restart_reason: string_value(value(attrs, :restart_reason)), + peak_concurrent_agents: integer_value(value(attrs, :peak_concurrent_agents), 0), + sessions_launched: integer_value(value(attrs, :sessions_launched), 0), + input_tokens: integer_value(value(attrs, :input_tokens), 0), + output_tokens: integer_value(value(attrs, :output_tokens), 0), + total_tokens: integer_value(value(attrs, :total_tokens), 0), + runtime_seconds: integer_value(value(attrs, :runtime_seconds), 0) + } + end + + defp normalize_session_attrs(attrs) do + %{ + run_id: nullable_integer_value(value(attrs, :run_id)), + issue_id: string_value(value(attrs, :issue_id)), + identifier: string_value(value(attrs, :identifier)), + started_at: iso8601(value(attrs, :started_at)), + completed_at: iso8601(value(attrs, :completed_at)), + turns: integer_value(value(attrs, :turns), 0), + input_tokens: integer_value(value(attrs, :input_tokens), 0), + output_tokens: integer_value(value(attrs, :output_tokens), 0), + total_tokens: integer_value(value(attrs, :total_tokens), 0), + runtime_seconds: integer_value(value(attrs, :runtime_seconds), 0), + final_state: string_value(value(attrs, :final_state)), + model: string_value(value(attrs, :model)) + } + end + + defp normalize_lifetime_totals(totals) when is_map(totals) do + %{ + input_tokens: integer_value(value(totals, :input_tokens), 0), + output_tokens: integer_value(value(totals, :output_tokens), 0), + total_tokens: integer_value(value(totals, :total_tokens), 0), + runtime_seconds: integer_value(value(totals, :runtime_seconds), 0), + sessions: integer_value(value(totals, :sessions), 0), + runs: integer_value(value(totals, :runs), 0) + } + end + + defp normalize_recent_session(session) when is_map(session) do + %{ + id: integer_value(value(session, :id), 0), + run_id: nullable_integer_value(value(session, :run_id)), + issue_id: string_value(value(session, :issue_id)), + identifier: string_value(value(session, :identifier)), + started_at: string_value(value(session, :started_at)), + completed_at: string_value(value(session, :completed_at)), + turns: integer_value(value(session, :turns), 0), + input_tokens: integer_value(value(session, :input_tokens), 0), + output_tokens: integer_value(value(session, :output_tokens), 0), + total_tokens: integer_value(value(session, :total_tokens), 0), + runtime_seconds: integer_value(value(session, :runtime_seconds), 0), + final_state: string_value(value(session, :final_state)), + model: string_value(value(session, :model)) + } + end + + defp normalize_limit(limit) when is_integer(limit) and limit > 0, do: limit + defp normalize_limit(_limit), do: @default_limit + + defp value(map, key) when is_map(map), do: Map.get(map, key) || Map.get(map, Atom.to_string(key)) + + defp iso8601(%DateTime{} = datetime) do + datetime + |> DateTime.truncate(:second) + |> DateTime.to_iso8601() + end + + defp iso8601(value) when is_binary(value), do: string_value(value) + defp iso8601(_value), do: nil + + defp nullable_integer_value(nil), do: nil + defp nullable_integer_value(value), do: integer_value(value, 0) + + defp integer_value(value, _default) when is_integer(value), do: max(value, 0) + + defp integer_value(value, default) when is_binary(value) do + case Integer.parse(String.trim(value)) do + {parsed, ""} -> max(parsed, 0) + _ -> default + end + end + + defp integer_value(_value, default), do: default + + defp string_value(nil), do: nil + + defp string_value(value) when is_binary(value) do + value + |> String.trim() + |> case do + "" -> nil + trimmed -> trimmed + end + end + + defp string_value(value) when is_atom(value), do: Atom.to_string(value) + defp string_value(value) when is_integer(value), do: Integer.to_string(value) + defp string_value(_value), do: nil + + defp model_from_flag(command) do + case Regex.run(~r/(?:^|\s)--model(?:=|\s+)(?:"([^"]+)"|'([^']+)'|([^\s'"]+))/, command) do + [_match | captures] -> Enum.find_value(captures, &string_value/1) + _ -> nil + end + end + + defp model_from_config(command) do + case Regex.run(~r/model\s*=\s*\\?["']?(?[A-Za-z0-9._:\/-]+)/, command, capture: :all_names) do + [model] -> string_value(model) + _ -> nil + end + end + + defp log_failure(action, reason) do + Logger.warning("Failed to #{action}: #{inspect(reason)}") + end +end diff --git a/elixir/lib/symphony_elixir/stats/sqlite.ex b/elixir/lib/symphony_elixir/stats/sqlite.ex new file mode 100644 index 0000000000..7ee8ee2784 --- /dev/null +++ b/elixir/lib/symphony_elixir/stats/sqlite.ex @@ -0,0 +1,372 @@ +defmodule SymphonyElixir.Stats.SQLite do + @moduledoc false + + alias Exqlite.Sqlite3 + + @schema_version 1 + + @type query_result :: :ok | {:error, term()} + + @spec start_run(Path.t(), map()) :: {:ok, integer()} | {:error, term()} + def start_run(path, attrs) when is_binary(path) and is_map(attrs) do + with_connection(path, fn conn -> + case execute_statement( + conn, + """ + INSERT INTO symphony_runs ( + started_at, + stopped_at, + restart_reason, + peak_concurrent_agents, + sessions_launched, + input_tokens, + output_tokens, + total_tokens, + runtime_seconds + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + attrs.started_at, + attrs.stopped_at, + attrs.restart_reason, + attrs.peak_concurrent_agents, + attrs.sessions_launched, + attrs.input_tokens, + attrs.output_tokens, + attrs.total_tokens, + attrs.runtime_seconds + ] + ) do + :ok -> Sqlite3.last_insert_rowid(conn) + {:error, reason} -> {:error, reason} + end + end) + end + + @spec update_run(Path.t(), integer(), map()) :: query_result() + def update_run(path, run_id, attrs) when is_binary(path) and is_integer(run_id) and is_map(attrs) do + with_connection(path, fn conn -> + case execute_statement_changes( + conn, + """ + UPDATE symphony_runs + SET stopped_at = COALESCE(?, stopped_at), + restart_reason = COALESCE(?, restart_reason), + peak_concurrent_agents = ?, + sessions_launched = ?, + input_tokens = ?, + output_tokens = ?, + total_tokens = ?, + runtime_seconds = ? + WHERE id = ? + """, + [ + attrs.stopped_at, + attrs.restart_reason, + attrs.peak_concurrent_agents, + attrs.sessions_launched, + attrs.input_tokens, + attrs.output_tokens, + attrs.total_tokens, + attrs.runtime_seconds, + run_id + ] + ) do + {:ok, changes} when changes > 0 -> :ok + {:ok, 0} -> recover_missing_run(conn, run_id, attrs) + {:error, reason} -> {:error, reason} + end + end) + end + + @spec record_session(Path.t(), map()) :: {:ok, integer()} | {:error, term()} + def record_session(path, attrs) when is_binary(path) and is_map(attrs) do + with_connection(path, fn conn -> + case execute_statement( + conn, + """ + INSERT INTO codex_sessions ( + run_id, + issue_id, + identifier, + started_at, + completed_at, + turns, + input_tokens, + output_tokens, + total_tokens, + runtime_seconds, + final_state, + model + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + attrs.run_id, + attrs.issue_id, + attrs.identifier, + attrs.started_at, + attrs.completed_at, + attrs.turns, + attrs.input_tokens, + attrs.output_tokens, + attrs.total_tokens, + attrs.runtime_seconds, + attrs.final_state, + attrs.model + ] + ) do + :ok -> Sqlite3.last_insert_rowid(conn) + {:error, reason} -> {:error, reason} + end + end) + end + + @spec lifetime_totals(Path.t()) :: {:ok, map()} | {:error, term()} + def lifetime_totals(path) when is_binary(path) do + with_connection(path, fn conn -> + case fetch_rows(conn, """ + SELECT + COALESCE(SUM(input_tokens), 0), + COALESCE(SUM(output_tokens), 0), + COALESCE(SUM(total_tokens), 0), + COALESCE(SUM(runtime_seconds), 0), + COALESCE(SUM(sessions_launched), 0), + COUNT(*) + FROM symphony_runs + """) do + {:ok, [[input_tokens, output_tokens, total_tokens, runtime_seconds, sessions, runs]]} -> + {:ok, + %{ + input_tokens: input_tokens, + output_tokens: output_tokens, + total_tokens: total_tokens, + runtime_seconds: runtime_seconds, + sessions: sessions, + runs: runs + }} + + {:ok, other} -> + {:error, {:unexpected_lifetime_totals_rows, other}} + + {:error, reason} -> + {:error, reason} + end + end) + end + + @spec recent_sessions(Path.t(), pos_integer()) :: {:ok, [map()]} | {:error, term()} + def recent_sessions(path, limit) when is_binary(path) and is_integer(limit) and limit > 0 do + with_connection(path, fn conn -> + case fetch_rows( + conn, + """ + SELECT + id, + run_id, + issue_id, + identifier, + started_at, + completed_at, + turns, + input_tokens, + output_tokens, + total_tokens, + runtime_seconds, + final_state, + model + FROM codex_sessions + ORDER BY completed_at DESC, id DESC + LIMIT ? + """, + [limit] + ) do + {:ok, rows} -> {:ok, Enum.map(rows, &recent_session_from_row/1)} + {:error, reason} -> {:error, reason} + end + end) + end + + defp with_connection(path, fun) do + expanded_path = Path.expand(path) + + with :ok <- File.mkdir_p(Path.dirname(expanded_path)), + {:ok, conn} <- Sqlite3.open(expanded_path), + :ok <- ensure_schema(conn) do + try do + fun.(conn) + after + Sqlite3.close(conn) + end + end + end + + defp ensure_schema(conn) do + with :ok <- Sqlite3.execute(conn, "PRAGMA busy_timeout = 5000;"), + {:ok, version} <- user_version(conn), + true <- version in [0, @schema_version], + :ok <- Sqlite3.execute(conn, schema_sql()) do + :ok + else + false -> {:error, :unsupported_stats_schema_version} + {:error, reason} -> {:error, reason} + end + end + + defp user_version(conn) do + case fetch_rows(conn, "PRAGMA user_version") do + {:ok, [[version]]} when is_integer(version) -> {:ok, version} + {:ok, other} -> {:error, {:unexpected_user_version_rows, other}} + {:error, reason} -> {:error, reason} + end + end + + defp schema_sql do + """ + CREATE TABLE IF NOT EXISTS symphony_runs( + id INTEGER PRIMARY KEY, + started_at TEXT NOT NULL, + stopped_at TEXT, + restart_reason TEXT, + peak_concurrent_agents INTEGER NOT NULL DEFAULT 0, + sessions_launched INTEGER NOT NULL DEFAULT 0, + input_tokens INTEGER NOT NULL DEFAULT 0, + output_tokens INTEGER NOT NULL DEFAULT 0, + total_tokens INTEGER NOT NULL DEFAULT 0, + runtime_seconds INTEGER NOT NULL DEFAULT 0 + ); + + CREATE TABLE IF NOT EXISTS codex_sessions( + id INTEGER PRIMARY KEY, + run_id INTEGER, + issue_id TEXT, + identifier TEXT, + started_at TEXT, + completed_at TEXT, + turns INTEGER NOT NULL DEFAULT 0, + input_tokens INTEGER NOT NULL DEFAULT 0, + output_tokens INTEGER NOT NULL DEFAULT 0, + total_tokens INTEGER NOT NULL DEFAULT 0, + runtime_seconds INTEGER NOT NULL DEFAULT 0, + final_state TEXT, + model TEXT, + FOREIGN KEY(run_id) REFERENCES symphony_runs(id) + ); + + CREATE INDEX IF NOT EXISTS codex_sessions_completed_at_idx + ON codex_sessions(completed_at DESC, id DESC); + + CREATE INDEX IF NOT EXISTS codex_sessions_identifier_idx + ON codex_sessions(identifier); + + PRAGMA user_version = 1; + """ + end + + defp execute_statement(conn, sql, params) do + with {:ok, statement} <- Sqlite3.prepare(conn, sql) do + try do + with :ok <- Sqlite3.bind(statement, params) do + case Sqlite3.step(conn, statement) do + :done -> :ok + {:error, reason} -> {:error, reason} + other -> {:error, {:unexpected_sqlite_result, other}} + end + end + after + Sqlite3.release(conn, statement) + end + end + end + + defp execute_statement_changes(conn, sql, params) do + case execute_statement(conn, sql, params) do + :ok -> Sqlite3.changes(conn) + {:error, reason} -> {:error, reason} + end + end + + defp recover_missing_run(conn, run_id, attrs) do + case fetch_rows(conn, "SELECT 1 FROM symphony_runs WHERE id = ? LIMIT 1", [run_id]) do + {:ok, [[1]]} -> :ok + {:ok, []} -> insert_recovered_run(conn, run_id, attrs) + {:ok, other} -> {:error, {:unexpected_run_lookup_rows, other}} + {:error, reason} -> {:error, reason} + end + end + + defp insert_recovered_run(conn, run_id, attrs) do + execute_statement( + conn, + """ + INSERT INTO symphony_runs ( + id, + started_at, + stopped_at, + restart_reason, + peak_concurrent_agents, + sessions_launched, + input_tokens, + output_tokens, + total_tokens, + runtime_seconds + ) VALUES (?, COALESCE(?, strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + run_id, + attrs.started_at, + attrs.stopped_at, + attrs.restart_reason, + attrs.peak_concurrent_agents, + attrs.sessions_launched, + attrs.input_tokens, + attrs.output_tokens, + attrs.total_tokens, + attrs.runtime_seconds + ] + ) + end + + defp fetch_rows(conn, sql, params \\ []) do + with {:ok, statement} <- Sqlite3.prepare(conn, sql) do + try do + with :ok <- Sqlite3.bind(statement, params) do + Sqlite3.fetch_all(conn, statement) + end + after + Sqlite3.release(conn, statement) + end + end + end + + defp recent_session_from_row([ + id, + run_id, + issue_id, + identifier, + started_at, + completed_at, + turns, + input_tokens, + output_tokens, + total_tokens, + runtime_seconds, + final_state, + model + ]) do + %{ + id: id, + run_id: run_id, + issue_id: issue_id, + identifier: identifier, + started_at: started_at, + completed_at: completed_at, + turns: turns, + input_tokens: input_tokens, + output_tokens: output_tokens, + total_tokens: total_tokens, + runtime_seconds: runtime_seconds, + final_state: final_state, + model: model + } + end +end diff --git a/elixir/lib/symphony_elixir_web/live/dashboard_live.ex b/elixir/lib/symphony_elixir_web/live/dashboard_live.ex index 05757523cb..b97c095158 100644 --- a/elixir/lib/symphony_elixir_web/live/dashboard_live.ex +++ b/elixir/lib/symphony_elixir_web/live/dashboard_live.ex @@ -105,10 +105,20 @@ defmodule SymphonyElixirWeb.DashboardLive do

+
+

Lifetime totals

+

<%= format_int(@payload.lifetime_totals.total_tokens) %>

+

+ <%= format_runtime_seconds(lifetime_runtime_seconds(@payload, @now)) %> / <%= format_int(@payload.lifetime_totals.sessions) %> sessions +

+
+

Runtime

<%= format_runtime_seconds(total_runtime_seconds(@payload, @now)) %>

-

Total Codex runtime across completed and active sessions.

+

+ This run / all time: <%= format_runtime_seconds(total_runtime_seconds(@payload, @now)) %> / <%= format_runtime_seconds(lifetime_runtime_seconds(@payload, @now)) %> +

@@ -123,6 +133,63 @@ defmodule SymphonyElixirWeb.DashboardLive do
<%= pretty_value(@payload.rate_limits) %>
+
+
+
+

Recent sessions

+

Last completed Codex sessions across Symphony runs.

+
+
+ + <%= if @payload.recent_sessions == [] do %> +

No completed sessions recorded.

+ <% else %> +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
IssueCompletedRuntime / turnsTokensStateModel
+ <%= session.identifier || session.issue_id || "n/a" %> + <%= session.completed_at || "n/a" %><%= format_stored_runtime_and_turns(session.runtime_seconds, session.turns) %> +
+ Total: <%= format_int(session.total_tokens) %> + In <%= format_int(session.input_tokens) %> / Out <%= format_int(session.output_tokens) %> +
+
+ + <%= session.final_state || "completed" %> + + <%= session.model || "n/a" %>
+
+ <% end %> +
+
@@ -362,10 +429,18 @@ defmodule SymphonyElixirWeb.DashboardLive do end defp total_runtime_seconds(payload, now) do - completed_runtime_seconds(payload) + - Enum.reduce(payload.running, 0, fn entry, total -> - total + runtime_seconds_from_started_at(entry.started_at, now) - end) + completed_runtime_seconds(payload) + active_runtime_seconds(payload, now) + end + + defp lifetime_runtime_seconds(payload, now) do + lifetime_totals = Map.get(payload, :lifetime_totals, %{}) + Map.get(lifetime_totals, :runtime_seconds, 0) + active_runtime_seconds(payload, now) + end + + defp active_runtime_seconds(payload, now) do + Enum.reduce(payload.running, 0, fn entry, total -> + total + runtime_seconds_from_started_at(entry.started_at, now) + end) end defp format_runtime_and_turns(started_at, turn_count, now) when is_integer(turn_count) and turn_count > 0 do @@ -375,6 +450,12 @@ defmodule SymphonyElixirWeb.DashboardLive do defp format_runtime_and_turns(started_at, _turn_count, now), do: format_runtime_seconds(runtime_seconds_from_started_at(started_at, now)) + defp format_stored_runtime_and_turns(seconds, turns) when is_integer(turns) and turns > 0 do + "#{format_runtime_seconds(seconds)} / #{turns}" + end + + defp format_stored_runtime_and_turns(seconds, _turns), do: format_runtime_seconds(seconds) + defp format_runtime_seconds(seconds) when is_number(seconds) do whole_seconds = max(trunc(seconds), 0) mins = div(whole_seconds, 60) diff --git a/elixir/lib/symphony_elixir_web/presenter.ex b/elixir/lib/symphony_elixir_web/presenter.ex index 5d64c24c1f..3404acf314 100644 --- a/elixir/lib/symphony_elixir_web/presenter.ex +++ b/elixir/lib/symphony_elixir_web/presenter.ex @@ -3,7 +3,7 @@ defmodule SymphonyElixirWeb.Presenter do Shared projections for the observability API and dashboard. """ - alias SymphonyElixir.{Config, Orchestrator, StatusDashboard} + alias SymphonyElixir.{Config, Orchestrator, Stats, StatusDashboard} @spec state_payload(GenServer.name(), timeout()) :: map() def state_payload(orchestrator, snapshot_timeout_ms) do @@ -22,6 +22,8 @@ defmodule SymphonyElixirWeb.Presenter do retrying: Enum.map(snapshot.retrying, &retry_entry_payload/1), blocked: Enum.map(Map.get(snapshot, :blocked, []), &blocked_entry_payload/1), codex_totals: snapshot.codex_totals, + lifetime_totals: Stats.lifetime_totals(), + recent_sessions: Stats.recent_sessions(), rate_limits: snapshot.rate_limits } diff --git a/elixir/mix.exs b/elixir/mix.exs index bab854e0ff..8aaa7b417f 100644 --- a/elixir/mix.exs +++ b/elixir/mix.exs @@ -29,6 +29,7 @@ defmodule SymphonyElixir.MixProject do SymphonyElixir.Codex.DynamicTool, SymphonyElixir.HttpServer, SymphonyElixir.StatusDashboard, + SymphonyElixir.Stats.SQLite, SymphonyElixir.LogFile, SymphonyElixir.Workspace, Mix.Tasks.Symphony.Migrate.LinearToGithub, @@ -80,6 +81,7 @@ defmodule SymphonyElixir.MixProject do {:phoenix_live_view, "~> 1.1.0"}, {:req, "~> 0.5"}, {:jason, "~> 1.4"}, + {:exqlite, "~> 0.36.0"}, {:yaml_elixir, "~> 2.12"}, {:solid, "~> 1.2"}, {:ecto, "~> 3.13"}, diff --git a/elixir/mix.lock b/elixir/mix.lock index f2f7c58d1c..122a9da3de 100644 --- a/elixir/mix.lock +++ b/elixir/mix.lock @@ -4,11 +4,13 @@ "cc_precompiler": {:hex, :cc_precompiler, "0.1.11", "8c844d0b9fb98a3edea067f94f616b3f6b29b959b6b3bf25fee94ffe34364768", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3427232caf0835f94680e5bcf082408a70b48ad68a5f5c0b02a3bea9f3a075b9"}, "credo": {:hex, :credo, "1.7.16", "a9f1389d13d19c631cb123c77a813dbf16449a2aebf602f590defa08953309d4", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d0562af33756b21f248f066a9119e3890722031b6d199f22e3cf95550e4f1579"}, "date_time_parser": {:hex, :date_time_parser, "1.3.0", "6ba16850b5ab83dd126576451023ab65349e29af2336ca5084aa1e37025b476e", [:mix], [{:kday, "~> 1.0", [hex: :kday, repo: "hexpm", optional: false]}], "hexpm", "93c8203a8ddc66b1f1531fc0e046329bf0b250c75ffa09567ef03d2c09218e8c"}, + "db_connection": {:hex, :db_connection, "2.10.1", "d5465f6bcc125c1b8981c1dbf23c193ca16f446ec0b25832dc174f74f18be510", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "18ed94c6e627b4bf452dbd4df61b69a35a1e768525140bc1917b7a685026a6a3"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, "ecto": {:hex, :ecto, "3.13.5", "9d4a69700183f33bf97208294768e561f5c7f1ecf417e0fa1006e4a91713a834", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "df9efebf70cf94142739ba357499661ef5dbb559ef902b68ea1f3c1fabce36de"}, "elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"}, "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, + "exqlite": {:hex, :exqlite, "0.36.0", "07b4f95d61cb82b8d52946d0639497fa7d32117e09b2c8d25e24a38723c295cb", [:make, :mix], [{:cc_precompiler, "~> 0.1", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.8", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "cbeca3ce781f9ff07cfa9a87486f3ebd512a143ad6a14ed5c9fca21fe0bf3ae7"}, "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, "finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"}, "fine": {:hex, :fine, "0.1.4", "b19a89c1476c7c57afb5f9314aed5960b5bc95d5277de4cb5ee8e1d1616ce379", [:mix], [], "hexpm", "be3324cc454a42d80951cf6023b9954e9ff27c6daa255483b3e8d608670303f5"}, diff --git a/elixir/priv/static/dashboard.css b/elixir/priv/static/dashboard.css index 7a723ecaa5..62fcb23809 100644 --- a/elixir/priv/static/dashboard.css +++ b/elixir/priv/static/dashboard.css @@ -309,6 +309,11 @@ pre, min-width: 980px; } +.data-table-recent { + table-layout: fixed; + min-width: 900px; +} + .data-table th { padding: 0 0.5rem 0.75rem 0; text-align: left; diff --git a/elixir/test/support/test_support.exs b/elixir/test/support/test_support.exs index 50b39c374d..e5bad71448 100644 --- a/elixir/test/support/test_support.exs +++ b/elixir/test/support/test_support.exs @@ -33,8 +33,10 @@ defmodule SymphonyElixir.TestSupport do File.mkdir_p!(workflow_root) workflow_file = Path.join(workflow_root, "WORKFLOW.md") + stats_db_file = Path.join(workflow_root, "stats.db") write_workflow_file!(workflow_file) Workflow.set_workflow_file_path(workflow_file) + Application.put_env(:symphony_elixir, :stats_db_file, stats_db_file) if Process.whereis(SymphonyElixir.WorkflowStore), do: SymphonyElixir.WorkflowStore.force_reload() stop_default_http_server() @@ -43,6 +45,7 @@ defmodule SymphonyElixir.TestSupport do Application.delete_env(:symphony_elixir, :server_port_override) Application.delete_env(:symphony_elixir, :memory_tracker_issues) Application.delete_env(:symphony_elixir, :memory_tracker_recipient) + Application.delete_env(:symphony_elixir, :stats_db_file) File.rm_rf(workflow_root) end) diff --git a/elixir/test/symphony_elixir/extensions_test.exs b/elixir/test/symphony_elixir/extensions_test.exs index 7ca02f492e..37074bb272 100644 --- a/elixir/test/symphony_elixir/extensions_test.exs +++ b/elixir/test/symphony_elixir/extensions_test.exs @@ -394,6 +394,15 @@ defmodule SymphonyElixir.ExtensionsTest do "total_tokens" => 12, "seconds_running" => 42.5 }, + "lifetime_totals" => %{ + "input_tokens" => 0, + "output_tokens" => 0, + "total_tokens" => 0, + "runtime_seconds" => 0, + "sessions" => 0, + "runs" => 0 + }, + "recent_sessions" => [], "rate_limits" => %{"primary" => %{"remaining" => 11}} } diff --git a/elixir/test/symphony_elixir/orchestrator_status_test.exs b/elixir/test/symphony_elixir/orchestrator_status_test.exs index c673fd11e3..75ea464e9a 100644 --- a/elixir/test/symphony_elixir/orchestrator_status_test.exs +++ b/elixir/test/symphony_elixir/orchestrator_status_test.exs @@ -197,6 +197,96 @@ defmodule SymphonyElixir.OrchestratorStatusTest do assert completed_state.codex_totals.output_tokens == 4 assert completed_state.codex_totals.total_tokens == 16 assert is_integer(completed_state.codex_totals.seconds_running) + + assert [ + %{ + issue_id: ^issue_id, + identifier: "MT-201", + turns: 1, + input_tokens: 12, + output_tokens: 4, + total_tokens: 16, + final_state: "In Progress" + } + ] = SymphonyElixir.Stats.recent_sessions() + end + + test "orchestrator records active stats when shutting down" do + issue_id = "issue-shutdown-stats" + + issue = %Issue{ + id: issue_id, + identifier: "MT-SHUTDOWN", + title: "Shutdown stats", + description: "Persist active stats", + state: "In Progress", + url: "https://example.org/issues/MT-SHUTDOWN" + } + + orchestrator_name = Module.concat(__MODULE__, :ShutdownStatsOrchestrator) + {:ok, pid} = Orchestrator.start_link(name: orchestrator_name) + started_at = DateTime.add(DateTime.utc_now(), -90, :second) + + initial_state = :sys.get_state(pid) + + running_entry = %{ + pid: self(), + ref: make_ref(), + identifier: issue.identifier, + issue: issue, + session_id: "thread-shutdown-turn-shutdown", + turn_count: 1, + last_codex_message: nil, + last_codex_timestamp: nil, + last_codex_event: nil, + codex_input_tokens: 30, + codex_output_tokens: 6, + codex_total_tokens: 36, + codex_last_reported_input_tokens: 30, + codex_last_reported_output_tokens: 6, + codex_last_reported_total_tokens: 36, + started_at: started_at, + model: "gpt-5.5" + } + + :sys.replace_state(pid, fn _ -> + %{ + initial_state + | running: %{issue_id => running_entry}, + codex_totals: %{input_tokens: 30, output_tokens: 6, total_tokens: 36, seconds_running: 0}, + sessions_launched: 1, + peak_concurrent_agents: 1 + } + end) + + assert :ok = GenServer.stop(pid, :normal) + + assert [ + %{ + issue_id: ^issue_id, + identifier: "MT-SHUTDOWN", + turns: 1, + input_tokens: 30, + output_tokens: 6, + total_tokens: 36, + runtime_seconds: runtime_seconds, + final_state: "stopped", + model: "gpt-5.5" + } + ] = SymphonyElixir.Stats.recent_sessions() + + assert runtime_seconds >= 90 + + assert %{ + input_tokens: 30, + output_tokens: 6, + total_tokens: 36, + runtime_seconds: total_runtime_seconds, + sessions: 1, + runs: 1 + } = SymphonyElixir.Stats.lifetime_totals() + + assert total_runtime_seconds >= 90 end test "orchestrator snapshot tracks turn completed usage when present" do diff --git a/elixir/test/symphony_elixir/stats/sqlite_test.exs b/elixir/test/symphony_elixir/stats/sqlite_test.exs new file mode 100644 index 0000000000..dee5f7286d --- /dev/null +++ b/elixir/test/symphony_elixir/stats/sqlite_test.exs @@ -0,0 +1,166 @@ +defmodule SymphonyElixir.Stats.SQLiteTest do + use ExUnit.Case, async: true + + alias Exqlite.Sqlite3 + alias SymphonyElixir.Stats.SQLite + + @started_at "2026-05-21T10:00:00Z" + @completed_at "2026-05-21T10:05:00Z" + + setup do + test_root = + Path.join( + System.tmp_dir!(), + "symphony-sqlite-stats-test-#{System.unique_integer([:positive])}" + ) + + File.mkdir_p!(test_root) + + on_exit(fn -> + File.rm_rf(test_root) + end) + + {:ok, db_path: Path.join(test_root, "stats.db")} + end + + test "update_run after stats.db reset re-creates the run row", %{db_path: db_path} do + {:ok, run_id} = SQLite.start_run(db_path, run_attrs()) + File.rm!(db_path) + + assert :ok = + SQLite.update_run( + db_path, + run_id, + run_attrs( + peak_concurrent_agents: 3, + sessions_launched: 2, + input_tokens: 100, + output_tokens: 20, + total_tokens: 120, + runtime_seconds: 300 + ) + ) + + assert run_count(db_path, run_id) == 1 + + assert {:ok, + %{ + input_tokens: 100, + output_tokens: 20, + total_tokens: 120, + runtime_seconds: 300, + sessions: 2, + runs: 1 + }} = SQLite.lifetime_totals(db_path) + end + + test "record_session after stats.db reset attaches to a valid run", %{db_path: db_path} do + {:ok, run_id} = SQLite.start_run(db_path, run_attrs()) + File.rm!(db_path) + + assert :ok = + SQLite.update_run( + db_path, + run_id, + run_attrs( + sessions_launched: 1, + input_tokens: 12, + output_tokens: 4, + total_tokens: 16, + runtime_seconds: 90 + ) + ) + + assert {:ok, session_id} = + SQLite.record_session( + db_path, + session_attrs(run_id: run_id, input_tokens: 12, output_tokens: 4, total_tokens: 16) + ) + + assert is_integer(session_id) + assert joined_session_count(db_path) == 1 + + assert {:ok, + [ + %{ + run_id: ^run_id, + issue_id: "issue-22", + identifier: "MT-22", + input_tokens: 12, + output_tokens: 4, + total_tokens: 16 + } + ]} = SQLite.recent_sessions(db_path, 20) + end + + defp run_attrs(overrides \\ []) do + Map.merge( + %{ + started_at: @started_at, + stopped_at: nil, + restart_reason: nil, + peak_concurrent_agents: 0, + sessions_launched: 0, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + runtime_seconds: 0 + }, + Map.new(overrides) + ) + end + + defp session_attrs(overrides) do + Map.merge( + %{ + run_id: nil, + issue_id: "issue-22", + identifier: "MT-22", + started_at: @started_at, + completed_at: @completed_at, + turns: 2, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + runtime_seconds: 90, + final_state: "Done", + model: "gpt-5.5" + }, + Map.new(overrides) + ) + end + + defp run_count(db_path, run_id) do + scalar(db_path, "SELECT COUNT(*) FROM symphony_runs WHERE id = ?", [run_id]) + end + + defp joined_session_count(db_path) do + scalar( + db_path, + """ + SELECT COUNT(*) + FROM codex_sessions + INNER JOIN symphony_runs ON codex_sessions.run_id = symphony_runs.id + """, + [] + ) + end + + defp scalar(db_path, sql, params) do + {:ok, conn} = Sqlite3.open(db_path) + + try do + {:ok, statement} = Sqlite3.prepare(conn, sql) + + try do + :ok = Sqlite3.bind(statement, params) + {:ok, [[value]]} = Sqlite3.fetch_all(conn, statement) + value + after + Sqlite3.release(conn, statement) + end + after + Sqlite3.close(conn) + end + end +end diff --git a/elixir/test/symphony_elixir/stats_test.exs b/elixir/test/symphony_elixir/stats_test.exs new file mode 100644 index 0000000000..7ff073a7df --- /dev/null +++ b/elixir/test/symphony_elixir/stats_test.exs @@ -0,0 +1,357 @@ +defmodule SymphonyElixir.StatsTest do + use ExUnit.Case + + import ExUnit.CaptureLog + + alias SymphonyElixir.Stats + + defmodule FailingAdapter do + def start_run(_path, _attrs), do: {:error, :start_failed} + def update_run(_path, _run_id, _attrs), do: {:error, :update_failed} + def record_session(_path, _attrs), do: {:error, :session_failed} + def lifetime_totals(_path), do: {:error, :totals_failed} + def recent_sessions(_path, _limit), do: {:error, :recent_failed} + end + + defmodule NormalizingAdapter do + def start_run(_path, _attrs), do: {:ok, 123} + def update_run(_path, _run_id, _attrs), do: :ok + def record_session(_path, _attrs), do: {:ok, 456} + + def lifetime_totals(_path) do + {:ok, + %{ + "input_tokens" => "10", + "output_tokens" => -2, + "total_tokens" => 15, + "runtime_seconds" => "30", + "sessions" => "2", + "runs" => "1" + }} + end + + def recent_sessions(_path, 20) do + {:ok, + [ + %{ + "id" => "7", + "run_id" => "8", + "issue_id" => 9, + "identifier" => " MT-7 ", + "started_at" => "2026-05-20T10:00:00Z", + "completed_at" => "2026-05-20T10:02:00Z", + "turns" => "3", + "input_tokens" => "11", + "output_tokens" => "4", + "total_tokens" => "15", + "runtime_seconds" => "120", + "final_state" => :done, + "model" => 55 + } + ]} + end + end + + defmodule CaptureAdapter do + def start_run(_path, attrs) do + send(self(), {:start_run_attrs, attrs}) + {:ok, 1} + end + + def update_run(_path, _run_id, attrs) do + send(self(), {:update_run_attrs, attrs}) + :ok + end + + def record_session(_path, attrs) do + send(self(), {:record_session_attrs, attrs}) + {:ok, 2} + end + + def lifetime_totals(_path), do: {:ok, %{}} + def recent_sessions(_path, _limit), do: {:ok, []} + end + + setup do + previous_stats_db_file = Application.get_env(:symphony_elixir, :stats_db_file) + previous_xdg_data_home = System.get_env("XDG_DATA_HOME") + + test_root = + Path.join( + System.tmp_dir!(), + "symphony-stats-test-#{System.unique_integer([:positive])}" + ) + + File.mkdir_p!(test_root) + + on_exit(fn -> + restore_app_env(:stats_db_file, previous_stats_db_file) + restore_env("XDG_DATA_HOME", previous_xdg_data_home) + File.rm_rf(test_root) + end) + + {:ok, test_root: test_root} + end + + test "resolves configured and default database paths", %{test_root: test_root} do + configured_path = Path.join(test_root, "configured.db") + Application.put_env(:symphony_elixir, :stats_db_file, configured_path) + assert Stats.db_path() == configured_path + + Application.delete_env(:symphony_elixir, :stats_db_file) + System.put_env("XDG_DATA_HOME", test_root) + assert Stats.default_db_path() == Path.join([test_root, "symphony", "stats.db"]) + + System.delete_env("XDG_DATA_HOME") + assert Stats.default_db_path() == Path.expand("~/.local/share/symphony/stats.db") + end + + test "persists runs and completed sessions through sqlite", %{test_root: test_root} do + db_path = Path.join(test_root, "stats.db") + started_at = ~U[2026-05-20 10:00:00Z] + completed_at = ~U[2026-05-20 10:04:00Z] + + run_id = + Stats.start_run( + %{ + started_at: started_at, + peak_concurrent_agents: -1, + sessions_launched: "1" + }, + db_path: db_path + ) + + assert is_integer(run_id) + + assert :ok = + Stats.update_run( + run_id, + %{ + peak_concurrent_agents: 2, + sessions_launched: 1, + input_tokens: 100, + output_tokens: 20, + total_tokens: 120, + runtime_seconds: 240 + }, + db_path: db_path + ) + + assert :ok = + Stats.record_session( + %{ + run_id: run_id, + issue_id: "issue-1", + identifier: "MT-1", + started_at: started_at, + completed_at: completed_at, + turns: 2, + input_tokens: 100, + output_tokens: 20, + total_tokens: 120, + runtime_seconds: 240, + final_state: "Done", + model: "gpt-5.5" + }, + db_path: db_path + ) + + assert :ok = + Stats.finish_run( + run_id, + %{ + stopped_at: completed_at, + restart_reason: :normal, + peak_concurrent_agents: 2, + sessions_launched: 1, + input_tokens: 100, + output_tokens: 20, + total_tokens: 120, + runtime_seconds: 240 + }, + db_path: db_path + ) + + assert Stats.lifetime_totals(db_path: db_path) == %{ + input_tokens: 100, + output_tokens: 20, + total_tokens: 120, + runtime_seconds: 240, + sessions: 1, + runs: 1 + } + + assert [ + %{ + run_id: ^run_id, + issue_id: "issue-1", + identifier: "MT-1", + started_at: "2026-05-20T10:00:00Z", + completed_at: "2026-05-20T10:04:00Z", + turns: 2, + input_tokens: 100, + output_tokens: 20, + total_tokens: 120, + runtime_seconds: 240, + final_state: "Done", + model: "gpt-5.5" + } + ] = Stats.recent_sessions(1, db_path: db_path) + end + + test "missing sqlite database starts as empty history", %{test_root: test_root} do + db_path = Path.join(test_root, "missing.db") + + assert Stats.lifetime_totals(db_path: db_path) == %{ + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + runtime_seconds: 0, + sessions: 0, + runs: 0 + } + + assert Stats.recent_sessions(20, db_path: db_path) == [] + File.rm!(db_path) + + assert Stats.lifetime_totals(db_path: db_path) == %{ + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + runtime_seconds: 0, + sessions: 0, + runs: 0 + } + end + + test "normalizes adapter results and invalid recent-session limits" do + assert Stats.lifetime_totals(adapter: NormalizingAdapter, db_path: "unused.db") == %{ + input_tokens: 10, + output_tokens: 0, + total_tokens: 15, + runtime_seconds: 30, + sessions: 2, + runs: 1 + } + + assert [ + %{ + id: 7, + run_id: 8, + issue_id: "9", + identifier: "MT-7", + turns: 3, + input_tokens: 11, + output_tokens: 4, + total_tokens: 15, + runtime_seconds: 120, + final_state: "done", + model: "55" + } + ] = Stats.recent_sessions(0, adapter: NormalizingAdapter, db_path: "unused.db") + end + + test "normalizes write attributes before calling adapters" do + assert Stats.start_run( + %{ + "started_at" => "2026-05-20T10:00:00Z", + stopped_at: " ", + restart_reason: [], + peak_concurrent_agents: "bad" + }, + adapter: CaptureAdapter, + db_path: "unused.db" + ) == 1 + + assert_receive {:start_run_attrs, + %{ + started_at: "2026-05-20T10:00:00Z", + stopped_at: nil, + restart_reason: nil, + peak_concurrent_agents: 0 + }} + + assert Stats.update_run( + 1, + %{ + stopped_at: "2026-05-20T10:05:00Z", + restart_reason: " shutdown " + }, + adapter: CaptureAdapter, + db_path: "unused.db" + ) == :ok + + assert_receive {:update_run_attrs, + %{ + stopped_at: "2026-05-20T10:05:00Z", + restart_reason: "shutdown" + }} + + assert Stats.record_session( + %{ + run_id: nil, + started_at: "2026-05-20T10:01:00Z", + completed_at: "", + turns: "bad", + final_state: " ", + model: [] + }, + adapter: CaptureAdapter, + db_path: "unused.db" + ) == :ok + + assert_receive {:record_session_attrs, + %{ + run_id: nil, + started_at: "2026-05-20T10:01:00Z", + completed_at: nil, + turns: 0, + final_state: nil, + model: nil + }} + end + + test "logs adapter failures and keeps observability callers safe" do + log = + capture_log(fn -> + assert Stats.start_run(%{}, adapter: FailingAdapter, db_path: "unused.db") == nil + assert Stats.update_run(1, %{}, adapter: FailingAdapter, db_path: "unused.db") == :ok + assert Stats.update_run(nil, %{}, adapter: FailingAdapter, db_path: "unused.db") == :ok + assert Stats.finish_run(1, %{}, adapter: FailingAdapter, db_path: "unused.db") == :ok + assert Stats.record_session(%{}, adapter: FailingAdapter, db_path: "unused.db") == :ok + + assert Stats.lifetime_totals(adapter: FailingAdapter, db_path: "unused.db") == %{ + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + runtime_seconds: 0, + sessions: 0, + runs: 0 + } + + assert Stats.recent_sessions(20, adapter: FailingAdapter, db_path: "unused.db") == [] + end) + + assert log =~ "Failed to start stats run" + assert log =~ "Failed to update stats run" + assert log =~ "Failed to record stats session" + assert log =~ "Failed to read lifetime stats" + assert log =~ "Failed to read recent stats sessions" + end + + test "extracts model names from common codex command shapes" do + assert Stats.model_from_command(~s(codex --model gpt-5.5 app-server)) == "gpt-5.5" + assert Stats.model_from_command(~s(codex --model=gpt-5.4 app-server)) == "gpt-5.4" + assert Stats.model_from_command(~s(codex --model "gpt-5.5" app-server)) == "gpt-5.5" + assert Stats.model_from_command(~s(codex --model='gpt-5.4-mini' app-server)) == "gpt-5.4-mini" + assert Stats.model_from_command(~s(codex --config 'model="gpt-5.3-codex"' app-server)) == "gpt-5.3-codex" + assert Stats.model_from_command(nil) == nil + assert Stats.model_from_command("codex app-server") == nil + end + + defp restore_app_env(key, nil), do: Application.delete_env(:symphony_elixir, key) + defp restore_app_env(key, value), do: Application.put_env(:symphony_elixir, key, value) + + defp restore_env(key, nil), do: System.delete_env(key) + defp restore_env(key, value), do: System.put_env(key, value) +end