diff --git a/components/ambient-control-plane/cmd/ambient-control-plane/main.go b/components/ambient-control-plane/cmd/ambient-control-plane/main.go index 69bed8d52..8614aa273 100644 --- a/components/ambient-control-plane/cmd/ambient-control-plane/main.go +++ b/components/ambient-control-plane/cmd/ambient-control-plane/main.go @@ -17,6 +17,7 @@ import ( "github.com/ambient-code/platform/components/ambient-control-plane/internal/informer" "github.com/ambient-code/platform/components/ambient-control-plane/internal/kubeclient" "github.com/ambient-code/platform/components/ambient-control-plane/internal/reconciler" + "github.com/ambient-code/platform/components/ambient-control-plane/internal/tokenserver" "github.com/ambient-code/platform/components/ambient-control-plane/internal/watcher" sdkclient "github.com/ambient-code/platform/components/ambient-sdk/go-sdk/client" "github.com/rs/zerolog" @@ -24,6 +25,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" ) var ( @@ -138,6 +141,7 @@ func runKubeMode(ctx context.Context, cfg *config.ControlPlaneConfig) error { MCPAPIServerURL: cfg.MCPAPIServerURL, RunnerLogLevel: cfg.RunnerLogLevel, CPRuntimeNamespace: cfg.CPRuntimeNamespace, + CPTokenURL: cfg.CPTokenURL, } conn, err := grpc.NewClient(cfg.GRPCServerAddr, grpc.WithTransportCredentials(grpcCredentials(cfg.GRPCUseTLS))) @@ -176,12 +180,46 @@ func runKubeMode(ctx context.Context, cfg *config.ControlPlaneConfig) error { sessionReconcilers := createSessionReconcilers(cfg.Reconcilers, factory, kube, projectKube, provisioner, kubeReconcilerCfg, log.Logger) for _, sessionRec := range sessionReconcilers { inf.RegisterHandler("sessions", sessionRec.Reconcile) - if kr, ok := sessionRec.(*reconciler.SimpleKubeReconciler); ok { - kr.StartTokenRefreshLoop(ctx) + } + + tsErrCh := make(chan error, 1) + go func() { + tsErrCh <- startTokenServer(ctx, cfg, 
tokenProvider) + }() + + infErrCh := make(chan error, 1) + go func() { + infErrCh <- inf.Run(ctx) + }() + + select { + case tsErr := <-tsErrCh: + if tsErr != nil { + return fmt.Errorf("token server: %w", tsErr) } + return <-infErrCh + case infErr := <-infErrCh: + return infErr } +} - return inf.Run(ctx) +func startTokenServer(ctx context.Context, cfg *config.ControlPlaneConfig, tokenProvider auth.TokenProvider) error { + k8sConfig, err := buildK8sRestConfig(cfg.Kubeconfig) + if err != nil { + return fmt.Errorf("building k8s rest config for token server: %w", err) + } + ts, err := tokenserver.New(cfg.CPTokenListenAddr, tokenProvider, k8sConfig, log.Logger) + if err != nil { + return fmt.Errorf("creating token server: %w", err) + } + return ts.Start(ctx) +} + +func buildK8sRestConfig(kubeconfig string) (*rest.Config, error) { + if kubeconfig != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfig) + } + return rest.InClusterConfig() } func createSessionReconcilers(reconcilerTypes []string, factory *reconciler.SDKClientFactory, kube *kubeclient.KubeClient, projectKube *kubeclient.KubeClient, provisioner kubeclient.NamespaceProvisioner, cfg reconciler.KubeReconcilerConfig, logger zerolog.Logger) []reconciler.Reconciler { diff --git a/components/ambient-control-plane/go.mod b/components/ambient-control-plane/go.mod index ec6bb3a80..d576c1c7e 100644 --- a/components/ambient-control-plane/go.mod +++ b/components/ambient-control-plane/go.mod @@ -10,25 +10,35 @@ require ( github.com/rs/zerolog v1.34.0 golang.org/x/oauth2 v0.34.0 google.golang.org/grpc v1.79.1 + k8s.io/api v0.34.0 k8s.io/apimachinery v0.34.0 k8s.io/client-go v0.34.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emicklei/go-restful/v3 v3.12.2 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.20.2 // indirect + 
github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/gnostic-models v0.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/kr/pretty v0.3.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/x448/float16 v0.8.4 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/net v0.48.0 // indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/term v0.38.0 // indirect @@ -36,9 +46,11 @@ require ( golang.org/x/time v0.9.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/protobuf v1.36.11 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect diff --git a/components/ambient-control-plane/go.sum b/components/ambient-control-plane/go.sum index 1a2aef3da..ab57feee0 100644 --- a/components/ambient-control-plane/go.sum +++ b/components/ambient-control-plane/go.sum @@ -13,12 +13,16 @@ github.com/go-logr/logr v1.4.3 
h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= +github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -29,6 +33,8 @@ github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7O github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo= +github.com/google/pprof 
v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -59,12 +65,14 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= +github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= +github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= @@ -73,9 +81,14 @@ github.com/rs/zerolog v1.34.0/go.mod 
h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6 github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -134,6 +147,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ= +golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors 
v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -153,6 +168,7 @@ gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSP gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/api v0.34.0 h1:L+JtP2wDbEYPUeNGbeSa/5GwFtIA662EmT2YSLOkAVE= diff --git a/components/ambient-control-plane/internal/config/config.go b/components/ambient-control-plane/internal/config/config.go index cfd0852d1..08997d882 100644 --- a/components/ambient-control-plane/internal/config/config.go +++ b/components/ambient-control-plane/internal/config/config.go @@ -37,6 +37,8 @@ type ControlPlaneConfig struct { MCPAPIServerURL string RunnerLogLevel string ProjectKubeTokenFile string + CPTokenListenAddr string + CPTokenURL string } func Load() (*ControlPlaneConfig, error) { @@ -71,6 +73,8 @@ func Load() (*ControlPlaneConfig, error) { MCPAPIServerURL: envOrDefault("MCP_API_SERVER_URL", "http://ambient-api-server.ambient-code.svc:8000"), RunnerLogLevel: envOrDefault("RUNNER_LOG_LEVEL", "info"), ProjectKubeTokenFile: os.Getenv("PROJECT_KUBE_TOKEN_FILE"), + CPTokenListenAddr: envOrDefault("CP_TOKEN_LISTEN_ADDR", ":8080"), + CPTokenURL: os.Getenv("CP_TOKEN_URL"), } if cfg.APIToken == "" && (cfg.OIDCClientID == "" || cfg.OIDCClientSecret == "") { diff --git a/components/ambient-control-plane/internal/kubeclient/kubeclient.go 
b/components/ambient-control-plane/internal/kubeclient/kubeclient.go index 5681c258a..9f96289bc 100644 --- a/components/ambient-control-plane/internal/kubeclient/kubeclient.go +++ b/components/ambient-control-plane/internal/kubeclient/kubeclient.go @@ -57,6 +57,12 @@ var RoleGVR = schema.GroupVersionResource{ Resource: "roles", } +var NetworkPolicyGVR = schema.GroupVersionResource{ + Group: "networking.k8s.io", + Version: "v1", + Resource: "networkpolicies", +} + type KubeClient struct { dynamic dynamic.Interface logger zerolog.Logger @@ -258,6 +264,14 @@ func (kc *KubeClient) DeleteRoleBindingsByLabel(ctx context.Context, namespace, return kc.dynamic.Resource(RoleBindingGVR).Namespace(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: labelSelector}) } +func (kc *KubeClient) GetNetworkPolicy(ctx context.Context, namespace, name string) (*unstructured.Unstructured, error) { + return kc.dynamic.Resource(NetworkPolicyGVR).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) +} + +func (kc *KubeClient) CreateNetworkPolicy(ctx context.Context, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + return kc.dynamic.Resource(NetworkPolicyGVR).Namespace(obj.GetNamespace()).Create(ctx, obj, metav1.CreateOptions{}) +} + func (kc *KubeClient) GetResource(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error) { return kc.dynamic.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) } diff --git a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go index 3a348ebc8..e2053b3cd 100644 --- a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go +++ b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go @@ -19,12 +19,6 @@ import ( const ( mcpSidecarPort = int64(8090) mcpSidecarURL = "http://localhost:8090" - - 
runnerTokenMountPath = "/var/run/secrets/ambient" - runnerTokenFileName = "bot-token" - runnerTokenVolumeKey = "api-token" - runnerTokenVolumeName = "runner-token" - runnerTokenRefreshEvery = 4 * time.Minute ) type KubeReconcilerConfig struct { @@ -44,6 +38,7 @@ type KubeReconcilerConfig struct { MCPAPIServerURL string RunnerLogLevel string CPRuntimeNamespace string + CPTokenURL string } type SimpleKubeReconciler struct { @@ -142,10 +137,6 @@ func (r *SimpleKubeReconciler) provisionSession(ctx context.Context, session typ sessionLabel := sessionLabelSelector(session.ID) - if err := r.ensureSecret(ctx, namespace, session, sessionLabel); err != nil { - return fmt.Errorf("ensuring secret: %w", err) - } - if r.cfg.VertexEnabled { if err := r.ensureVertexSecret(ctx, namespace); err != nil { return fmt.Errorf("ensuring vertex secret: %w", err) @@ -275,6 +266,65 @@ func (r *SimpleKubeReconciler) ensureNamespaceExists(ctx context.Context, namesp } } + if r.cfg.CPRuntimeNamespace != "" { + if err := r.ensureAPIServerNetworkPolicy(ctx, namespace); err != nil { + r.logger.Warn().Err(err).Str("namespace", namespace).Msg("failed to ensure api-server network policy") + } + } + + return nil +} + +func (r *SimpleKubeReconciler) ensureAPIServerNetworkPolicy(ctx context.Context, namespace string) error { + name := "allow-ambient-api-server" + + if _, err := r.nsKube().GetNetworkPolicy(ctx, namespace, name); err == nil { + return nil + } + + np := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "networking.k8s.io/v1", + "kind": "NetworkPolicy", + "metadata": map[string]interface{}{ + "name": name, + "namespace": namespace, + "labels": map[string]interface{}{ + LabelManaged: "true", + LabelManagedBy: "ambient-control-plane", + }, + }, + "spec": map[string]interface{}{ + "podSelector": map[string]interface{}{}, + "ingress": []interface{}{ + map[string]interface{}{ + "from": []interface{}{ + map[string]interface{}{ + "namespaceSelector": 
map[string]interface{}{ + "matchLabels": map[string]interface{}{ + "kubernetes.io/metadata.name": r.cfg.CPRuntimeNamespace, + }, + }, + }, + }, + "ports": []interface{}{ + map[string]interface{}{ + "protocol": "TCP", + "port": int64(8001), + }, + }, + }, + }, + "policyTypes": []interface{}{"Ingress"}, + }, + }, + } + + if _, err := r.nsKube().CreateNetworkPolicy(ctx, np); err != nil && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("creating network policy %s in %s: %w", name, namespace, err) + } + + r.logger.Debug().Str("namespace", namespace).Str("policy", name).Msg("api-server network policy created") return nil } @@ -309,41 +359,6 @@ func (r *SimpleKubeReconciler) ensureImagePullAccess(ctx context.Context, namesp return nil } -func (r *SimpleKubeReconciler) ensureSecret(ctx context.Context, namespace string, session types.Session, labelSelector string) error { - name := secretName(session.ID) - - if _, err := r.nsKube().GetSecret(ctx, namespace, name); err == nil { - return nil - } - - token, err := r.factory.Token(ctx) - if err != nil { - return fmt.Errorf("resolving API token: %w", err) - } - - secret := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Secret", - "metadata": map[string]interface{}{ - "name": name, - "namespace": namespace, - "labels": sessionLabels(session.ID, session.ProjectID), - }, - "stringData": map[string]interface{}{ - "api-token": token, - }, - }, - } - - if _, err := r.nsKube().CreateSecret(ctx, secret); err != nil && !k8serrors.IsAlreadyExists(err) { - return fmt.Errorf("creating secret %s: %w", name, err) - } - - r.logger.Debug().Str("secret", name).Str("namespace", namespace).Msg("secret created") - return nil -} - func (r *SimpleKubeReconciler) ensureServiceAccount(ctx context.Context, namespace string, session types.Session, labelSelector string) error { name := serviceAccountName(session.ID) @@ -360,7 +375,7 @@ func (r *SimpleKubeReconciler) ensureServiceAccount(ctx 
context.Context, namespa "namespace": namespace, "labels": sessionLabels(session.ID, session.ProjectID), }, - "automountServiceAccountToken": false, + "automountServiceAccountToken": true, }, } @@ -381,7 +396,6 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, } saName := serviceAccountName(session.ID) - secretName := secretName(session.ID) runnerImage := r.cfg.RunnerImage imagePullPolicy := "Always" @@ -404,8 +418,8 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, "protocol": "TCP", }, }, - "volumeMounts": r.buildVolumeMounts(secretName), - "env": r.buildEnv(ctx, session, sdk, secretName, useMCPSidecar, credentialIDs), + "volumeMounts": r.buildVolumeMounts(), + "env": r.buildEnv(ctx, session, sdk, useMCPSidecar, credentialIDs), "resources": map[string]interface{}{ "requests": map[string]interface{}{ "cpu": "500m", @@ -426,7 +440,7 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, } if useMCPSidecar { - containers = append(containers, r.buildMCPSidecar(secretName)) + containers = append(containers, r.buildMCPSidecar()) r.logger.Info().Str("session_id", session.ID).Msg("MCP sidecar enabled for session") } @@ -445,10 +459,10 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, }, "spec": map[string]interface{}{ "serviceAccountName": saName, - "automountServiceAccountToken": false, + "automountServiceAccountToken": true, "restartPolicy": "Never", "terminationGracePeriodSeconds": int64(60), - "volumes": r.buildVolumes(secretName), + "volumes": r.buildVolumes(), "containers": containers, }, }, @@ -462,7 +476,7 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, return nil } -func (r *SimpleKubeReconciler) buildVolumes(credSecretName string) []interface{} { +func (r *SimpleKubeReconciler) buildVolumes() []interface{} { vols := []interface{}{ map[string]interface{}{ "name": "workspace", @@ -475,18 +489,6 @@ func 
(r *SimpleKubeReconciler) buildVolumes(credSecretName string) []interface{} "optional": true, }, }, - map[string]interface{}{ - "name": runnerTokenVolumeName, - "secret": map[string]interface{}{ - "secretName": credSecretName, - "items": []interface{}{ - map[string]interface{}{ - "key": runnerTokenVolumeKey, - "path": runnerTokenFileName, - }, - }, - }, - }, } if r.cfg.VertexEnabled { vols = append(vols, map[string]interface{}{ @@ -499,7 +501,7 @@ func (r *SimpleKubeReconciler) buildVolumes(credSecretName string) []interface{} return vols } -func (r *SimpleKubeReconciler) buildVolumeMounts(_ string) []interface{} { +func (r *SimpleKubeReconciler) buildVolumeMounts() []interface{} { mounts := []interface{}{ map[string]interface{}{ "name": "workspace", @@ -511,11 +513,6 @@ func (r *SimpleKubeReconciler) buildVolumeMounts(_ string) []interface{} { "subPath": "service-ca.crt", "readOnly": true, }, - map[string]interface{}{ - "name": runnerTokenVolumeName, - "mountPath": runnerTokenMountPath, - "readOnly": true, - }, } if r.cfg.VertexEnabled { mounts = append(mounts, map[string]interface{}{ @@ -564,7 +561,7 @@ func (r *SimpleKubeReconciler) ensureVertexSecret(ctx context.Context, namespace return nil } -func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Session, sdk *sdkclient.Client, credSecretName string, useMCPSidecar bool, credentialIDs map[string]string) []interface{} { +func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Session, sdk *sdkclient.Client, useMCPSidecar bool, credentialIDs map[string]string) []interface{} { useVertex := "0" if r.cfg.VertexEnabled { useVertex = "1" @@ -584,7 +581,7 @@ func (r *SimpleKubeReconciler) buildEnv(ctx context.Context, session types.Sessi envVar("BACKEND_API_URL", r.cfg.BackendURL), envVar("USE_VERTEX", useVertex), envVar("CLAUDE_CODE_USE_VERTEX", useVertex), - envVarFromSecret("BOT_TOKEN", credSecretName, "api-token"), + envVar("AMBIENT_CP_TOKEN_URL", r.cfg.CPTokenURL), 
envVar("AMBIENT_GRPC_URL", r.cfg.RunnerGRPCURL), envVar("AMBIENT_GRPC_ENABLED", boolToStr(r.cfg.RunnerGRPCURL != "")), envVar("AMBIENT_GRPC_USE_TLS", boolToStr(r.cfg.RunnerGRPCUseTLS)), @@ -797,10 +794,6 @@ func podName(sessionID string) string { return fmt.Sprintf("session-%s-runner", safeResourceName(sessionID)) } -func secretName(sessionID string) string { - return fmt.Sprintf("session-%s-creds", safeResourceName(sessionID)) -} - func serviceAccountName(sessionID string) string { return fmt.Sprintf("session-%s-sa", safeResourceName(sessionID)) } @@ -816,7 +809,7 @@ func boolToStr(b bool) string { return "false" } -func (r *SimpleKubeReconciler) buildMCPSidecar(credSecretName string) interface{} { +func (r *SimpleKubeReconciler) buildMCPSidecar() interface{} { mcpImage := r.cfg.MCPImage imagePullPolicy := "Always" if strings.HasPrefix(mcpImage, "localhost/") { @@ -837,7 +830,7 @@ func (r *SimpleKubeReconciler) buildMCPSidecar(credSecretName string) interface{ envVar("MCP_TRANSPORT", "sse"), envVar("MCP_BIND_ADDR", fmt.Sprintf(":%d", mcpSidecarPort)), envVar("AMBIENT_API_URL", r.cfg.MCPAPIServerURL), - envVarFromSecret("AMBIENT_TOKEN", credSecretName, "api-token"), + envVar("AMBIENT_CP_TOKEN_URL", r.cfg.CPTokenURL), }, "resources": map[string]interface{}{ "requests": map[string]interface{}{ @@ -858,18 +851,6 @@ func (r *SimpleKubeReconciler) buildMCPSidecar(credSecretName string) interface{ } } -func envVarFromSecret(name, secretName, key string) interface{} { - return map[string]interface{}{ - "name": name, - "valueFrom": map[string]interface{}{ - "secretKeyRef": map[string]interface{}{ - "name": secretName, - "key": key, - }, - }, - } -} - func min(a, b int) int { if a < b { return a @@ -877,89 +858,3 @@ func min(a, b int) int { return b } -func (r *SimpleKubeReconciler) refreshRunnerToken(ctx context.Context, namespace, sessionID string) error { - name := secretName(sessionID) - existing, err := r.nsKube().GetSecret(ctx, namespace, name) - if err != nil { - 
return fmt.Errorf("getting secret %s: %w", name, err) - } - - token, err := r.factory.Token(ctx) - if err != nil { - return fmt.Errorf("resolving fresh API token: %w", err) - } - - updated := existing.DeepCopy() - if err := unstructured.SetNestedField(updated.Object, map[string]interface{}{"api-token": token}, "stringData"); err != nil { - return fmt.Errorf("setting stringData on secret %s: %w", name, err) - } - - if _, err := r.nsKube().UpdateSecret(ctx, updated); err != nil { - return fmt.Errorf("updating secret %s: %w", name, err) - } - - r.logger.Info().Str("secret", name).Str("namespace", namespace).Msg("runner token refreshed") - return nil -} - -func (r *SimpleKubeReconciler) StartTokenRefreshLoop(ctx context.Context) { - go func() { - r.refreshAllRunningTokens(ctx) - ticker := time.NewTicker(runnerTokenRefreshEvery) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - r.refreshAllRunningTokens(ctx) - } - } - }() -} - -func (r *SimpleKubeReconciler) refreshAllRunningTokens(ctx context.Context) { - projectOpts := &types.ListOptions{Page: 1, Size: 100} - for { - projectSDK, err := r.factory.ForProject(ctx, "_") - if err != nil { - r.logger.Warn().Err(err).Msg("token refresh loop: failed to get SDK client") - return - } - projectList, err := projectSDK.Projects().List(ctx, projectOpts) - if err != nil { - r.logger.Warn().Err(err).Int("page", projectOpts.Page).Msg("token refresh loop: failed to list projects") - return - } - for _, project := range projectList.Items { - sdk, err := r.factory.ForProject(ctx, project.ID) - if err != nil { - r.logger.Warn().Err(err).Str("project_id", project.ID).Msg("token refresh loop: failed to get SDK client for project") - continue - } - sessionOpts := &types.ListOptions{Page: 1, Size: 100, Search: "phase = 'Running'"} - for { - list, err := sdk.Sessions().List(ctx, sessionOpts) - if err != nil { - r.logger.Warn().Err(err).Str("project_id", project.ID).Int("page", 
sessionOpts.Page).Msg("token refresh loop: failed to list running sessions") - break - } - for i := range list.Items { - session := list.Items[i] - namespace := r.namespaceForSession(session) - if err := r.refreshRunnerToken(ctx, namespace, session.ID); err != nil { - r.logger.Warn().Err(err).Str("session_id", session.ID).Str("namespace", namespace).Msg("token refresh loop: failed to refresh token") - } - } - if len(list.Items) == 0 || list.Total <= sessionOpts.Page*sessionOpts.Size { - break - } - sessionOpts.Page++ - } - } - if len(projectList.Items) == 0 || projectList.Total <= projectOpts.Page*projectOpts.Size { - break - } - projectOpts.Page++ - } -} diff --git a/components/ambient-control-plane/internal/tokenserver/handler.go b/components/ambient-control-plane/internal/tokenserver/handler.go new file mode 100644 index 000000000..fd0bc721a --- /dev/null +++ b/components/ambient-control-plane/internal/tokenserver/handler.go @@ -0,0 +1,123 @@ +package tokenserver + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/ambient-code/platform/components/ambient-control-plane/internal/auth" + "github.com/rs/zerolog" + authv1 "k8s.io/api/authentication/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +const ( + runnerSAPrefix = "system:serviceaccount:" + sessionSAInfix = ":session-" + sessionSASuffix = "-sa" + tokenReviewTimeout = 10 * time.Second +) + +type tokenResponse struct { + Token string `json:"token"` +} + +type handler struct { + tokenProvider auth.TokenProvider + k8sClient kubernetes.Interface + logger zerolog.Logger +} + +func (h *handler) handleToken(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + saToken, err := extractBearerToken(r) + if err != nil { + h.logger.Warn().Err(err).Msg("token request: missing or malformed Authorization header") + http.Error(w, 
"unauthorized", http.StatusUnauthorized) + return + } + + username, err := h.validateSAToken(r.Context(), saToken) + if err != nil { + h.logger.Warn().Err(err).Msg("token request: SA token validation failed") + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + + if !isRunnerSA(username) { + h.logger.Warn().Str("username", username).Msg("token request: username does not match runner SA pattern") + http.Error(w, "forbidden", http.StatusForbidden) + return + } + + apiToken, err := h.tokenProvider.Token(r.Context()) + if err != nil { + h.logger.Error().Err(err).Str("username", username).Msg("token request: failed to mint API token") + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + + h.logger.Info().Str("username", username).Msg("token request: issued fresh API token") + + resp := tokenResponse{Token: apiToken} + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + h.logger.Warn().Err(err).Msg("token request: failed to write response") + } +} + +func (h *handler) validateSAToken(ctx context.Context, token string) (string, error) { + ctx, cancel := context.WithTimeout(ctx, tokenReviewTimeout) + defer cancel() + + tr := &authv1.TokenReview{ + Spec: authv1.TokenReviewSpec{ + Token: token, + }, + } + + result, err := h.k8sClient.AuthenticationV1().TokenReviews().Create(ctx, tr, metav1.CreateOptions{}) + if err != nil { + return "", fmt.Errorf("TokenReview API call failed: %w", err) + } + if !result.Status.Authenticated { + return "", fmt.Errorf("token not authenticated: %s", result.Status.Error) + } + + return result.Status.User.Username, nil +} + +func isRunnerSA(username string) bool { + if !strings.HasPrefix(username, runnerSAPrefix) { + return false + } + rest := strings.TrimPrefix(username, runnerSAPrefix) + idx := strings.Index(rest, sessionSAInfix) + if idx < 0 { + return false + } + return strings.HasSuffix(rest, sessionSASuffix) +} + +func 
extractBearerToken(r *http.Request) (string, error) { + auth := r.Header.Get("Authorization") + if auth == "" { + return "", fmt.Errorf("Authorization header missing") + } + if !strings.HasPrefix(auth, "Bearer ") { + return "", fmt.Errorf("Authorization header must use Bearer scheme") + } + token := strings.TrimPrefix(auth, "Bearer ") + if token == "" { + return "", fmt.Errorf("empty bearer token") + } + return token, nil +} diff --git a/components/ambient-control-plane/internal/tokenserver/server.go b/components/ambient-control-plane/internal/tokenserver/server.go new file mode 100644 index 000000000..25f5f3ce5 --- /dev/null +++ b/components/ambient-control-plane/internal/tokenserver/server.go @@ -0,0 +1,86 @@ +package tokenserver + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/ambient-code/platform/components/ambient-control-plane/internal/auth" + "github.com/rs/zerolog" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const ( + DefaultListenAddr = ":8080" + readTimeout = 10 * time.Second + writeTimeout = 10 * time.Second + idleTimeout = 60 * time.Second + shutdownGracePeriod = 5 * time.Second +) + +type Server struct { + srv *http.Server + logger zerolog.Logger +} + +func New( + listenAddr string, + tokenProvider auth.TokenProvider, + k8sConfig *rest.Config, + logger zerolog.Logger, +) (*Server, error) { + k8sClient, err := kubernetes.NewForConfig(k8sConfig) + if err != nil { + return nil, fmt.Errorf("creating k8s client for token server: %w", err) + } + + h := &handler{ + tokenProvider: tokenProvider, + k8sClient: k8sClient, + logger: logger.With().Str("component", "tokenserver").Logger(), + } + + mux := http.NewServeMux() + mux.HandleFunc("/token", h.handleToken) + mux.HandleFunc("/healthz", handleHealthz) + + return &Server{ + srv: &http.Server{ + Addr: listenAddr, + Handler: mux, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + IdleTimeout: idleTimeout, + }, + logger: logger.With().Str("component", 
"tokenserver").Logger(), + }, nil +} + +func (s *Server) Start(ctx context.Context) error { + errCh := make(chan error, 1) + go func() { + s.logger.Info().Str("addr", s.srv.Addr).Msg("token server listening") + if err := s.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errCh <- err + } + }() + + select { + case <-ctx.Done(): + shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownGracePeriod) + defer cancel() + if err := s.srv.Shutdown(shutdownCtx); err != nil { + s.logger.Warn().Err(err).Msg("token server shutdown error") + } + return nil + case err := <-errCh: + return fmt.Errorf("token server: %w", err) + } +} + +func handleHealthz(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) +} diff --git a/components/runners/ambient-runner/ambient_runner/_grpc_client.py b/components/runners/ambient-runner/ambient_runner/_grpc_client.py index 0a975ac58..a46662bf6 100644 --- a/components/runners/ambient-runner/ambient_runner/_grpc_client.py +++ b/components/runners/ambient-runner/ambient_runner/_grpc_client.py @@ -1,7 +1,12 @@ from __future__ import annotations +import json import logging import os +import time +import urllib.error +import urllib.parse +import urllib.request from pathlib import Path from typing import Optional @@ -11,23 +16,82 @@ _ENV_GRPC_URL = "AMBIENT_GRPC_URL" _ENV_TOKEN = "BOT_TOKEN" +_ENV_CP_TOKEN_URL = "AMBIENT_CP_TOKEN_URL" _ENV_USE_TLS = "AMBIENT_GRPC_USE_TLS" _ENV_CA_CERT = "AMBIENT_GRPC_CA_CERT_FILE" _DEFAULT_GRPC_URL = "ambient-api-server:9000" _SERVICE_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt" -_BOT_TOKEN_FILE = Path("/var/run/secrets/ambient/bot-token") +_SA_TOKEN_FILE = Path("/var/run/secrets/kubernetes.io/serviceaccount/token") -def _read_current_token() -> str: - """Read the bot token, preferring the kubelet-rotated file mount over env var.""" +_CP_TOKEN_FETCH_ATTEMPTS = 3 +_CP_TOKEN_FETCH_TIMEOUT = 10 + + +def 
_validate_cp_token_url(url: str) -> None: + """Reject non-http(s) or credential-bearing URLs to prevent SA token exfiltration.""" + parsed = urllib.parse.urlparse(url) + if ( + parsed.scheme not in {"http", "https"} + or not parsed.netloc + or parsed.username is not None + or parsed.password is not None + ): + raise RuntimeError( + f"invalid CP token URL (must be http/https with no credentials): {url!r}" + ) + + +def _fetch_token_from_cp(cp_token_url: str) -> str: + """Fetch a fresh API token from the CP /token endpoint using the pod SA token. + + Retries up to _CP_TOKEN_FETCH_ATTEMPTS times with exponential backoff + to handle transient CP unavailability. + """ + _validate_cp_token_url(cp_token_url) + try: - if _BOT_TOKEN_FILE.exists(): - token = _BOT_TOKEN_FILE.read_text().strip() - if token: - return token - except OSError: - pass - return os.environ.get(_ENV_TOKEN, "") + sa_token = _SA_TOKEN_FILE.read_text().strip() + except OSError as e: + raise RuntimeError(f"cannot read SA token from {_SA_TOKEN_FILE}: {e}") from e + + last_err: Exception = RuntimeError("no attempts made") + for attempt in range(_CP_TOKEN_FETCH_ATTEMPTS): + if attempt > 0: + backoff = 2 ** (attempt - 1) + logger.warning( + "[GRPC CLIENT] CP token fetch attempt %d/%d failed, retrying in %ds: %s", + attempt, + _CP_TOKEN_FETCH_ATTEMPTS, + backoff, + last_err, + ) + time.sleep(backoff) + try: + req = urllib.request.Request( + cp_token_url, + headers={"Authorization": f"Bearer {sa_token}"}, + ) + with urllib.request.urlopen(req, timeout=_CP_TOKEN_FETCH_TIMEOUT) as resp: + body = json.loads(resp.read()) + token = body.get("token", "") + if not token: + raise RuntimeError("CP /token response missing 'token' field") + logger.info("[GRPC CLIENT] Fetched fresh API token from CP token endpoint") + return token + except urllib.error.HTTPError as e: + resp_body = "" + try: + resp_body = e.read().decode(errors="replace") + except Exception: + pass + last_err = RuntimeError(f"CP /token HTTP {e.code}: 
{resp_body}") + except Exception as e: + last_err = e + + raise RuntimeError( + f"CP token endpoint unreachable after {_CP_TOKEN_FETCH_ATTEMPTS} attempts: {last_err}" + ) from last_err def _load_ca_cert(ca_cert_file: Optional[str]) -> Optional[bytes]: @@ -82,11 +146,13 @@ def __init__( token: str, use_tls: bool = False, ca_cert_file: Optional[str] = None, + cp_token_url: str = "", ) -> None: self._grpc_url = grpc_url self._token = token self._use_tls = use_tls self._ca_cert_file = ca_cert_file + self._cp_token_url = cp_token_url self._channel: Optional[grpc.Channel] = None self._session_messages: Optional["SessionMessagesAPI"] = None # noqa: F821 @@ -94,9 +160,17 @@ def __init__( def from_env(cls) -> AmbientGRPCClient: """Create client from environment variables.""" grpc_url = os.environ.get(_ENV_GRPC_URL, _DEFAULT_GRPC_URL) - token = _read_current_token() + cp_token_url = os.environ.get(_ENV_CP_TOKEN_URL, "") use_tls = os.environ.get(_ENV_USE_TLS, "").lower() in ("true", "1", "yes") ca_cert_file = os.environ.get(_ENV_CA_CERT) + if cp_token_url: + logger.info( + "[GRPC CLIENT] Fetching token from CP endpoint: url=%s", cp_token_url + ) + token = _fetch_token_from_cp(cp_token_url) + else: + token = os.environ.get(_ENV_TOKEN, "") + logger.info("[GRPC CLIENT] Using BOT_TOKEN env var (local dev mode)") logger.info( "[GRPC CLIENT] Initializing from env: url=%s tls=%s token_len=%d", grpc_url, @@ -104,12 +178,19 @@ def from_env(cls) -> AmbientGRPCClient: len(token), ) return cls( - grpc_url=grpc_url, token=token, use_tls=use_tls, ca_cert_file=ca_cert_file + grpc_url=grpc_url, + token=token, + use_tls=use_tls, + ca_cert_file=ca_cert_file, + cp_token_url=cp_token_url, ) def reconnect(self) -> None: - """Close the existing channel and rebuild with a fresh token from the file mount.""" - fresh_token = _read_current_token() + """Close the existing channel and rebuild with a fresh token from the CP endpoint.""" + if self._cp_token_url: + fresh_token = 
_fetch_token_from_cp(self._cp_token_url) + else: + fresh_token = os.environ.get(_ENV_TOKEN, "") logger.info( "[GRPC CLIENT] Reconnecting with fresh token (len=%d)", len(fresh_token) ) diff --git a/docs/internal/design/control-plane.spec.md b/docs/internal/design/control-plane.spec.md index c9ad76730..412ecf1b1 100644 --- a/docs/internal/design/control-plane.spec.md +++ b/docs/internal/design/control-plane.spec.md @@ -83,7 +83,7 @@ The CP creates a Pod (not a Job) for each session. Key pod attributes: | `restartPolicy` | `Never` | Sessions are single-run; no automatic restart | | `imagePullPolicy` | `IfNotPresent` for `localhost/` images, `Always` otherwise | kind uses local containerd — `Always` breaks `localhost/` image pulls | | `serviceAccountName` | `session-{id}-sa` | Session-scoped; no cross-session access | -| `automountServiceAccountToken` | `false` | Runner uses BOT_TOKEN, not SA token | +| `automountServiceAccountToken` | `true` | Runner uses the SA token to authenticate to the CP token endpoint | | CPU request/limit | 500m / 2000m | Generous for Claude Code | | Memory request/limit | 512Mi / 4Gi | Claude Code is memory-intensive | @@ -109,9 +109,9 @@ Each section is joined with `\n\n`. Empty sections are omitted. If all four are | `WORKSPACE_PATH` | `/workspace` | Claude Code working directory | | `AGUI_PORT` | `8001` | Runner HTTP listener port | | `BACKEND_API_URL` | CP config | api-server base URL | -| `BOT_TOKEN` | from K8s secret | api-server bearer token | | `AMBIENT_GRPC_URL` | CP config | api-server gRPC address | | `AMBIENT_GRPC_USE_TLS` | CP config | TLS flag for gRPC | +| `AMBIENT_CP_TOKEN_URL` | CP config | CP token endpoint URL (e.g. 
`http://ambient-control-plane.{ns}.svc:8080/token`) | | `INITIAL_PROMPT` | assembled prompt | Auto-execute on startup | | `USE_VERTEX` / `ANTHROPIC_VERTEX_PROJECT_ID` / `CLOUD_ML_REGION` | CP config | Vertex AI config (when enabled) | | `GOOGLE_APPLICATION_CREDENTIALS` | `/app/vertex/ambient-code-key.json` | Vertex service account path | @@ -357,6 +357,98 @@ Status: 🔲 planned --- +## CP Token Endpoint + +### Problem + +Runner pods authenticate to the api-server gRPC interface using a `BOT_TOKEN` injected at pod start and refreshed by the CP every 4 minutes via a K8s Secret update. In OIDC environments (e.g. S0), `BOT_TOKEN` is an OIDC client-credentials JWT with a 15-minute TTL. + +This creates a three-way async race: + +1. CP ticker writes a fresh token to the Secret every 4 minutes +2. Kubelet propagates the Secret update to the pod's file mount (30–60s delay in busy clusters) +3. Runner reads the file mount on gRPC reconnect + +When the CP writes a token that is already close to expiry — because its in-memory `OIDCTokenProvider` cache had a short buffer — the runner reconnects with an already-expired token and enters an `UNAUTHENTICATED` loop. + +The fundamental issue is that the Secret-write model is an **async push** with no synchronization guarantee between when the token is written and when the runner reads it. + +### Solution + +The CP exposes a lightweight HTTP endpoint that runners call **synchronously on demand** to obtain a guaranteed-fresh token. This eliminates the async race entirely. 
+ +``` +GET /token +``` + +- Served by a new `net/http` listener on the CP (port 8080, separate from any existing listener) +- Runner authenticates using its K8s service account token (mounted at `/var/run/secrets/kubernetes.io/serviceaccount/token`) — validated by the CP via the K8s `TokenReview` API +- CP calls `tokenProvider.Token(ctx)` at request time and returns the result — always fresh, always valid TTL +- Response: `{"token": "<api-token>", "expires_at": "<RFC3339 expiry>"}` + +### Authentication + +The runner's K8s SA token is a signed JWT issued by the K8s API server. The CP validates it using the K8s `authentication/v1` `TokenReview` resource: + +``` +POST /apis/authentication.k8s.io/v1/tokenreviews +{ + "spec": { "token": "<runner-sa-token>" } +} +``` + +A successful `TokenReview` returns `status.authenticated=true` and `status.user.username` (e.g. `system:serviceaccount:ambient-code--myproject:session-abc123-sa`). The CP verifies the username prefix matches a known runner SA pattern before returning a token. + +This approach uses credentials already present in every pod — no new secrets required. + +### Token Lifecycle + +The CP token endpoint is the **sole source** of the api-server bearer token for all runner pods in cluster deployments. There is no Secret write loop and no `BOT_TOKEN` file mount; the `BOT_TOKEN` env var survives only as a local-dev override (see Runner Changes). + +| Phase | Mechanism | +|---|---| +| Initial startup | `GET /token` from CP endpoint — called in lifespan before gRPC channel opens | +| gRPC reconnect | `GET /token` from CP endpoint — synchronous, guaranteed fresh | + +The CP is critical infrastructure. It creates the runner pod, so it is running before the runner makes its first token request. If the CP is unreachable, the runner cannot function regardless (the CP is also responsible for all K8s provisioning). No fallback is needed or provided. 
+### CP HTTP Server + +The CP adds a minimal `net/http` server alongside its existing K8s controller loop: + +```go +mux := http.NewServeMux() +mux.HandleFunc("/token", tokenHandler) +mux.HandleFunc("/healthz", healthHandler) +http.ListenAndServe(":8080", mux) +``` + +The server runs in a goroutine alongside `runKubeMode`. It shares the existing `tokenProvider` and `k8sClient` from the main CP config. + +### Runner Changes + +`_grpc_client.py` `reconnect()` is updated to call the CP token endpoint instead of re-reading the Secret file: + +```python +def reconnect(self) -> None: + fresh_token = _fetch_token_from_cp() # GET AMBIENT_CP_TOKEN_URL (already includes the /token path) with SA token + self.close() + self._token = fresh_token +``` + +`AMBIENT_CP_TOKEN_URL` is injected by the CP as an env var when creating the runner pod. In local dev environments where the CP is not present, `BOT_TOKEN` env var may be set directly and the runner skips the CP endpoint call. + +### New CP Internal Packages + +| Package | Purpose | +|---|---| +| `internal/tokenserver/server.go` | HTTP server setup and graceful shutdown | +| `internal/tokenserver/handler.go` | `GET /token` handler — TokenReview validation + tokenProvider call | + +Status: 🔲 planned — RHOAIENG-56711 + +--- + +## Runner Credential Fetch + +The runner fetches provider credentials at session start before invoking Claude. 
Credentials are resolved by the CP and injected into the runner pod as `CREDENTIAL_IDS` — a JSON-encoded map of `provider → credential_id`: @@ -427,3 +519,6 @@ The `ambient-control-plane` ServiceAccount does not have `delete` on `namespaces | assistant payload → plain string | Symmetric with user payload; reasoning is observability data not conversation record | | GET /events is runner-local | Runner has the event queue; api-server proxies it; no second fan-out layer needed | | Namespace per project, not per session | Sessions within a project share a namespace; secrets and RBAC are project-scoped | +| CP token endpoint over Secret-write renewal | Secret writes are async push with no synchronization guarantee vs. token TTL; synchronous pull from CP eliminates the race entirely | +| Runner SA token for CP auth | K8s SA tokens are already mounted in every pod, long-lived, and K8s-managed — no new secrets or out-of-band key distribution required | +| CP is sole token source — no BOT_TOKEN Secret | CP creates the runner pod, so it is always reachable before the runner's first token request; retaining a Secret adds complexity and a second failure mode with the same blast radius |