-
Notifications
You must be signed in to change notification settings - Fork 84
feat: add stopOnRunFinished + improve activity tracking #1191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,8 +36,9 @@ import ( | |
|
|
||
| const ( | ||
| // activityDebounceInterval is the minimum interval between CR status updates for lastActivityTime. | ||
| // Inactivity timeout is measured in hours, so minute-level granularity is sufficient. | ||
| activityDebounceInterval = 60 * time.Second | ||
| // Must be significantly shorter than the smallest inactivity timeout to prevent | ||
| // false inactivity detection while the session is actively processing. | ||
| activityDebounceInterval = 10 * time.Second | ||
|
|
||
| // activityUpdateTimeout bounds how long a single activity status update can take. | ||
| activityUpdateTimeout = 10 * time.Second | ||
|
|
@@ -56,6 +57,11 @@ var activityUpdateSem = make(chan struct{}, 50) | |
| // for each session to avoid excessive API calls. Key: "namespace/sessionName" | ||
| var lastActivityUpdateTimes sync.Map | ||
|
|
||
| // stopOnRunFinishedCache tracks which sessions have stopOnRunFinished set. | ||
| // Populated lazily on first RUN_FINISHED event, avoids repeated k8s API calls. | ||
| // Key: sessionName, Value: bool | ||
| var stopOnRunFinishedCache sync.Map | ||
|
Comment on lines
+60
to
+63
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cache this by AgenticSessions are namespaced. With the current keying, a cached Also applies to: 1008-1013, 1025-1026 🤖 Prompt for AI Agents |
||
|
|
||
| // sessionProjectMap maps sessionName → projectName so that persistStreamedEvent | ||
| // (which only receives sessionID) can look up the project for activity tracking. | ||
| // Populated by HandleAGUIRunProxy on each run request. | ||
|
|
@@ -533,13 +539,20 @@ func persistStreamedEvent(sessionID, runID, threadID, jsonData string) { | |
| // sessionID-to-project mapping populated by HandleAGUIRunProxy. | ||
| eventType, _ := event["type"].(string) | ||
|
|
||
| // Update lastActivityTime on CR for activity events (debounced). | ||
| if isActivityEvent(eventType) { | ||
| // Update lastActivityTime on CR for any event (debounced). | ||
| if eventType != "" { | ||
| if projectName, ok := sessionProjectMap.Load(sessionID); ok { | ||
| updateLastActivityTime(projectName.(string), sessionID, eventType == types.EventTypeRunStarted) | ||
| } | ||
| } | ||
|
|
||
| // Stop session on RUN_FINISHED if stopOnRunFinished is set. | ||
| if eventType == types.EventTypeRunFinished { | ||
| if projectName, ok := sessionProjectMap.Load(sessionID); ok { | ||
| go checkAndStopOnRunFinished(projectName.(string), sessionID) | ||
| } | ||
|
Comment on lines
+542
to
+553
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
set -euo pipefail
sed -n '538,556p' components/backend/websocket/agui_proxy.go
sed -n '1000,1105p' components/backend/websocket/agui_proxy.go
rg -n 'RetryOnConflict|Patch\(' components/backend/websocket/agui_proxy.go || trueRepository: ambient-code/platform Length of output: 4576
The stop-on-RUN_FINISHED path has two functional bugs:
Use 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| // agentStatus is derived at query time from the event log (DeriveAgentStatus). | ||
| // No CR updates needed here — the persisted events ARE the source of truth. | ||
| } | ||
|
|
@@ -984,17 +997,52 @@ func triggerDisplayNameGenerationIfNeeded(projectName, sessionName string, messa | |
| handlers.GenerateDisplayNameAsync(projectName, sessionName, userMessage, sessionCtx) | ||
| } | ||
|
|
||
| // isActivityEvent returns true for AG-UI event types that indicate session activity. | ||
| func isActivityEvent(eventType string) bool { | ||
| switch eventType { | ||
| case types.EventTypeRunStarted, | ||
| types.EventTypeTextMessageStart, | ||
| types.EventTypeTextMessageContent, | ||
| types.EventTypeToolCallStart: | ||
| return true | ||
| default: | ||
| return false | ||
| // checkAndStopOnRunFinished checks if stopOnRunFinished is set for a session | ||
| // and triggers a stop. Uses an in-memory cache to avoid k8s API calls for | ||
| // sessions that don't have the flag set. | ||
| func checkAndStopOnRunFinished(projectName, sessionName string) { | ||
| if handlers.DynamicClient == nil { | ||
| return | ||
| } | ||
|
|
||
| // Check cache first — skip k8s API call for sessions we've already checked | ||
| if cached, ok := stopOnRunFinishedCache.Load(sessionName); ok { | ||
| if !cached.(bool) { | ||
| return | ||
| } | ||
| } | ||
|
|
||
| gvr := types.GetAgenticSessionResource() | ||
| ctx, cancel := context.WithTimeout(context.Background(), activityUpdateTimeout) | ||
| defer cancel() | ||
|
|
||
| obj, err := handlers.DynamicClient.Resource(gvr).Namespace(projectName).Get(ctx, sessionName, metav1.GetOptions{}) | ||
| if err != nil { | ||
| log.Printf("stopOnRunFinished: failed to get session %s/%s: %v", projectName, sessionName, err) | ||
| return | ||
| } | ||
|
|
||
| stopOnFinish, _, _ := unstructured.NestedBool(obj.Object, "spec", "stopOnRunFinished") | ||
| stopOnRunFinishedCache.Store(sessionName, stopOnFinish) | ||
| if !stopOnFinish { | ||
| return | ||
| } | ||
|
|
||
| annotations := obj.GetAnnotations() | ||
| if annotations == nil { | ||
| annotations = make(map[string]string) | ||
| } | ||
| annotations["ambient-code.io/desired-phase"] = "Stopped" | ||
| annotations["ambient-code.io/stop-reason"] = "run-finished" | ||
| obj.SetAnnotations(annotations) | ||
|
|
||
| _, err = handlers.DynamicClient.Resource(gvr).Namespace(projectName).Update(ctx, obj, metav1.UpdateOptions{}) | ||
| if err != nil { | ||
| log.Printf("stopOnRunFinished: failed to update session %s/%s: %v", projectName, sessionName, err) | ||
| return | ||
| } | ||
|
|
||
| log.Printf("stopOnRunFinished: session %s/%s set to Stopped after RUN_FINISHED", projectName, sessionName) | ||
| } | ||
|
|
||
| // updateLastActivityTime updates the lastActivityTime field on the AgenticSession CR status. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stopOnRunFinishedis write-only right nowLine 744 adds the create-time write, but
parseSpec()still never readsspec.stopOnRunFinished.GetSession,ListSessions, and the other handlers that returnparseSpec(spec)will still surface this field as false/omitted even when the CR storestrue, so the new API does not round-trip.🤖 Prompt for AI Agents