Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions elixir/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,30 @@ The observability UI now runs on a minimal Phoenix stack:
- Bandit as the HTTP server
- Phoenix dependency static assets for the LiveView client bootstrap

## Supervisor Snapshot Reconciliation

Symphony treats Linear as the source of truth for issue workflow state. On every
poll, the orchestrator checks existing workspace-local supervisor snapshots at
`.artifacts/symphony/supervisor-status.json` for candidate issues returned by
Linear. If a snapshot still says `human-review` or `blocked` while the fresh
Linear issue is now in an active dispatch state such as `Rework`, Symphony
rewrites the snapshot to `idle`, records the current `linear_state`, clears the
old failure text, and adds `reconciliation_reason:
linear-state-newer-than-supervisor-snapshot`.

Operator check:

1. Read the current Linear state for the issue.
2. Read `<workspace>/<ISSUE>/.artifacts/symphony/supervisor-status.json`.
3. If Linear says `Human Review` and the snapshot says `human-review`, that is a
real pause.
4. If Linear says `Rework`, `In Progress`, or `Merging` and the snapshot still
says `human-review` or `blocked`, wait for one poll or call
`POST /api/v1/refresh`; the snapshot should change to `idle` with the
reconciliation reason above. `Todo` follows the normal dispatcher dependency
gate: non-terminal blockers remain a real hold, while unblocked `Todo`
snapshots reconcile the same way.

## Project Layout

- `lib/`: application code and Mix tasks
Expand Down
16 changes: 9 additions & 7 deletions elixir/lib/symphony_elixir/orchestrator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule SymphonyElixir.Orchestrator do
require Logger
import Bitwise, only: [<<<: 2]

alias SymphonyElixir.{AgentRunner, Config, StatusDashboard, Tracker, Workspace}
alias SymphonyElixir.{AgentRunner, Config, StatusDashboard, SupervisorSnapshot, Tracker, Workspace}
alias SymphonyElixir.Linear.Issue

@continuation_retry_delay_ms 1_000
Expand Down Expand Up @@ -225,9 +225,14 @@ defmodule SymphonyElixir.Orchestrator do
state = reconcile_running_issues(state)

with :ok <- Config.validate!(),
{:ok, issues} <- Tracker.fetch_candidate_issues(),
true <- available_slots(state) > 0 do
choose_issues(issues, state)
{:ok, issues} <- Tracker.fetch_candidate_issues() do
issues = SupervisorSnapshot.reconcile_candidate_snapshots(issues)

if available_slots(state) > 0 do
choose_issues(issues, state)
else
state
end
else
{:error, :missing_linear_api_token} ->
Logger.error("Linear API token missing in WORKFLOW.md")
Expand Down Expand Up @@ -266,9 +271,6 @@ defmodule SymphonyElixir.Orchestrator do
{:error, reason} ->
Logger.error("Failed to fetch from Linear: #{inspect(reason)}")
state

false ->
state
end
end

Expand Down
125 changes: 125 additions & 0 deletions elixir/lib/symphony_elixir/supervisor_snapshot.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
defmodule SymphonyElixir.SupervisorSnapshot do
@moduledoc """
Reconciles workspace-local supervisor status snapshots with fresh tracker state.
"""

require Logger

alias SymphonyElixir.{Config, Linear.Issue, Workspace}

@status_relative_path Path.join([".artifacts", "symphony", "supervisor-status.json"])
@stale_hold_states MapSet.new(["blocked", "human-review"])

@spec reconcile_candidate_snapshots([Issue.t()]) :: [Issue.t()]
def reconcile_candidate_snapshots(issues) when is_list(issues) do
Enum.each(issues, &reconcile_candidate_snapshot/1)
issues
end

@spec status_relative_path() :: Path.t()
def status_relative_path, do: @status_relative_path

defp reconcile_candidate_snapshot(%Issue{} = issue) do
with {:ok, workspace} <- Workspace.existing_issue_workspace(issue),
status_path <- Path.join(workspace, @status_relative_path),
{:ok, snapshot} <- read_snapshot(status_path),
true <- stale_hold_snapshot?(snapshot, issue) do
write_reconciled_snapshot(status_path, snapshot, issue)
else
_ -> :ok
end
end

defp reconcile_candidate_snapshot(_issue), do: :ok

defp read_snapshot(status_path) when is_binary(status_path) do
with {:ok, body} <- File.read(status_path),
{:ok, snapshot} when is_map(snapshot) <- Jason.decode(body) do
{:ok, snapshot}
else
{:error, :enoent} -> :missing
{:error, reason} -> {:error, reason}
_ -> {:error, :invalid_snapshot}
end
end

defp stale_hold_snapshot?(snapshot, %Issue{} = issue) when is_map(snapshot) do
hold_snapshot_state?(Map.get(snapshot, "state")) and issue_dispatchable?(issue)
end

defp hold_snapshot_state?(state) when is_binary(state) do
MapSet.member?(@stale_hold_states, normalize_state(state))
end

defp hold_snapshot_state?(_state), do: false

defp issue_dispatchable?(%Issue{state: state, blocked_by: blockers}) when is_binary(state) do
active_issue_state?(state) and !todo_blocked_by_non_terminal?(state, blockers)
end

defp issue_dispatchable?(_issue), do: false

defp active_issue_state?(state) when is_binary(state) do
Config.settings!().tracker.active_states
|> Enum.map(&normalize_state/1)
|> MapSet.new()
|> MapSet.member?(normalize_state(state))
end

defp todo_blocked_by_non_terminal?(state, blockers)
when is_binary(state) and is_list(blockers) do
normalize_state(state) == "todo" and blocked_by_non_terminal?(blockers)
end

defp todo_blocked_by_non_terminal?(_state, _blockers), do: false

defp blocked_by_non_terminal?(blockers) when is_list(blockers) do
terminal_states =
Config.settings!().tracker.terminal_states
|> Enum.map(&normalize_state/1)
|> MapSet.new()

Enum.any?(blockers, fn
%{state: blocker_state} when is_binary(blocker_state) ->
!MapSet.member?(terminal_states, normalize_state(blocker_state))

_ ->
true
end)
end

defp write_reconciled_snapshot(status_path, snapshot, %Issue{} = issue) do
now = DateTime.utc_now() |> DateTime.truncate(:second) |> DateTime.to_iso8601()

payload =
snapshot
|> Map.merge(%{
"state" => "idle",
"linear_state" => issue.state,
"active_worker" => nil,
"last_heartbeat" => now,
"recent_failure" => "none",
"status_artifact" => @status_relative_path,
"reconciliation_reason" => "linear-state-newer-than-supervisor-snapshot",
"reconciled_from" => %{
"state" => Map.get(snapshot, "state"),
"linear_state" => Map.get(snapshot, "linear_state"),
"recent_failure" => Map.get(snapshot, "recent_failure")
},
"reconciled_at" => now
})

File.mkdir_p!(Path.dirname(status_path))
File.write!(status_path, Jason.encode!(payload, pretty: true) <> "\n")

Logger.info("Reconciled stale supervisor snapshot issue_id=#{issue.id} issue_identifier=#{issue.identifier} linear_state=#{issue.state} status_path=#{status_path}")

:ok
end

defp normalize_state(state) when is_binary(state) do
state
|> String.trim()
|> String.downcase()
end
end
15 changes: 15 additions & 0 deletions elixir/lib/symphony_elixir/workspace.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@ defmodule SymphonyElixir.Workspace do
end
end

@spec existing_issue_workspace(map() | String.t() | nil) ::
{:ok, Path.t()} | :missing | {:error, term()}
def existing_issue_workspace(issue_or_identifier) do
issue_context = issue_context(issue_or_identifier)
safe_id = safe_identifier(issue_context.issue_identifier)

with {:ok, workspace} <- workspace_path_for_issue(safe_id, nil),
:ok <- validate_workspace_path(workspace, nil) do
if File.dir?(workspace), do: {:ok, workspace}, else: :missing
end
rescue
error in [ArgumentError, ErlangError, File.Error] ->
{:error, error}
end

defp ensure_workspace(workspace, nil) do
cond do
File.dir?(workspace) ->
Expand Down
Loading
Loading