From 5944d7f21911822f5ebfd9cf12db4e33fa5ee537 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Sat, 4 Apr 2026 13:03:04 -0400 Subject: [PATCH 1/5] docs(control-plane): spec CP token endpoint for runner gRPC auth renewal RHOAIENG-56711 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Documents the synchronous token fetch endpoint on the CP as the replacement for async Secret-write renewal. Covers the problem (three-way async race against OIDC TTL), solution (GET /token with K8s SA TokenReview auth), bootstrap vs renewal split, new internal/tokenserver packages, runner reconnect changes, and AMBIENT_CP_TOKEN_URL env var injection. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- docs/internal/design/control-plane.spec.md | 96 +++++++++++++++++++++- 1 file changed, 95 insertions(+), 1 deletion(-) diff --git a/docs/internal/design/control-plane.spec.md b/docs/internal/design/control-plane.spec.md index c9ad76730..96ba6cd5c 100644 --- a/docs/internal/design/control-plane.spec.md +++ b/docs/internal/design/control-plane.spec.md @@ -109,9 +109,10 @@ Each section is joined with `\n\n`. Empty sections are omitted. If all four are | `WORKSPACE_PATH` | `/workspace` | Claude Code working directory | | `AGUI_PORT` | `8001` | Runner HTTP listener port | | `BACKEND_API_URL` | CP config | api-server base URL | -| `BOT_TOKEN` | from K8s secret | api-server bearer token | +| `BOT_TOKEN` | from K8s secret | api-server bearer token (bootstrap only — see Token Endpoint) | | `AMBIENT_GRPC_URL` | CP config | api-server gRPC address | | `AMBIENT_GRPC_USE_TLS` | CP config | TLS flag for gRPC | +| `AMBIENT_CP_TOKEN_URL` | CP config | CP token endpoint URL (e.g. `http://ambient-control-plane.{ns}.svc:8080/token`) | | `INITIAL_PROMPT` | assembled prompt | Auto-execute on startup | | `USE_VERTEX` / `ANTHROPIC_VERTEX_PROJECT_ID` / `CLOUD_ML_REGION` | CP config | Vertex AI config (when enabled) | | `GOOGLE_APPLICATION_CREDENTIALS` | `/app/vertex/ambient-code-key.json` | Vertex service account path | @@ -357,6 +358,96 @@ Status: 🔲 planned --- +## CP Token Endpoint + +### Problem + +Runner pods authenticate to the api-server gRPC interface using a `BOT_TOKEN` injected at pod start and refreshed by the CP every 4 minutes via a K8s Secret update. In OIDC environments (e.g. S0), `BOT_TOKEN` is an OIDC client-credentials JWT with a 15-minute TTL. + +This creates a three-way async race: + +1. CP ticker writes a fresh token to the Secret every 4 minutes +2. Kubelet propagates the Secret update to the pod's file mount (30–60s delay in busy clusters) +3. Runner reads the file mount on gRPC reconnect + +When the CP writes a token that is already close to expiry — because its in-memory `OIDCTokenProvider` cache had a short buffer — the runner reconnects with an already-expired token and enters an `UNAUTHENTICATED` loop. + +The fundamental issue is that the Secret-write model is an **async push** with no synchronization guarantee between when the token is written and when the runner reads it. + +### Solution + +The CP exposes a lightweight HTTP endpoint that runners call **synchronously on demand** to obtain a guaranteed-fresh token. This eliminates the async race entirely. + +``` +GET /token +``` + +- Served by a new `net/http` listener on the CP (port 8080, separate from any existing listener) +- Runner authenticates using its K8s service account token (mounted at `/var/run/secrets/kubernetes.io/serviceaccount/token`) — validated by the CP via the K8s `TokenReview` API +- CP calls `tokenProvider.Token(ctx)` at request time and returns the result — always fresh, always valid TTL +- Response: `{"token": "", "expires_at": ""}` + +### Authentication + +The runner's K8s SA token is a signed JWT issued by the K8s API server. The CP validates it using the K8s `authentication/v1` `TokenReview` resource: + +``` +POST /apis/authentication.k8s.io/v1/tokenreviews +{ + "spec": { "token": "" } +} +``` + +A successful `TokenReview` returns `status.authenticated=true` and `status.user.username` (e.g. `system:serviceaccount:ambient-code--myproject:session-abc123-sa`). The CP verifies the username prefix matches a known runner SA pattern before returning a token. + +This approach uses credentials already present in every pod — no new secrets required. + +### Token Bootstrap vs. Renewal + +| Phase | Mechanism | +|---|---| +| Initial startup | `BOT_TOKEN` env var / Secret file mount (injected by CP at pod creation) | +| gRPC reconnect | `GET /token` from CP endpoint — synchronous, guaranteed fresh | + +The Secret write loop is retained for initial bootstrap. For renewal, runners call the CP endpoint. + +### CP HTTP Server + +The CP adds a minimal `net/http` server alongside its existing K8s controller loop: + +```go +mux := http.NewServeMux() +mux.HandleFunc("/token", tokenHandler) +mux.HandleFunc("/healthz", healthHandler) +http.ListenAndServe(":8080", mux) +``` + +The server runs in a goroutine alongside `runKubeMode`. It shares the existing `tokenProvider` and `k8sClient` from the main CP config. + +### Runner Changes + +`_grpc_client.py` `reconnect()` is updated to call the CP token endpoint instead of re-reading the Secret file: + +```python +def reconnect(self) -> None: + fresh_token = _fetch_token_from_cp() # GET AMBIENT_CP_TOKEN_URL/token with SA token + self.close() + self._token = fresh_token +``` + +`AMBIENT_CP_TOKEN_URL` is injected by the CP as an env var when creating the runner pod. If not set (local dev / non-OIDC environments), the runner falls back to reading from the Secret file mount. + +### New CP Internal Packages + +| Package | Purpose | +|---|---| +| `internal/tokenserver/server.go` | HTTP server setup and graceful shutdown | +| `internal/tokenserver/handler.go` | `GET /token` handler — TokenReview validation + tokenProvider call | + +Status: 🔲 planned — RHOAIENG-56711 + +--- + ## Runner Credential Fetch The runner fetches provider credentials at session start before invoking Claude. Credentials are resolved by the CP and injected into the runner pod as `CREDENTIAL_IDS` — a JSON-encoded map of `provider → credential_id`: @@ -427,3 +518,6 @@ The `ambient-control-plane` ServiceAccount does not have `delete` on `namespaces | assistant payload → plain string | Symmetric with user payload; reasoning is observability data not conversation record | | GET /events is runner-local | Runner has the event queue; api-server proxies it; no second fan-out layer needed | | Namespace per project, not per session | Sessions within a project share a namespace; secrets and RBAC are project-scoped | +| CP token endpoint over Secret-write renewal | Secret writes are async push with no synchronization guarantee vs. token TTL; synchronous pull from CP eliminates the race entirely | +| Runner SA token for CP auth | K8s SA tokens are already mounted in every pod, long-lived, and K8s-managed — no new secrets or out-of-band key distribution required | +| BOT_TOKEN Secret retained for bootstrap | Avoids a chicken-and-egg: runner needs a token to make the first gRPC call before the CP endpoint can be contacted; Secret provides the initial credential | From 2a750173ad59d012021e965e7ce79660453e63d9 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Sat, 4 Apr 2026 14:57:34 -0400 Subject: [PATCH 2/5] =?UTF-8?q?docs(spec):=20remove=20BOT=5FTOKEN=20Secret?= =?UTF-8?q?=20=E2=80=94=20CP=20endpoint=20is=20sole=20token=20source?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CP creates the runner pod, so it is always up before the runner's first token request. Retaining a Secret write loop adds complexity and a second failure mode with identical blast radius. SA token automount flipped to true; BOT_TOKEN removed from env vars table and bootstrap split removed. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- docs/internal/design/control-plane.spec.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/internal/design/control-plane.spec.md b/docs/internal/design/control-plane.spec.md index 96ba6cd5c..412ecf1b1 100644 --- a/docs/internal/design/control-plane.spec.md +++ b/docs/internal/design/control-plane.spec.md @@ -83,7 +83,7 @@ The CP creates a Pod (not a Job) for each session. Key pod attributes: | `restartPolicy` | `Never` | Sessions are single-run; no automatic restart | | `imagePullPolicy` | `IfNotPresent` for `localhost/` images, `Always` otherwise | kind uses local containerd — `Always` breaks `localhost/` image pulls | | `serviceAccountName` | `session-{id}-sa` | Session-scoped; no cross-session access | -| `automountServiceAccountToken` | `false` | Runner uses BOT_TOKEN, not SA token | +| `automountServiceAccountToken` | `true` | Runner uses the SA token to authenticate to the CP token endpoint | | CPU request/limit | 500m / 2000m | Generous for Claude Code | | Memory request/limit | 512Mi / 4Gi | Claude Code is memory-intensive | @@ -109,7 +109,6 @@ Each section is joined with `\n\n`. Empty sections are omitted. If all four are | `WORKSPACE_PATH` | `/workspace` | Claude Code working directory | | `AGUI_PORT` | `8001` | Runner HTTP listener port | | `BACKEND_API_URL` | CP config | api-server base URL | -| `BOT_TOKEN` | from K8s secret | api-server bearer token (bootstrap only — see Token Endpoint) | | `AMBIENT_GRPC_URL` | CP config | api-server gRPC address | | `AMBIENT_GRPC_USE_TLS` | CP config | TLS flag for gRPC | | `AMBIENT_CP_TOKEN_URL` | CP config | CP token endpoint URL (e.g. `http://ambient-control-plane.{ns}.svc:8080/token`) | @@ -402,14 +401,16 @@ A successful `TokenReview` returns `status.authenticated=true` and `status.user. This approach uses credentials already present in every pod — no new secrets required. -### Token Bootstrap vs. Renewal +### Token Lifecycle + +The CP token endpoint is the **sole source** of the api-server bearer token for all runner pods. There is no Secret write loop and no `BOT_TOKEN` env var or file mount. | Phase | Mechanism | |---|---| -| Initial startup | `BOT_TOKEN` env var / Secret file mount (injected by CP at pod creation) | +| Initial startup | `GET /token` from CP endpoint — called in lifespan before gRPC channel opens | | gRPC reconnect | `GET /token` from CP endpoint — synchronous, guaranteed fresh | -The Secret write loop is retained for initial bootstrap. For renewal, runners call the CP endpoint. +The CP is critical infrastructure. It creates the runner pod, so it is running before the runner makes its first token request. If the CP is unreachable, the runner cannot function regardless (the CP is also responsible for all K8s provisioning). No fallback is needed or provided. ### CP HTTP Server @@ -435,7 +436,7 @@ def reconnect(self) -> None: self._token = fresh_token ``` -`AMBIENT_CP_TOKEN_URL` is injected by the CP as an env var when creating the runner pod. If not set (local dev / non-OIDC environments), the runner falls back to reading from the Secret file mount. +`AMBIENT_CP_TOKEN_URL` is injected by the CP as an env var when creating the runner pod. In local dev environments where the CP is not present, `BOT_TOKEN` env var may be set directly and the runner skips the CP endpoint call. ### New CP Internal Packages @@ -520,4 +521,4 @@ The `ambient-control-plane` ServiceAccount does not have `delete` on `namespaces | Namespace per project, not per session | Sessions within a project share a namespace; secrets and RBAC are project-scoped | | CP token endpoint over Secret-write renewal | Secret writes are async push with no synchronization guarantee vs. token TTL; synchronous pull from CP eliminates the race entirely | | Runner SA token for CP auth | K8s SA tokens are already mounted in every pod, long-lived, and K8s-managed — no new secrets or out-of-band key distribution required | -| BOT_TOKEN Secret retained for bootstrap | Avoids a chicken-and-egg: runner needs a token to make the first gRPC call before the CP endpoint can be contacted; Secret provides the initial credential | +| CP is sole token source — no BOT_TOKEN Secret | CP creates the runner pod, so it is always reachable before the runner's first token request; retaining a Secret adds complexity and a second failure mode with the same blast radius | From 660c0f74d7c26f6db150cbdc84563517a467b92b Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Sat, 4 Apr 2026 18:10:21 -0400 Subject: [PATCH 3/5] feat(control-plane): implement CP /token endpoint for runner gRPC auth RHOAIENG-56711 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eliminates the async BOT_TOKEN Secret push+refresh loop in favour of a synchronous pull model: - Add internal/tokenserver package: HTTP server on :8080 with GET /token handler that validates the caller's K8s SA token via TokenReview, checks the runner SA pattern (system:serviceaccount:*:session-*-sa), then mints and returns a fresh API token via the existing OIDCTokenProvider. - Wire token server goroutine into runKubeMode() alongside the informer. - Add CPTokenListenAddr / CPTokenURL fields to ControlPlaneConfig and KubeReconcilerConfig; read from CP_TOKEN_LISTEN_ADDR / CP_TOKEN_URL env. - kube_reconciler: remove ensureSecret, StartTokenRefreshLoop, refreshRunnerToken, refreshAllRunningTokens; replace BOT_TOKEN secret env injection with AMBIENT_CP_TOKEN_URL; set automountServiceAccountToken true on SA and pod so the K8s-mounted SA token is available at the standard path. - runner _grpc_client.py: add _fetch_token_from_cp() that reads the pod SA token and calls the CP /token endpoint; from_env() calls it when AMBIENT_CP_TOKEN_URL is set, falls back to BOT_TOKEN env var for local dev; reconnect() refreshes via CP endpoint on every gRPC reconnect. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../cmd/ambient-control-plane/main.go | 39 +++- components/ambient-control-plane/go.mod | 16 +- components/ambient-control-plane/go.sum | 20 +- .../internal/config/config.go | 4 + .../internal/reconciler/kube_reconciler.go | 190 ++---------------- .../internal/tokenserver/handler.go | 124 ++++++++++++ .../internal/tokenserver/server.go | 86 ++++++++ .../ambient_runner/_grpc_client.py | 57 ++++-- 8 files changed, 338 insertions(+), 198 deletions(-) create mode 100644 components/ambient-control-plane/internal/tokenserver/handler.go create mode 100644 components/ambient-control-plane/internal/tokenserver/server.go diff --git a/components/ambient-control-plane/cmd/ambient-control-plane/main.go b/components/ambient-control-plane/cmd/ambient-control-plane/main.go index 69bed8d52..ece7860d0 100644 --- a/components/ambient-control-plane/cmd/ambient-control-plane/main.go +++ b/components/ambient-control-plane/cmd/ambient-control-plane/main.go @@ -17,6 +17,7 @@ import ( "github.com/ambient-code/platform/components/ambient-control-plane/internal/informer" "github.com/ambient-code/platform/components/ambient-control-plane/internal/kubeclient" "github.com/ambient-code/platform/components/ambient-control-plane/internal/reconciler" + "github.com/ambient-code/platform/components/ambient-control-plane/internal/tokenserver" "github.com/ambient-code/platform/components/ambient-control-plane/internal/watcher" sdkclient "github.com/ambient-code/platform/components/ambient-sdk/go-sdk/client" "github.com/rs/zerolog" @@ -24,6 +25,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" ) var ( @@ -138,6 +141,7 @@ func runKubeMode(ctx context.Context, cfg *config.ControlPlaneConfig) error { MCPAPIServerURL: cfg.MCPAPIServerURL, RunnerLogLevel: cfg.RunnerLogLevel, CPRuntimeNamespace: cfg.CPRuntimeNamespace, + CPTokenURL: cfg.CPTokenURL, } conn, err := grpc.NewClient(cfg.GRPCServerAddr, grpc.WithTransportCredentials(grpcCredentials(cfg.GRPCUseTLS))) @@ -176,12 +180,41 @@ func runKubeMode(ctx context.Context, cfg *config.ControlPlaneConfig) error { sessionReconcilers := createSessionReconcilers(cfg.Reconcilers, factory, kube, projectKube, provisioner, kubeReconcilerCfg, log.Logger) for _, sessionRec := range sessionReconcilers { inf.RegisterHandler("sessions", sessionRec.Reconcile) - if kr, ok := sessionRec.(*reconciler.SimpleKubeReconciler); ok { - kr.StartTokenRefreshLoop(ctx) + } + + errCh := make(chan error, 1) + go func() { + if err := startTokenServer(ctx, cfg, tokenProvider); err != nil { + errCh <- err } + }() + + infErr := inf.Run(ctx) + select { + case tsErr := <-errCh: + return fmt.Errorf("token server: %w", tsErr) + default: } + return infErr +} - return inf.Run(ctx) +func startTokenServer(ctx context.Context, cfg *config.ControlPlaneConfig, tokenProvider auth.TokenProvider) error { + k8sConfig, err := buildK8sRestConfig(cfg.Kubeconfig) + if err != nil { + return fmt.Errorf("building k8s rest config for token server: %w", err) + } + ts, err := tokenserver.New(cfg.CPTokenListenAddr, tokenProvider, k8sConfig, log.Logger) + if err != nil { + return fmt.Errorf("creating token server: %w", err) + } + return ts.Start(ctx) +} + +func buildK8sRestConfig(kubeconfig string) (*rest.Config, error) { + if kubeconfig != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfig) + } + return rest.InClusterConfig() } func createSessionReconcilers(reconcilerTypes []string, factory *reconciler.SDKClientFactory, kube *kubeclient.KubeClient, projectKube *kubeclient.KubeClient, provisioner kubeclient.NamespaceProvisioner, cfg reconciler.KubeReconcilerConfig, logger zerolog.Logger) []reconciler.Reconciler { diff --git a/components/ambient-control-plane/go.mod b/components/ambient-control-plane/go.mod index ec6bb3a80..d576c1c7e 100644 --- a/components/ambient-control-plane/go.mod +++ b/components/ambient-control-plane/go.mod @@ -10,25 +10,35 @@ require ( github.com/rs/zerolog v1.34.0 golang.org/x/oauth2 v0.34.0 google.golang.org/grpc v1.79.1 + k8s.io/api v0.34.0 k8s.io/apimachinery v0.34.0 k8s.io/client-go v0.34.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emicklei/go-restful/v3 v3.12.2 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.20.2 // indirect + github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/gnostic-models v0.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/kr/pretty v0.3.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/x448/float16 v0.8.4 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/net v0.48.0 // indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/term v0.38.0 // indirect @@ -36,9 +46,11 @@ require ( golang.org/x/time v0.9.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/protobuf v1.36.11 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect diff --git a/components/ambient-control-plane/go.sum b/components/ambient-control-plane/go.sum index 1a2aef3da..ab57feee0 100644 --- a/components/ambient-control-plane/go.sum +++ b/components/ambient-control-plane/go.sum @@ -13,12 +13,16 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= +github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -29,6 +33,8 @@ github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7O github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -59,12 +65,14 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= +github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= +github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= @@ -73,9 +81,14 @@ github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6 github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -134,6 +147,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ= +golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -153,6 +168,7 @@ gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSP gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/api v0.34.0 h1:L+JtP2wDbEYPUeNGbeSa/5GwFtIA662EmT2YSLOkAVE= diff --git a/components/ambient-control-plane/internal/config/config.go b/components/ambient-control-plane/internal/config/config.go index cfd0852d1..08997d882 100644 --- a/components/ambient-control-plane/internal/config/config.go +++ b/components/ambient-control-plane/internal/config/config.go @@ -37,6 +37,8 @@ type ControlPlaneConfig struct { MCPAPIServerURL string RunnerLogLevel string ProjectKubeTokenFile string + CPTokenListenAddr string + CPTokenURL string } func Load() (*ControlPlaneConfig, error) { @@ -71,6 +73,8 @@ func Load() (*ControlPlaneConfig, error) { MCPAPIServerURL: envOrDefault("MCP_API_SERVER_URL", "http://ambient-api-server.ambient-code.svc:8000"), RunnerLogLevel: envOrDefault("RUNNER_LOG_LEVEL", "info"), ProjectKubeTokenFile: os.Getenv("PROJECT_KUBE_TOKEN_FILE"), + CPTokenListenAddr: envOrDefault("CP_TOKEN_LISTEN_ADDR", ":8080"), + CPTokenURL: os.Getenv("CP_TOKEN_URL"), } if cfg.APIToken == "" && (cfg.OIDCClientID == "" || cfg.OIDCClientSecret == "") { diff --git a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go index 3a348ebc8..395e5d4dd 100644 --- a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go +++ b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go @@ -19,12 +19,6 @@ import ( const ( mcpSidecarPort = int64(8090) mcpSidecarURL = "http://localhost:8090" - - runnerTokenMountPath = "/var/run/secrets/ambient" - runnerTokenFileName = "bot-token" - runnerTokenVolumeKey = "api-token" - runnerTokenVolumeName = "runner-token" - runnerTokenRefreshEvery = 4 * time.Minute ) type KubeReconcilerConfig struct { @@ -44,6 +38,7 @@ type KubeReconcilerConfig struct { MCPAPIServerURL string RunnerLogLevel string CPRuntimeNamespace string + CPTokenURL string } type SimpleKubeReconciler struct { @@ -142,10 +137,6 @@ func (r *SimpleKubeReconciler) provisionSession(ctx context.Context, session typ sessionLabel := sessionLabelSelector(session.ID) - if err := r.ensureSecret(ctx, namespace, session, sessionLabel); err != nil { - return fmt.Errorf("ensuring secret: %w", err) - } - if r.cfg.VertexEnabled { if err := r.ensureVertexSecret(ctx, namespace); err != nil { return fmt.Errorf("ensuring vertex secret: %w", err) @@ -309,41 +300,6 @@ func (r *SimpleKubeReconciler) ensureImagePullAccess(ctx context.Context, namesp return nil } -func (r *SimpleKubeReconciler) ensureSecret(ctx context.Context, namespace string, session types.Session, labelSelector string) error { - name := secretName(session.ID) - - if _, err := r.nsKube().GetSecret(ctx, namespace, name); err == nil { - return nil - } - - token, err := r.factory.Token(ctx) - if err != nil { - return fmt.Errorf("resolving API token: %w", err) - } - - secret := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Secret", - "metadata": map[string]interface{}{ - "name": name, - "namespace": namespace, - "labels": sessionLabels(session.ID, session.ProjectID), - }, - "stringData": map[string]interface{}{ - "api-token": token, - }, - }, - } - - if _, err := r.nsKube().CreateSecret(ctx, secret); err != nil && !k8serrors.IsAlreadyExists(err) { - return fmt.Errorf("creating secret %s: %w", name, err) - } - - r.logger.Debug().Str("secret", name).Str("namespace", namespace).Msg("secret created") - return nil -} - func (r *SimpleKubeReconciler) ensureServiceAccount(ctx context.Context, namespace string, session types.Session, labelSelector string) error { name := serviceAccountName(session.ID) @@ -360,7 +316,7 @@ func (r *SimpleKubeReconciler) ensureServiceAccount(ctx context.Context, namespa "namespace": namespace, "labels": sessionLabels(session.ID, session.ProjectID), }, - "automountServiceAccountToken": false, + "automountServiceAccountToken": true, }, } @@ -381,7 +337,6 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, } saName := serviceAccountName(session.ID) - secretName := secretName(session.ID) runnerImage := r.cfg.RunnerImage imagePullPolicy := "Always" @@ -404,8 +359,8 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, "protocol": "TCP", }, }, - "volumeMounts": r.buildVolumeMounts(secretName), - "env": r.buildEnv(ctx, session, sdk, secretName, useMCPSidecar, credentialIDs), + "volumeMounts": r.buildVolumeMounts(), + "env": r.buildEnv(ctx, session, sdk, useMCPSidecar, credentialIDs), "resources": map[string]interface{}{ "requests": map[string]interface{}{ "cpu": "500m", @@ -426,7 +381,7 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, } if useMCPSidecar { - containers = append(containers, r.buildMCPSidecar(secretName)) + containers = append(containers, r.buildMCPSidecar()) r.logger.Info().Str("session_id", session.ID).Msg("MCP sidecar enabled for session") } @@ -445,10 +400,10 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, }, "spec": map[string]interface{}{ "serviceAccountName": saName, - "automountServiceAccountToken": false, + "automountServiceAccountToken": true, "restartPolicy": "Never", "terminationGracePeriodSeconds": int64(60), - "volumes": r.buildVolumes(secretName), + "volumes": r.buildVolumes(), "containers": containers, }, }, @@ -462,7 +417,7 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, return nil } -func (r *SimpleKubeReconciler) buildVolumes(credSecretName string) []interface{} { +func (r *SimpleKubeReconciler) buildVolumes() []interface{} { vols := []interface{}{ map[string]interface{}{ "name": "workspace", @@ -475,18 +430,6 @@ func (r *SimpleKubeReconciler) buildVolumes(credSecretName string) []interface{} "optional": true, }, }, - map[string]interface{}{ - "name": runnerTokenVolumeName, - "secret": map[string]interface{}{ - "secretName": credSecretName, - "items": []interface{}{ - map[string]interface{}{ - "key": runnerTokenVolumeKey, - "path": runnerTokenFileName, - }, - }, - }, - }, } if r.cfg.VertexEnabled { vols = append(vols, map[string]interface{}{ @@ -499,7 +442,7 @@ func (r *SimpleKubeReconciler) buildVolumes(credSecretName string) []interface{} return vols } -func (r *SimpleKubeReconciler) buildVolumeMounts(_ string) []interface{} { +func (r *SimpleKubeReconciler) buildVolumeMounts() []interface{} { mounts := []interface{}{ map[string]interface{}{ "name": "workspace", @@ -511,11 +454,6 @@ func (r *SimpleKubeReconciler) buildVolumeMounts(_ string) []interface{} { "subPath": "service-ca.crt", "readOnly": true, }, - map[string]interface{}{ - "name": runnerTokenVolumeName, - "mountPath": runnerTokenMountPath, - "readOnly": true, - }, } if r.cfg.VertexEnabled { mounts = append(mounts, map[string]interface{}{ @@ -564,7 +502,7 @@ func (r *SimpleKubeReconciler) ensureVertexSecret(ctx context.Context, namespace return nil } -func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Session, sdk *sdkclient.Client, credSecretName string, useMCPSidecar bool, credentialIDs map[string]string) []interface{} { +func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Session, sdk *sdkclient.Client, useMCPSidecar bool, credentialIDs map[string]string) []interface{} { useVertex := "0" if r.cfg.VertexEnabled { useVertex = "1" @@ -584,7 +522,7 @@ func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Sessi envVar("BACKEND_API_URL", r.cfg.BackendURL), envVar("USE_VERTEX", useVertex), envVar("CLAUDE_CODE_USE_VERTEX", useVertex), - envVarFromSecret("BOT_TOKEN", credSecretName, "api-token"), + envVar("AMBIENT_CP_TOKEN_URL", r.cfg.CPTokenURL), envVar("AMBIENT_GRPC_URL", r.cfg.RunnerGRPCURL), envVar("AMBIENT_GRPC_ENABLED", boolToStr(r.cfg.RunnerGRPCURL != "")), envVar("AMBIENT_GRPC_USE_TLS", boolToStr(r.cfg.RunnerGRPCUseTLS)), @@ -797,10 +735,6 @@ func podName(sessionID string) string { return fmt.Sprintf("session-%s-runner", safeResourceName(sessionID)) } -func secretName(sessionID string) string { - return fmt.Sprintf("session-%s-creds", safeResourceName(sessionID)) -} - func serviceAccountName(sessionID string) string { return fmt.Sprintf("session-%s-sa", safeResourceName(sessionID)) } @@ -816,7 +750,7 @@ func boolToStr(b bool) string { return "false" } -func (r *SimpleKubeReconciler) buildMCPSidecar(credSecretName string) interface{} { +func (r *SimpleKubeReconciler) buildMCPSidecar() interface{} { mcpImage := r.cfg.MCPImage imagePullPolicy := "Always" if strings.HasPrefix(mcpImage, "localhost/") { @@ -837,7 +771,7 @@ func (r *SimpleKubeReconciler) buildMCPSidecar(credSecretName string) interface{ envVar("MCP_TRANSPORT", "sse"), envVar("MCP_BIND_ADDR", fmt.Sprintf(":%d", mcpSidecarPort)), envVar("AMBIENT_API_URL", r.cfg.MCPAPIServerURL), - envVarFromSecret("AMBIENT_TOKEN", credSecretName, "api-token"), + envVar("AMBIENT_CP_TOKEN_URL", r.cfg.CPTokenURL), }, "resources": map[string]interface{}{ "requests": map[string]interface{}{ @@ -858,18 +792,6 @@ func (r *SimpleKubeReconciler) buildMCPSidecar(credSecretName string) interface{ } } -func envVarFromSecret(name, secretName, key string) interface{} { - return map[string]interface{}{ - "name": name, - "valueFrom": map[string]interface{}{ - "secretKeyRef": map[string]interface{}{ - "name": secretName, - "key": key, - }, - }, - } -} - func min(a, b int) int { if a < b { return a @@ -877,89 +799,3 @@ func min(a, b int) int { return b } -func (r *SimpleKubeReconciler) refreshRunnerToken(ctx context.Context, namespace, sessionID string) error { - name := secretName(sessionID) - existing, err := r.nsKube().GetSecret(ctx, namespace, name) - if err != nil { - return fmt.Errorf("getting secret %s: %w", name, err) - } - - token, err := r.factory.Token(ctx) - if err != nil { - return fmt.Errorf("resolving fresh API token: %w", err) - } - - updated := existing.DeepCopy() - if err := unstructured.SetNestedField(updated.Object, map[string]interface{}{"api-token": token}, "stringData"); err != nil { - return fmt.Errorf("setting stringData on secret %s: %w", name, err) - } - - if _, err := r.nsKube().UpdateSecret(ctx, updated); err != nil { - return fmt.Errorf("updating secret %s: %w", name, err) - } - - r.logger.Info().Str("secret", name).Str("namespace", namespace).Msg("runner token refreshed") - return nil -} - -func (r *SimpleKubeReconciler) StartTokenRefreshLoop(ctx context.Context) { - go func() { - r.refreshAllRunningTokens(ctx) - ticker := time.NewTicker(runnerTokenRefreshEvery) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - r.refreshAllRunningTokens(ctx) - } - } - }() -} - -func (r *SimpleKubeReconciler) refreshAllRunningTokens(ctx context.Context) { - projectOpts := &types.ListOptions{Page: 1, Size: 100} - for { - projectSDK, err := r.factory.ForProject(ctx, "_") - if err != nil { - r.logger.Warn().Err(err).Msg("token refresh loop: failed to get SDK client") - return - } - projectList, err := projectSDK.Projects().List(ctx, projectOpts) - if err != nil { - r.logger.Warn().Err(err).Int("page", projectOpts.Page).Msg("token refresh loop: failed to list projects") - return - } - for _, project := range projectList.Items { - sdk, err := r.factory.ForProject(ctx, project.ID) - if err != nil { - r.logger.Warn().Err(err).Str("project_id", project.ID).Msg("token refresh loop: failed to get SDK client for project") - continue - } - sessionOpts := &types.ListOptions{Page: 1, Size: 100, Search: "phase = 'Running'"} - for { - list, err := sdk.Sessions().List(ctx, sessionOpts) - if err != nil { - r.logger.Warn().Err(err).Str("project_id", project.ID).Int("page", sessionOpts.Page).Msg("token refresh loop: failed to list running sessions") - break - } - for i := range list.Items { - session := list.Items[i] - namespace := r.namespaceForSession(session) - if err := r.refreshRunnerToken(ctx, namespace, session.ID); err != nil { - r.logger.Warn().Err(err).Str("session_id", session.ID).Str("namespace", namespace).Msg("token refresh loop: failed to refresh token") - } - } - if len(list.Items) == 0 || list.Total <= sessionOpts.Page*sessionOpts.Size { - break - } - sessionOpts.Page++ - } - } - if len(projectList.Items) == 0 || projectList.Total <= projectOpts.Page*projectOpts.Size { - break - } - projectOpts.Page++ - } -} diff --git a/components/ambient-control-plane/internal/tokenserver/handler.go b/components/ambient-control-plane/internal/tokenserver/handler.go new file mode 100644 index 000000000..1afc08f4e --- /dev/null +++ b/components/ambient-control-plane/internal/tokenserver/handler.go @@ -0,0 +1,124 @@ +package tokenserver + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/ambient-code/platform/components/ambient-control-plane/internal/auth" + "github.com/rs/zerolog" + authv1 "k8s.io/api/authentication/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +const ( + runnerSAPrefix = "system:serviceaccount:" + sessionSAInfix = ":session-" + sessionSASuffix = "-sa" + tokenReviewTimeout = 10 * time.Second +) + +type tokenResponse struct { + Token string `json:"token"` + ExpiresAt string `json:"expires_at,omitempty"` +} + +type handler struct { + tokenProvider auth.TokenProvider + k8sClient kubernetes.Interface + logger zerolog.Logger +} + +func (h *handler) handleToken(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + saToken, err := extractBearerToken(r) + if err != nil { + h.logger.Warn().Err(err).Msg("token request: missing or malformed Authorization header") + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + + username, err := h.validateSAToken(r.Context(), saToken) + if err != nil { + h.logger.Warn().Err(err).Msg("token request: SA token validation failed") + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + + if !isRunnerSA(username) { + h.logger.Warn().Str("username", username).Msg("token request: username does not match runner SA pattern") + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + apiToken, err := h.tokenProvider.Token(r.Context()) + if err != nil { + h.logger.Error().Err(err).Str("username", username).Msg("token request: failed to mint API token") + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + + h.logger.Info().Str("username", username).Msg("token request: issued fresh API token") + + resp := tokenResponse{Token: apiToken} + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + h.logger.Warn().Err(err).Msg("token request: failed to write response") + } +} + +func (h *handler) validateSAToken(ctx context.Context, token string) (string, error) { + ctx, cancel := context.WithTimeout(ctx, tokenReviewTimeout) + defer cancel() + + tr := &authv1.TokenReview{ + Spec: authv1.TokenReviewSpec{ + Token: token, + }, + } + + result, err := h.k8sClient.AuthenticationV1().TokenReviews().Create(ctx, tr, metav1.CreateOptions{}) + if err != nil { + return "", fmt.Errorf("TokenReview API call failed: %w", err) + } + if !result.Status.Authenticated { + return "", fmt.Errorf("token not authenticated: %s", result.Status.Error) + } + + return result.Status.User.Username, nil +} + +func isRunnerSA(username string) bool { + if !strings.HasPrefix(username, runnerSAPrefix) { + return false + } + rest := strings.TrimPrefix(username, runnerSAPrefix) + idx := strings.Index(rest, sessionSAInfix) + if idx < 0 { + return false + } + return strings.HasSuffix(rest, sessionSASuffix) +} + +func extractBearerToken(r *http.Request) (string, error) { + auth := r.Header.Get("Authorization") + if auth == "" { + return "", fmt.Errorf("Authorization header missing") + } + if !strings.HasPrefix(auth, "Bearer ") { + return "", fmt.Errorf("Authorization header must use Bearer scheme") + } + token := strings.TrimPrefix(auth, "Bearer ") + if token == "" { + return "", fmt.Errorf("empty bearer token") + } + return token, nil +} diff --git a/components/ambient-control-plane/internal/tokenserver/server.go b/components/ambient-control-plane/internal/tokenserver/server.go new file mode 100644 index 000000000..25f5f3ce5 --- /dev/null +++ b/components/ambient-control-plane/internal/tokenserver/server.go @@ -0,0 +1,86 @@ +package tokenserver + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/ambient-code/platform/components/ambient-control-plane/internal/auth" + "github.com/rs/zerolog" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const ( + DefaultListenAddr = ":8080" + readTimeout = 10 * time.Second + writeTimeout = 10 * time.Second + idleTimeout = 60 * time.Second + shutdownGracePeriod = 5 * time.Second +) + +type Server struct { + srv *http.Server + logger zerolog.Logger +} + +func New( + listenAddr string, + tokenProvider auth.TokenProvider, + k8sConfig *rest.Config, + logger zerolog.Logger, +) (*Server, error) { + k8sClient, err := kubernetes.NewForConfig(k8sConfig) + if err != nil { + return nil, fmt.Errorf("creating k8s client for token server: %w", err) + } + + h := &handler{ + tokenProvider: tokenProvider, + k8sClient: k8sClient, + logger: logger.With().Str("component", "tokenserver").Logger(), + } + + mux := http.NewServeMux() + mux.HandleFunc("/token", h.handleToken) + mux.HandleFunc("/healthz", handleHealthz) + + return &Server{ + srv: &http.Server{ + Addr: listenAddr, + Handler: mux, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + IdleTimeout: idleTimeout, + }, + logger: logger.With().Str("component", "tokenserver").Logger(), + }, nil +} + +func (s *Server) Start(ctx context.Context) error { + errCh := make(chan error, 1) + go func() { + s.logger.Info().Str("addr", s.srv.Addr).Msg("token server listening") + if err := s.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errCh <- err + } + }() + + select { + case <-ctx.Done(): + shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownGracePeriod) + defer cancel() + if err := s.srv.Shutdown(shutdownCtx); err != nil { + s.logger.Warn().Err(err).Msg("token server shutdown error") + } + return nil + case err := <-errCh: + return fmt.Errorf("token server: %w", err) + } +} + +func handleHealthz(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) +} diff --git a/components/runners/ambient-runner/ambient_runner/_grpc_client.py b/components/runners/ambient-runner/ambient_runner/_grpc_client.py index 0a975ac58..0a218e918 100644 --- a/components/runners/ambient-runner/ambient_runner/_grpc_client.py +++ b/components/runners/ambient-runner/ambient_runner/_grpc_client.py @@ -1,7 +1,9 @@ from __future__ import annotations +import json import logging import os +import urllib.request from pathlib import Path from typing import Optional @@ -11,23 +13,33 @@ _ENV_GRPC_URL = "AMBIENT_GRPC_URL" _ENV_TOKEN = "BOT_TOKEN" +_ENV_CP_TOKEN_URL = "AMBIENT_CP_TOKEN_URL" _ENV_USE_TLS = "AMBIENT_GRPC_USE_TLS" _ENV_CA_CERT = "AMBIENT_GRPC_CA_CERT_FILE" _DEFAULT_GRPC_URL = "ambient-api-server:9000" _SERVICE_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt" -_BOT_TOKEN_FILE = Path("/var/run/secrets/ambient/bot-token") +_SA_TOKEN_FILE = Path("/var/run/secrets/kubernetes.io/serviceaccount/token") -def _read_current_token() -> str: - """Read the bot token, preferring the kubelet-rotated file mount over env var.""" +def _fetch_token_from_cp(cp_token_url: str) -> str: + """Fetch a fresh API token from the CP /token endpoint using the pod SA token.""" try: - if _BOT_TOKEN_FILE.exists(): - token = _BOT_TOKEN_FILE.read_text().strip() - if token: - return token - except OSError: - pass - return os.environ.get(_ENV_TOKEN, "") + sa_token = _SA_TOKEN_FILE.read_text().strip() + except OSError as e: + raise RuntimeError(f"cannot read SA token from {_SA_TOKEN_FILE}: {e}") from e + + req = urllib.request.Request( + cp_token_url, + headers={"Authorization": f"Bearer {sa_token}"}, + ) + with urllib.request.urlopen(req, timeout=10) as resp: + body = json.loads(resp.read()) + + token = body.get("token", "") + if not token: + raise RuntimeError("CP /token response missing 'token' field") + logger.info("[GRPC CLIENT] Fetched fresh API token from CP token endpoint") + return token def _load_ca_cert(ca_cert_file: Optional[str]) -> Optional[bytes]: @@ -82,11 +94,13 @@ def __init__( token: str, use_tls: bool = False, ca_cert_file: Optional[str] = None, + cp_token_url: str = "", ) -> None: self._grpc_url = grpc_url self._token = token self._use_tls = use_tls self._ca_cert_file = ca_cert_file + self._cp_token_url = cp_token_url self._channel: Optional[grpc.Channel] = None self._session_messages: Optional["SessionMessagesAPI"] = None # noqa: F821 @@ -94,9 +108,17 @@ def __init__( def from_env(cls) -> AmbientGRPCClient: """Create client from environment variables.""" grpc_url = os.environ.get(_ENV_GRPC_URL, _DEFAULT_GRPC_URL) - token = _read_current_token() + cp_token_url = os.environ.get(_ENV_CP_TOKEN_URL, "") use_tls = os.environ.get(_ENV_USE_TLS, "").lower() in ("true", "1", "yes") ca_cert_file = os.environ.get(_ENV_CA_CERT) + if cp_token_url: + logger.info( + "[GRPC CLIENT] Fetching token from CP endpoint: url=%s", cp_token_url + ) + token = _fetch_token_from_cp(cp_token_url) + else: + token = os.environ.get(_ENV_TOKEN, "") + logger.info("[GRPC CLIENT] Using BOT_TOKEN env var (local dev mode)") logger.info( "[GRPC CLIENT] Initializing from env: url=%s tls=%s token_len=%d", grpc_url, @@ -104,12 +126,19 @@ def from_env(cls) -> AmbientGRPCClient: len(token), ) return cls( - grpc_url=grpc_url, token=token, use_tls=use_tls, ca_cert_file=ca_cert_file + grpc_url=grpc_url, + token=token, + use_tls=use_tls, + ca_cert_file=ca_cert_file, + cp_token_url=cp_token_url, ) def reconnect(self) -> None: - """Close the existing channel and rebuild with a fresh token from the file mount.""" - fresh_token = _read_current_token() + """Close the existing channel and rebuild with a fresh token from the CP endpoint.""" + if self._cp_token_url: + fresh_token = _fetch_token_from_cp(self._cp_token_url) + else: + fresh_token = os.environ.get(_ENV_TOKEN, "") logger.info( "[GRPC CLIENT] Reconnecting with fresh token (len=%d)", len(fresh_token) ) From 8833ccebcc80ffc64e1763fe6f6068fb42d8602f Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Sat, 4 Apr 2026 19:01:55 -0400 Subject: [PATCH 4/5] fix(control-plane): create NetworkPolicy allowing api-server to reach runner pods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Session namespace NetworkPolicies (provisioned by the MPP tenant operator) block cross-namespace ingress by default. The api-server pod (in the CP runtime namespace) could not reach runner pod port 8001, causing `acpctl session events` to receive a 502 from the ingress. Add ensureAPIServerNetworkPolicy() called from ensureNamespaceExists(): creates a NetworkPolicy named allow-ambient-api-server in each session namespace that allows TCP:8001 ingress from pods in the CP runtime namespace (matched by kubernetes.io/metadata.name label). Idempotent — skips creation if the policy already exists. Also add NetworkPolicyGVR, GetNetworkPolicy, and CreateNetworkPolicy to KubeClient to support the new operation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../internal/kubeclient/kubeclient.go | 14 +++++ .../internal/reconciler/kube_reconciler.go | 59 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/components/ambient-control-plane/internal/kubeclient/kubeclient.go b/components/ambient-control-plane/internal/kubeclient/kubeclient.go index 5681c258a..9f96289bc 100644 --- a/components/ambient-control-plane/internal/kubeclient/kubeclient.go +++ b/components/ambient-control-plane/internal/kubeclient/kubeclient.go @@ -57,6 +57,12 @@ var RoleGVR = schema.GroupVersionResource{ Resource: "roles", } +var NetworkPolicyGVR = schema.GroupVersionResource{ + Group: "networking.k8s.io", + Version: "v1", + Resource: "networkpolicies", +} + type KubeClient struct { dynamic dynamic.Interface logger zerolog.Logger @@ -258,6 +264,14 @@ func (kc *KubeClient) DeleteRoleBindingsByLabel(ctx context.Context, namespace, return kc.dynamic.Resource(RoleBindingGVR).Namespace(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: labelSelector}) } +func (kc *KubeClient) GetNetworkPolicy(ctx context.Context, namespace, name string) (*unstructured.Unstructured, error) { + return kc.dynamic.Resource(NetworkPolicyGVR).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) +} + +func (kc *KubeClient) CreateNetworkPolicy(ctx context.Context, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + return kc.dynamic.Resource(NetworkPolicyGVR).Namespace(obj.GetNamespace()).Create(ctx, obj, metav1.CreateOptions{}) +} + func (kc *KubeClient) GetResource(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error) { return kc.dynamic.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) } diff --git a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go index 395e5d4dd..e2053b3cd 100644 --- a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go +++ b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go @@ -266,6 +266,65 @@ func (r *SimpleKubeReconciler) ensureNamespaceExists(ctx context.Context, namesp } } + if r.cfg.CPRuntimeNamespace != "" { + if err := r.ensureAPIServerNetworkPolicy(ctx, namespace); err != nil { + r.logger.Warn().Err(err).Str("namespace", namespace).Msg("failed to ensure api-server network policy") + } + } + + return nil +} + +func (r *SimpleKubeReconciler) ensureAPIServerNetworkPolicy(ctx context.Context, namespace string) error { + name := "allow-ambient-api-server" + + if _, err := r.nsKube().GetNetworkPolicy(ctx, namespace, name); err == nil { + return nil + } + + np := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "networking.k8s.io/v1", + "kind": "NetworkPolicy", + "metadata": map[string]interface{}{ + "name": name, + "namespace": namespace, + "labels": map[string]interface{}{ + LabelManaged: "true", + LabelManagedBy: "ambient-control-plane", + }, + }, + "spec": map[string]interface{}{ + "podSelector": map[string]interface{}{}, + "ingress": []interface{}{ + map[string]interface{}{ + "from": []interface{}{ + map[string]interface{}{ + "namespaceSelector": map[string]interface{}{ + "matchLabels": map[string]interface{}{ + "kubernetes.io/metadata.name": r.cfg.CPRuntimeNamespace, + }, + }, + }, + }, + "ports": []interface{}{ + map[string]interface{}{ + "protocol": "TCP", + "port": int64(8001), + }, + }, + }, + }, + "policyTypes": []interface{}{"Ingress"}, + }, + }, + } + + if _, err := r.nsKube().CreateNetworkPolicy(ctx, np); err != nil && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("creating network policy %s in %s: %w", name, namespace, err) + } + + r.logger.Debug().Str("namespace", namespace).Str("policy", name).Msg("api-server network policy created") return nil } From 42092fe6339ab70cbcd932c7a95a88bfbce8977c Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Sat, 4 Apr 2026 19:16:43 -0400 Subject: [PATCH 5/5] fix(control-plane,runner): address CodeRabbit review comments on PR #1213 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit main.go: run informer and token server concurrently in separate goroutines and select on both error channels — a token server crash now immediately fails runKubeMode instead of being silently swallowed after inf.Run returns. tokenserver/handler.go: remove dead expires_at field from tokenResponse; OIDCTokenProvider does not expose expiry metadata and the runner never read it. _grpc_client.py: - Validate AMBIENT_CP_TOKEN_URL scheme (http/https only, no credentials) before sending the pod SA token, preventing exfiltration via misconfigured or malicious URLs (file://, ftp://, http://user:pass@...). - Add exponential backoff retry (attempts: 1, backoff: 1s/2s) around _fetch_token_from_cp() to handle transient CP unavailability at runner startup or gRPC reconnect. - Catch urllib.error.HTTPError separately and include the response body in the RuntimeError message for debuggability. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../cmd/ambient-control-plane/main.go | 23 +++--- .../internal/tokenserver/handler.go | 3 +- .../ambient_runner/_grpc_client.py | 78 +++++++++++++++---- 3 files changed, 80 insertions(+), 24 deletions(-) diff --git a/components/ambient-control-plane/cmd/ambient-control-plane/main.go b/components/ambient-control-plane/cmd/ambient-control-plane/main.go index ece7860d0..8614aa273 100644 --- a/components/ambient-control-plane/cmd/ambient-control-plane/main.go +++ b/components/ambient-control-plane/cmd/ambient-control-plane/main.go @@ -182,20 +182,25 @@ func runKubeMode(ctx context.Context, cfg *config.ControlPlaneConfig) error { inf.RegisterHandler("sessions", sessionRec.Reconcile) } - errCh := make(chan error, 1) + tsErrCh := make(chan error, 1) go func() { - if err := startTokenServer(ctx, cfg, tokenProvider); err != nil { - errCh <- err - } + tsErrCh <- startTokenServer(ctx, cfg, tokenProvider) + }() + + infErrCh := make(chan error, 1) + go func() { + infErrCh <- inf.Run(ctx) }() - infErr := inf.Run(ctx) select { - case tsErr := <-errCh: - return fmt.Errorf("token server: %w", tsErr) - default: + case tsErr := <-tsErrCh: + if tsErr != nil { + return fmt.Errorf("token server: %w", tsErr) + } + return <-infErrCh + case infErr := <-infErrCh: + return infErr } - return infErr } func startTokenServer(ctx context.Context, cfg *config.ControlPlaneConfig, tokenProvider auth.TokenProvider) error { diff --git a/components/ambient-control-plane/internal/tokenserver/handler.go b/components/ambient-control-plane/internal/tokenserver/handler.go index 1afc08f4e..fd0bc721a 100644 --- a/components/ambient-control-plane/internal/tokenserver/handler.go +++ b/components/ambient-control-plane/internal/tokenserver/handler.go @@ -23,8 +23,7 @@ const ( ) type tokenResponse struct { - Token string `json:"token"` - ExpiresAt string `json:"expires_at,omitempty"` + Token string `json:"token"` } type handler struct { diff --git a/components/runners/ambient-runner/ambient_runner/_grpc_client.py b/components/runners/ambient-runner/ambient_runner/_grpc_client.py index 0a218e918..a46662bf6 100644 --- a/components/runners/ambient-runner/ambient_runner/_grpc_client.py +++ b/components/runners/ambient-runner/ambient_runner/_grpc_client.py @@ -3,6 +3,9 @@ import json import logging import os +import time +import urllib.error +import urllib.parse import urllib.request from pathlib import Path from typing import Optional @@ -21,25 +24,74 @@ _SA_TOKEN_FILE = Path("/var/run/secrets/kubernetes.io/serviceaccount/token") +_CP_TOKEN_FETCH_ATTEMPTS = 3 +_CP_TOKEN_FETCH_TIMEOUT = 10 + + +def _validate_cp_token_url(url: str) -> None: + """Reject non-http(s) or credential-bearing URLs to prevent SA token exfiltration.""" + parsed = urllib.parse.urlparse(url) + if ( + parsed.scheme not in {"http", "https"} + or not parsed.netloc + or parsed.username is not None + or parsed.password is not None + ): + raise RuntimeError( + f"invalid CP token URL (must be http/https with no credentials): {url!r}" + ) + + def _fetch_token_from_cp(cp_token_url: str) -> str: - """Fetch a fresh API token from the CP /token endpoint using the pod SA token.""" + """Fetch a fresh API token from the CP /token endpoint using the pod SA token. + + Retries up to _CP_TOKEN_FETCH_ATTEMPTS times with exponential backoff + to handle transient CP unavailability. + """ + _validate_cp_token_url(cp_token_url) + try: sa_token = _SA_TOKEN_FILE.read_text().strip() except OSError as e: raise RuntimeError(f"cannot read SA token from {_SA_TOKEN_FILE}: {e}") from e - req = urllib.request.Request( - cp_token_url, - headers={"Authorization": f"Bearer {sa_token}"}, - ) - with urllib.request.urlopen(req, timeout=10) as resp: - body = json.loads(resp.read()) - - token = body.get("token", "") - if not token: - raise RuntimeError("CP /token response missing 'token' field") - logger.info("[GRPC CLIENT] Fetched fresh API token from CP token endpoint") - return token + last_err: Exception = RuntimeError("no attempts made") + for attempt in range(_CP_TOKEN_FETCH_ATTEMPTS): + if attempt > 0: + backoff = 2 ** (attempt - 1) + logger.warning( + "[GRPC CLIENT] CP token fetch attempt %d/%d failed, retrying in %ds: %s", + attempt, + _CP_TOKEN_FETCH_ATTEMPTS, + backoff, + last_err, + ) + time.sleep(backoff) + try: + req = urllib.request.Request( + cp_token_url, + headers={"Authorization": f"Bearer {sa_token}"}, + ) + with urllib.request.urlopen(req, timeout=_CP_TOKEN_FETCH_TIMEOUT) as resp: + body = json.loads(resp.read()) + token = body.get("token", "") + if not token: + raise RuntimeError("CP /token response missing 'token' field") + logger.info("[GRPC CLIENT] Fetched fresh API token from CP token endpoint") + return token + except urllib.error.HTTPError as e: + resp_body = "" + try: + resp_body = e.read().decode(errors="replace") + except Exception: + pass + last_err = RuntimeError(f"CP /token HTTP {e.code}: {resp_body}") + except Exception as e: + last_err = e + + raise RuntimeError( + f"CP token endpoint unreachable after {_CP_TOKEN_FETCH_ATTEMPTS} attempts: {last_err}" + ) from last_err def _load_ca_cert(ca_cert_file: Optional[str]) -> Optional[bytes]: