Commit 896aaa1

Authored by Gkrumbach07, Ambient Code Bot, and claude
fix(runner,operator): improve session backup and resume reliability (#1057)
## Summary

Three issues caused data loss and broken session resume when the runner container OOMed:

- **Repo data lost on OOM**: `backup_git_repos()` only ran on SIGTERM. Runner OOM meant no SIGTERM → no repo backup. Now runs periodically every ~5 min.
- **Session resume broken**: CLI session ID was only persisted after turn completion. OOM during first turn → no session ID in S3 → `--resume` never passed on restart → agent lost all context. Now persists immediately on CLI init.
- **Operator didn't detect OOM**: Pod watch predicate only checked pod phase, which stays `Running` when one container dies (sidecar still alive). OOMKilled pods sat indefinitely. Now triggers on container termination too.

## Changes

| File | Change |
|------|--------|
| `state-sync/sync.sh` | Periodic `backup_git_repos` every Nth sync cycle (configurable `REPO_BACKUP_INTERVAL`, default 5) |
| `session.py` | `on_session_id` callback persists session ID to disk immediately on CLI init message |
| `bridge.py` | Resume failure detection + removed redundant post-turn persist |
| `agenticsession_controller.go` | Pod watch predicate triggers on container termination, not just pod phase |

## Root cause investigation

Diagnosed from live OOMKilled pods in `bug-bash-mturley` namespace on UAT:

- Confirmed `ambient-code-runner` container OOMKilled (exit 137, 8Gi limit)
- State-sync kept running but never backed up repos (only on SIGTERM)
- Session JSONL was valid (237 lines, all valid JSON) and `--resume` works fine on sessions with dangling tool calls (verified via SDK)
- The actual cause: `claude_session_ids.json` never existed in S3 because the runner OOMed during its first turn, before `_persist_session_ids()` was called
- Operator pod watch predicate filtered out the event because pod phase stayed `Running`

## Test plan

- [x] Runner tests pass (488 passed, 11 skipped)
- [x] Operator builds clean (`go vet`, `go build`)
- [ ] Deploy to kind cluster and verify periodic repo backup runs
- [ ] Simulate runner OOM and verify operator detects container termination
- [ ] Verify session resume works after OOM recovery

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Ambient Code Bot <bot@ambient-code.local>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7158005 commit 896aaa1

File tree

4 files changed: +66 −8 lines changed


components/operator/internal/controller/agenticsession_controller.go

Lines changed: 20 additions & 2 deletions

```diff
@@ -261,8 +261,26 @@ func (r *AgenticSessionReconciler) SetupWithManager(mgr ctrl.Manager) error {
 			if !strings.HasSuffix(e.ObjectNew.Name, "-runner") {
 				return false
 			}
-			// Trigger if phase changed
-			return e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase
+			// Trigger if pod phase changed
+			if e.ObjectOld.Status.Phase != e.ObjectNew.Status.Phase {
+				return true
+			}
+			// Trigger if a container newly terminated (e.g. runner OOM).
+			// Pod phase stays Running when one container dies but
+			// the sidecar is still alive, so we must also check
+			// individual container statuses.
+			oldTerminated := make(map[string]bool, len(e.ObjectOld.Status.ContainerStatuses))
+			for _, cs := range e.ObjectOld.Status.ContainerStatuses {
+				if cs.State.Terminated != nil {
+					oldTerminated[cs.Name] = true
+				}
+			}
+			for _, cs := range e.ObjectNew.Status.ContainerStatuses {
+				if cs.State.Terminated != nil && !oldTerminated[cs.Name] {
+					return true
+				}
+			}
+			return false
 		},
 		DeleteFunc: func(e event.TypedDeleteEvent[*corev1.Pod]) bool {
 			return strings.HasSuffix(e.Object.Name, "-runner")
```
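The "newly terminated container" comparison above is independent of Kubernetes types, so it can be sketched in plain Python. This is an illustrative model only: `newly_terminated` is a hypothetical helper, and container statuses are reduced to `(name, terminated)` pairs standing in for `corev1.ContainerStatus`.

```python
def newly_terminated(old_statuses, new_statuses):
    """Return True if any container is terminated in the new pod status
    but was not terminated in the old one (e.g. runner OOMKilled while
    the sidecar keeps the pod phase at Running).

    Each status list holds (name, terminated: bool) pairs, a stand-in
    for the pod's ContainerStatuses."""
    old_term = {name for name, term in old_statuses if term}
    return any(term and name not in old_term for name, term in new_statuses)

# Runner OOMKilled while state-sync keeps running: pod phase is
# unchanged, but the predicate now fires on the container transition.
old = [("ambient-code-runner", False), ("state-sync", False)]
new = [("ambient-code-runner", True), ("state-sync", False)]
print(newly_terminated(old, new))  # True
print(newly_terminated(new, new))  # already terminated, no re-trigger: False
```

Tracking only *newly* terminated containers avoids re-queueing the session on every subsequent status update of an already-dead pod.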

components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py

Lines changed: 15 additions & 4 deletions

```diff
@@ -178,10 +178,21 @@ async def run(
         async for event in wrapped_stream:
             yield event

-        # Persist session ID after turn completes (for --resume on pod restart)
-        if worker.session_id:
-            self._session_manager._session_ids[thread_id] = worker.session_id
-            self._session_manager._persist_session_ids()
+        # Detect resume failure (session ID already persisted
+        # eagerly by the _on_session_id callback at init time).
+        if (
+            saved_session_id
+            and worker.session_id
+            and worker.session_id != saved_session_id
+        ):
+            logger.warning(
+                "Session resume failed: requested --resume %s "
+                "but CLI created new session %s. "
+                "Previous conversation history was lost "
+                "(likely caused by ungraceful runner shutdown).",
+                saved_session_id,
+                worker.session_id,
+            )

         # Capture halt state for this thread to avoid race conditions
         # with concurrent runs modifying the shared adapter's halted flag
```
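The resume-failure condition reduces to comparing the session ID the bridge asked to resume against the one the CLI actually reported. A minimal sketch of just that comparison, with a hypothetical helper name (`resume_failed` does not exist in the codebase):

```python
def resume_failed(saved_session_id, actual_session_id):
    """True when --resume was requested but the CLI silently started a
    fresh session, meaning the prior conversation context was lost.

    Both a missing saved ID (first run, nothing to resume) and a missing
    actual ID (turn never produced an init message) are non-failures."""
    return bool(
        saved_session_id
        and actual_session_id
        and actual_session_id != saved_session_id
    )

print(resume_failed("abc-123", "abc-123"))  # resumed the same session: False
print(resume_failed("abc-123", "def-456"))  # CLI started a new session: True
print(resume_failed(None, "def-456"))       # nothing to resume: False
```

Logging rather than raising matches the diff's behavior: the new session is still usable, the warning just records that history was dropped.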

components/runners/ambient-runner/ambient_runner/bridges/claude/session.py

Lines changed: 22 additions & 2 deletions

```diff
@@ -31,7 +31,7 @@
 import os
 from contextlib import suppress
 from pathlib import Path
-from typing import Any, AsyncIterator, Optional
+from typing import Any, AsyncIterator, Callable, Optional

 logger = logging.getLogger(__name__)

@@ -67,10 +67,12 @@ def __init__(
         thread_id: str,
         options: Any,
         api_key: str,
+        on_session_id: Optional[Callable[[str, str], None]] = None,
     ):
         self.thread_id = thread_id
         self._options = options
         self._api_key = api_key
+        self._on_session_id = on_session_id

         # Inbound: (prompt, session_id, output_queue) | _SHUTDOWN
         self._input_queue: asyncio.Queue = asyncio.Queue()
@@ -140,6 +142,13 @@ async def _run(self) -> None:
                 sid = data.get("session_id")
                 if sid:
                     self.session_id = sid
+                    # Persist immediately so the session ID
+                    # survives even if this turn never completes
+                    # (e.g. runner OOM during tool execution).
+                    if self._on_session_id:
+                        self._on_session_id(
+                            self.thread_id, sid
+                        )

                 await output_queue.put(msg)

@@ -289,7 +298,9 @@ async def get_or_create(
             )
             await self.destroy(thread_id)

-        worker = SessionWorker(thread_id, options, api_key)
+        worker = SessionWorker(
+            thread_id, options, api_key, on_session_id=self._on_session_id
+        )
         await worker.start()
         self._workers[thread_id] = worker
         self._locks[thread_id] = asyncio.Lock()
@@ -345,6 +356,15 @@ async def shutdown(self) -> None:

     # ── session ID persistence ──

+    def _on_session_id(self, thread_id: str, session_id: str) -> None:
+        """Called by workers as soon as the CLI returns a session ID.
+
+        Persists immediately so the mapping survives even if the current
+        turn never completes (e.g. runner OOM during tool execution).
+        """
+        self._session_ids[thread_id] = session_id
+        self._persist_session_ids()
+
     def _session_ids_path(self) -> Path | None:
         if not self._state_dir:
             return None
```
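The eager-persistence pattern above, write-through to disk on every callback so nothing is lost if the process dies mid-turn, can be demonstrated standalone. `SessionIdStore` below is a hypothetical minimal stand-in for the real session manager, not its actual API; only the `claude_session_ids.json` filename is taken from the commit.

```python
import json
import tempfile
from pathlib import Path

class SessionIdStore:
    """Minimal sketch of eager session-ID persistence: every callback
    writes the full thread→session mapping to disk immediately, so an
    OOM during the turn cannot lose the ID needed for --resume."""

    def __init__(self, state_dir: Path):
        self._path = state_dir / "claude_session_ids.json"
        self._session_ids: dict[str, str] = {}

    def on_session_id(self, thread_id: str, session_id: str) -> None:
        self._session_ids[thread_id] = session_id
        # Write-through: persisted before the turn completes.
        self._path.write_text(json.dumps(self._session_ids))

store = SessionIdStore(Path(tempfile.mkdtemp()))
store.on_session_id("thread-1", "sess-abc")
print(json.loads(store._path.read_text()))  # {'thread-1': 'sess-abc'}
```

This is why the post-turn persist in `bridge.py` became redundant: by the time a turn ends, the init-time callback has already written the mapping.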

components/runners/state-sync/sync.sh

Lines changed: 9 additions & 0 deletions

```diff
@@ -10,6 +10,7 @@ NAMESPACE="${NAMESPACE:-default}"
 SESSION_NAME="${SESSION_NAME:-unknown}"
 SYNC_INTERVAL="${SYNC_INTERVAL:-60}"
 MAX_SYNC_SIZE="${MAX_SYNC_SIZE:-1073741824}" # 1GB default
+REPO_BACKUP_INTERVAL="${REPO_BACKUP_INTERVAL:-5}" # Backup repos every Nth sync cycle

 # Sanitize inputs to prevent path traversal
 NAMESPACE="${NAMESPACE//[^a-zA-Z0-9-]/}"
@@ -261,6 +262,7 @@ echo "Session: ${NAMESPACE}/${SESSION_NAME}"
 echo "S3 Endpoint: ${S3_ENDPOINT}"
 echo "S3 Bucket: ${S3_BUCKET}"
 echo "Sync interval: ${SYNC_INTERVAL}s"
+echo "Repo backup every: ${REPO_BACKUP_INTERVAL} sync cycles"
 echo "Max sync size: ${MAX_SYNC_SIZE} bytes"
 echo "========================================="

@@ -283,8 +285,15 @@ echo "Waiting 30s for workspace to populate..."
 sleep 30

 # Main sync loop
+sync_count=0
 while true; do
     check_size || echo "Size check warning (continuing anyway)"
+    # Periodically backup git repos (every Nth cycle) so repo state
+    # is preserved even if the runner container OOMs without SIGTERM
+    sync_count=$((sync_count + 1))
+    if [ $((sync_count % REPO_BACKUP_INTERVAL)) -eq 0 ]; then
+        backup_git_repos || echo "Repo backup had errors (continuing)"
+    fi
     sync_to_s3 || echo "Sync failed, will retry in ${SYNC_INTERVAL}s..."
     sleep ${SYNC_INTERVAL}
 done
```
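With the default `SYNC_INTERVAL=60` and `REPO_BACKUP_INTERVAL=5`, the modulo check above backs up repos roughly every five minutes. A quick Python sketch of the same schedule (the `backup_cycles` helper is illustrative, not part of the codebase):

```python
def backup_cycles(total_cycles, backup_interval=5):
    """Return the 1-based sync cycles on which backup_git_repos would
    run, mirroring the sync_count % REPO_BACKUP_INTERVAL == 0 check."""
    return [c for c in range(1, total_cycles + 1) if c % backup_interval == 0]

# Twelve 60s sync cycles: a repo backup lands on cycles 5 and 10,
# i.e. roughly every five minutes.
print(backup_cycles(12))  # [5, 10]
```

Note the counter increments before the check, so the first backup happens on cycle `REPO_BACKUP_INTERVAL`, not on cycle 1.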
