From dcf0071b0cddba5c3f77c5db7a0b9cf1ee0273ce Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Fri, 3 Apr 2026 11:02:21 -0500 Subject: [PATCH] feat: add stopOnRunFinished + improve activity tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New feature: spec.stopOnRunFinished — when true, the backend auto-stops the session on RUN_FINISHED event. Enables one-shot automation sessions that stop cleanly without inactivity timeout. Changes: - CRD: add stopOnRunFinished boolean to spec - Backend types: add to AgenticSessionSpec and CreateAgenticSessionRequest - Backend session handler: pass through to CR on create - AG-UI proxy: detect RUN_FINISHED + check spec → trigger stop with in-memory cache to avoid repeated k8s API calls - AG-UI proxy: all events now reset inactivity timer (was only 4 types) - AG-UI proxy: reduce activity debounce from 60s to 10s - Amber GHA: use stop-on-run-finished, fix/custom prompt split, shell-driven batch, session reuse, security fixes Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/amber-issue-handler.yml | 20 +++-- components/backend/handlers/sessions.go | 3 + components/backend/types/session.go | 2 + components/backend/websocket/agui_proxy.go | 76 +++++++++++++++---- .../base/crds/agenticsessions-crd.yaml | 3 + 5 files changed, 82 insertions(+), 22 deletions(-) diff --git a/.github/workflows/amber-issue-handler.yml b/.github/workflows/amber-issue-handler.yml index 0d263d80b..1c69ac370 100644 --- a/.github/workflows/amber-issue-handler.yml +++ b/.github/workflows/amber-issue-handler.yml @@ -95,7 +95,8 @@ jobs: [{"url": "https://github.com/${{ github.repository }}", "branch": "main"}] model: claude-opus-4-6 wait: 'true' - timeout: '60' + timeout: '0' + stop-on-run-finished: 'true' - name: Post-session labels and comment if: steps.existing.outputs.skip != 'true' @@ -248,7 +249,8 @@ jobs: [{"url": "https://github.com/${{ github.repository }}", "branch": "main"}] model: claude-opus-4-6 wait: 'true' - timeout: '60' + timeout: '0' + stop-on-run-finished: 'true' # Custom prompt: @amber — pass user's text - name: Run custom prompt @@ -271,14 +273,15 @@ jobs: [{"url": "https://github.com/${{ github.repository }}", "branch": "main"}] model: claude-opus-4-6 wait: 'true' - timeout: '60' + timeout: '0' + stop-on-run-finished: 'true' - name: Session summary if: always() && steps.context.outputs.is_fork != 'true' run: | - # Get session name from whichever step ran - SESSION_NAME="${{ steps.fix-session.outputs.session-name }}${{ steps.fix-issue-session.outputs.session-name }}${{ steps.custom-session.outputs.session-name }}" - SESSION_PHASE="${{ steps.fix-session.outputs.session-phase }}${{ steps.fix-issue-session.outputs.session-phase }}${{ steps.custom-session.outputs.session-phase }}" + # Get session name from whichever step ran (only one will have output) + SESSION_NAME="${{ steps.fix-session.outputs.session-name || steps.fix-issue-session.outputs.session-name || steps.custom-session.outputs.session-name }}" + SESSION_PHASE="${{ steps.fix-session.outputs.session-phase || steps.fix-issue-session.outputs.session-phase || steps.custom-session.outputs.session-phase }}" echo "### Amber — ${{ steps.context.outputs.type }} #${{ steps.context.outputs.number }}" >> $GITHUB_STEP_SUMMARY echo "" >> $GITHUB_STEP_SUMMARY @@ -313,6 +316,8 @@ jobs: import os import re import subprocess + import time + import uuid import requests from datetime import datetime, timezone @@ -368,7 +373,6 @@ jobs: def create_session_api(prompt, session_name="", model="claude-opus-4-6"): """Create a new session or send message to existing one.""" - import time, uuid if session_name: # Ensure session is running @@ -401,7 +405,7 @@ jobs: # Create new session url = f"{API_URL.rstrip('/')}/projects/{PROJECT}/agentic-sessions" - body = {"initialPrompt": prompt, "inactivityTimeout": 60, + body = {"initialPrompt": prompt, "stopOnRunFinished": True, "llmSettings": {"model": model}, "repos": [{"url": f"https://github.com/{REPO}", "branch": "main"}]} try: diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index e3bf2949b..a95b6c783 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -741,6 +741,9 @@ func CreateSession(c *gin.Context) { } spec["inactivityTimeout"] = *req.InactivityTimeout } + if req.StopOnRunFinished != nil && *req.StopOnRunFinished { + spec["stopOnRunFinished"] = true + } session := map[string]interface{}{ "apiVersion": "vteam.ambient-code/v1alpha1", diff --git a/components/backend/types/session.go b/components/backend/types/session.go index d0a79e255..a9d95135d 100644 --- a/components/backend/types/session.go +++ b/components/backend/types/session.go @@ -18,6 +18,7 @@ type AgenticSessionSpec struct { LLMSettings LLMSettings `json:"llmSettings"` Timeout int `json:"timeout"` InactivityTimeout *int `json:"inactivityTimeout,omitempty"` + StopOnRunFinished bool `json:"stopOnRunFinished,omitempty"` UserContext *UserContext `json:"userContext,omitempty"` BotAccount *BotAccountRef `json:"botAccount,omitempty"` ResourceOverrides *ResourceOverrides `json:"resourceOverrides,omitempty"` @@ -58,6 +59,7 @@ type CreateAgenticSessionRequest struct { LLMSettings *LLMSettings `json:"llmSettings,omitempty"` Timeout *int `json:"timeout,omitempty"` InactivityTimeout *int `json:"inactivityTimeout,omitempty"` + StopOnRunFinished *bool `json:"stopOnRunFinished,omitempty"` ParentSessionID string `json:"parent_session_id,omitempty"` Repos []SimpleRepo `json:"repos,omitempty"` ActiveWorkflow *WorkflowSelection `json:"activeWorkflow,omitempty"` diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go index c9acfa318..2c4680741 100644 --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -36,8 +36,9 @@ import ( const ( // activityDebounceInterval is the minimum interval between CR status updates for lastActivityTime. - // Inactivity timeout is measured in hours, so minute-level granularity is sufficient. - activityDebounceInterval = 60 * time.Second + // Must be significantly shorter than the smallest inactivity timeout to prevent + // false inactivity detection while the session is actively processing. + activityDebounceInterval = 10 * time.Second // activityUpdateTimeout bounds how long a single activity status update can take. activityUpdateTimeout = 10 * time.Second @@ -56,6 +57,11 @@ var activityUpdateSem = make(chan struct{}, 50) // for each session to avoid excessive API calls. Key: "namespace/sessionName" var lastActivityUpdateTimes sync.Map +// stopOnRunFinishedCache tracks which sessions have stopOnRunFinished set. +// Populated lazily on first RUN_FINISHED event, avoids repeated k8s API calls. +// Key: sessionName, Value: bool +var stopOnRunFinishedCache sync.Map + // sessionProjectMap maps sessionName → projectName so that persistStreamedEvent // (which only receives sessionID) can look up the project for activity tracking. // Populated by HandleAGUIRunProxy on each run request. @@ -533,13 +539,20 @@ func persistStreamedEvent(sessionID, runID, threadID, jsonData string) { // sessionID-to-project mapping populated by HandleAGUIRunProxy. eventType, _ := event["type"].(string) - // Update lastActivityTime on CR for activity events (debounced). - if isActivityEvent(eventType) { + // Update lastActivityTime on CR for any event (debounced). + if eventType != "" { if projectName, ok := sessionProjectMap.Load(sessionID); ok { updateLastActivityTime(projectName.(string), sessionID, eventType == types.EventTypeRunStarted) } } + // Stop session on RUN_FINISHED if stopOnRunFinished is set. + if eventType == types.EventTypeRunFinished { + if projectName, ok := sessionProjectMap.Load(sessionID); ok { + go checkAndStopOnRunFinished(projectName.(string), sessionID) + } + } + // agentStatus is derived at query time from the event log (DeriveAgentStatus). // No CR updates needed here — the persisted events ARE the source of truth. } @@ -984,17 +997,52 @@ func triggerDisplayNameGenerationIfNeeded(projectName, sessionName string, messa handlers.GenerateDisplayNameAsync(projectName, sessionName, userMessage, sessionCtx) } -// isActivityEvent returns true for AG-UI event types that indicate session activity. -func isActivityEvent(eventType string) bool { - switch eventType { - case types.EventTypeRunStarted, - types.EventTypeTextMessageStart, - types.EventTypeTextMessageContent, - types.EventTypeToolCallStart: - return true - default: - return false +// checkAndStopOnRunFinished checks if stopOnRunFinished is set for a session +// and triggers a stop. Uses an in-memory cache to avoid k8s API calls for +// sessions that don't have the flag set. +func checkAndStopOnRunFinished(projectName, sessionName string) { + if handlers.DynamicClient == nil { + return + } + + // Check cache first — skip k8s API call for sessions we've already checked + if cached, ok := stopOnRunFinishedCache.Load(sessionName); ok { + if !cached.(bool) { + return + } + } + + gvr := types.GetAgenticSessionResource() + ctx, cancel := context.WithTimeout(context.Background(), activityUpdateTimeout) + defer cancel() + + obj, err := handlers.DynamicClient.Resource(gvr).Namespace(projectName).Get(ctx, sessionName, metav1.GetOptions{}) + if err != nil { + log.Printf("stopOnRunFinished: failed to get session %s/%s: %v", projectName, sessionName, err) + return + } + + stopOnFinish, _, _ := unstructured.NestedBool(obj.Object, "spec", "stopOnRunFinished") + stopOnRunFinishedCache.Store(sessionName, stopOnFinish) + if !stopOnFinish { + return } + + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations["ambient-code.io/desired-phase"] = "Stopped" + annotations["ambient-code.io/stop-reason"] = "run-finished" + obj.SetAnnotations(annotations) + + _, err = handlers.DynamicClient.Resource(gvr).Namespace(projectName).Update(ctx, obj, metav1.UpdateOptions{}) + if err != nil { + log.Printf("stopOnRunFinished: failed to update session %s/%s: %v", projectName, sessionName, err) + return + } + + log.Printf("stopOnRunFinished: session %s/%s set to Stopped after RUN_FINISHED", projectName, sessionName) } // updateLastActivityTime updates the lastActivityTime field on the AgenticSession CR status. diff --git a/components/manifests/base/crds/agenticsessions-crd.yaml b/components/manifests/base/crds/agenticsessions-crd.yaml index baba13c21..7ffb6ca45 100644 --- a/components/manifests/base/crds/agenticsessions-crd.yaml +++ b/components/manifests/base/crds/agenticsessions-crd.yaml @@ -79,6 +79,9 @@ spec: type: integer minimum: 0 description: "Seconds of inactivity before auto-stopping a session. 0 disables auto-shutdown. If omitted, falls back to project-level inactivityTimeoutSeconds, then 24h default." + stopOnRunFinished: + type: boolean + description: "When true, automatically stop the session when the agent completes its run (RUN_FINISHED event). Useful for one-shot tasks triggered by automation." environmentVariables: type: object additionalProperties: