diff --git a/cmd/cluster-agent/subcommands/check/command.go b/cmd/cluster-agent/subcommands/check/command.go index 0beaed5a6e0a..2d1f9d0cf792 100644 --- a/cmd/cluster-agent/subcommands/check/command.go +++ b/cmd/cluster-agent/subcommands/check/command.go @@ -12,11 +12,13 @@ import ( "github.com/DataDog/datadog-agent/cmd/cluster-agent/command" wmcatalog "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/catalog-clusteragent" "github.com/DataDog/datadog-agent/pkg/cli/subcommands/check" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" pkgcommon "github.com/DataDog/datadog-agent/pkg/util/common" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection" "github.com/spf13/cobra" + "go.uber.org/fx" ) // Commands returns a slice of subcommands for the 'cluster-agent' command. @@ -33,7 +35,12 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { ConfigName: command.ConfigName, LoggerName: command.LoggerName, } - }, wmcatalog.GetCatalog()) + }, fx.Options( + wmcatalog.GetCatalog(), + // Required by the kubeapiserver collector, but never enabled in the + // "check" command because autoscaling doesn't run. + fx.Supply(autoscalinggate.New()), + )) return []*cobra.Command{cmd} } diff --git a/cmd/cluster-agent/subcommands/start/command.go b/cmd/cluster-agent/subcommands/start/command.go index 43e5bc727531..ca6e0adf6ddc 100644 --- a/cmd/cluster-agent/subcommands/start/command.go +++ b/cmd/cluster-agent/subcommands/start/command.go @@ -87,11 +87,13 @@ import ( metricscompressionfx "github.com/DataDog/datadog-agent/comp/serializer/metricscompression/fx" "github.com/DataDog/datadog-agent/pkg/clusteragent" admissionpkg "github.com/DataDog/datadog-agent/pkg/clusteragent/admission" + "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/controllers/webhook" + admissionautoscaling "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/autoscaling" admissionpatch "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/patch" apidca "github.com/DataDog/datadog-agent/pkg/clusteragent/api" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate" "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/cluster" clusterspot "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/cluster/spot" - "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload" "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/provider" pkgclusterchecks "github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks" instrumentationhandlers "github.com/DataDog/datadog-agent/pkg/clusteragent/instrumentation/handlers" @@ -161,6 +163,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { // `fxutil.OneShot`. return fxutil.OneShot(start, fx.Supply(globalParams), + fx.Supply(autoscalinggate.New()), fx.Supply(core.BundleParams{ ConfigParams: config.NewClusterAgentParams(globalParams.ConfFilePath, config.WithExtraConfFiles(globalParams.ExtraConfFilePath)), LogParams: log.ForDaemon(command.LoggerName, "log_file", defaultpaths.DCALogFile), @@ -289,6 +292,7 @@ func start(log log.Component, tracerouteComp traceroute.Component, eventPlatform eventplatform.Component, healthPlatform option.Option[healthplatformdef.Component], + autoscalingGate *autoscalinggate.Gate, ) error { stopCh := make(chan struct{}) validatingStopCh := make(chan struct{}) @@ -548,7 +552,6 @@ func start(log log.Component, } // Autoscaling Product - var pp workload.PodPatcher if config.GetBool("autoscaling.workload.enabled") { if rcClient == nil { return errors.New("Remote config is disabled or failed to initialize, remote config is a required dependency for autoscaling") @@ -558,11 +561,14 @@ func start(log log.Component, log.Error("Admission controller is disabled, vertical autoscaling requires the admission controller to be enabled. Vertical scaling will be disabled.") } - if patcher, err := provider.StartWorkloadAutoscaling(mainCtx, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, taggerComp, demultiplexer); err == nil { - pp = patcher - } else { - return fmt.Errorf("Error while starting workload autoscaling: %v", err) + // Register the gate handlers that allow the lazy startup of the + // autoscaling stack. It stays off until a DatadogPodAutoscaler or a + // workload/namespace with the autoscaling profile label is observed. + workloadResources := provider.SupportedWorkloadResources(apiCl.Cl.Discovery()) + if err := provider.RegisterAutoscalingGateHandlers(mainCtx, apiCl.DynamicInformerFactory, apiCl.MetadataInformerCl, workloadResources, autoscalingGate); err != nil { + return fmt.Errorf("Error while registering autoscaling gate handlers: %w", err) } + apiCl.DynamicInformerFactory.Start(mainCtx.Done()) } if config.GetBool("autoscaling.cluster.enabled") { @@ -634,7 +640,10 @@ func start(log log.Component, } } - if config.GetBool("admission_controller.enabled") { + var autoscalingWebhook *admissionautoscaling.Webhook + admissionEnabled := config.GetBool("admission_controller.enabled") + + if admissionEnabled { if config.GetBool("admission_controller.auto_instrumentation.patcher.enabled") { patchCtx := admissionpatch.ControllerContext{ LeadershipStateSubscribeFunc: le.Subscribe, @@ -666,7 +675,7 @@ func start(log log.Component, InstrumentationHandlers: instrHandlers, } - webhooks, err := admissionpkg.StartControllers(admissionCtx, datadogConfig, wmeta, pp, sh, healthPlatform) + webhooks, err := admissionpkg.StartControllers(admissionCtx, datadogConfig, wmeta, sh, healthPlatform) // Ignore the error if it's related to the validatingwebhookconfigurations. var syncInformerError *apiserver.SyncInformersError if err != nil && !(errors.As(err, &syncInformerError) && syncInformerError.Name == apiserver.ValidatingWebhooksInformer) { @@ -696,11 +705,22 @@ func start(log log.Component, pkglog.Errorf("Error in the Admission Controller Webhook Server: %v", errServ) } }() + + autoscalingWebhook = findAutoscalingWebhook(webhooks) } } else { pkglog.Info("Admission controller is disabled") } + // Some autoscaling features don't require the admission webhook, so start + // the autoscaling stack even without it. + if config.GetBool("autoscaling.workload.enabled") { + if admissionEnabled && autoscalingWebhook == nil { + pkglog.Errorf("Autoscaling webhook not available. Vertical autoscaling will not apply recommendations to new pods") + } + go provider.StartWorkloadAutoscalingOnGate(mainCtx, autoscalingGate, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, taggerComp, demultiplexer, autoscalingWebhook) + } + if config.GetBool("cluster_agent.mcp.enabled") { // Get MCP configured endpoint mcpEndpoint := config.GetString("cluster_agent.mcp.endpoint") @@ -753,6 +773,15 @@ func start(log log.Component, return nil } +func findAutoscalingWebhook(webhooks []webhook.Webhook) *admissionautoscaling.Webhook { + for _, hook := range webhooks { + if autoscalingWebhook, ok := hook.(*admissionautoscaling.Webhook); ok { + return autoscalingWebhook + } + } + return nil +} + func setupClusterCheck(ctx context.Context, ac autodiscovery.Component, tagger tagger.Component) (*pkgclusterchecks.Handler, error) { handler, err := pkgclusterchecks.NewHandler(ac, tagger) if err != nil { diff --git a/comp/core/workloadmeta/collectors/internal/kubeapiserver/kubeapiserver.go b/comp/core/workloadmeta/collectors/internal/kubeapiserver/kubeapiserver.go index b116588ab747..bf0c147f9f62 100644 --- a/comp/core/workloadmeta/collectors/internal/kubeapiserver/kubeapiserver.go +++ b/comp/core/workloadmeta/collectors/internal/kubeapiserver/kubeapiserver.go @@ -13,7 +13,6 @@ import ( "strings" "time" - "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common" "go.uber.org/fx" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" @@ -22,9 +21,11 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate" configutils "github.com/DataDog/datadog-agent/pkg/config/utils" "github.com/DataDog/datadog-agent/pkg/status/health" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver" + "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -37,18 +38,26 @@ const ( type dependencies struct { fx.In - Config config.Component + Config config.Component + AutoscalingGate *autoscalinggate.Gate } // storeGenerator returns a new store specific to a given resource type storeGenerator func(context.Context, workloadmeta.Component, config.Reader, kubernetes.Interface) (*cache.Reflector, *reflectorStore) func shouldHavePodStore(cfg config.Reader) bool { - metadataAsTags := configutils.GetMetadataAsTags(cfg) - hasPodLabelsAsTags := len(metadataAsTags.GetPodLabelsAsTags()) > 0 - hasPodAnnotationsAsTags := len(metadataAsTags.GetPodAnnotationsAsTags()) > 0 + return podsRequiredAtStartup(cfg) || cfg.GetBool("autoscaling.workload.enabled") +} - return cfg.GetBool("cluster_agent.collect_kubernetes_tags") || cfg.GetBool("autoscaling.workload.enabled") || cfg.GetBool("autoscaling.cluster.spot.enabled") || hasPodLabelsAsTags || hasPodAnnotationsAsTags +// podsRequiredAtStartup returns true when some feature needs the pod store to +// be available from startup. Workload autoscaling does not. It can defer the +// start via the autoscaling gate. +func podsRequiredAtStartup(cfg config.Reader) bool { + metadataAsTags := configutils.GetMetadataAsTags(cfg) + return cfg.GetBool("cluster_agent.collect_kubernetes_tags") || + cfg.GetBool("autoscaling.cluster.spot.enabled") || + len(metadataAsTags.GetPodLabelsAsTags()) > 0 || + len(metadataAsTags.GetPodAnnotationsAsTags()) > 0 } func shouldHaveDeploymentStore(cfg config.Reader) bool { @@ -59,20 +68,6 @@ func shouldHaveDeploymentStore(cfg config.Reader) bool { return cfg.GetBool("language_detection.enabled") && cfg.GetBool("language_detection.reporting.enabled") || hasDeploymentsLabelsAsTags || hasDeploymentsAnnotationsAsTags } -func storeGenerators(cfg config.Reader) []storeGenerator { - var generators []storeGenerator - - if shouldHavePodStore(cfg) { - generators = append(generators, newPodStore) - } - - if shouldHaveDeploymentStore(cfg) { - generators = append(generators, newDeploymentStore) - } - - return generators -} - func metadataCollectionGVRs(cfg config.Reader, discoveryClient discovery.DiscoveryInterface) ([]schema.GroupVersionResource, error) { return getGVRsForRequestedResources(discoveryClient, resourcesWithMetadataCollectionEnabled(cfg)) } @@ -170,18 +165,20 @@ func resourcesForAPMConfig(cfg config.Reader) []string { } type collector struct { - id string - catalog workloadmeta.AgentType - config config.Reader + id string + catalog workloadmeta.AgentType + config config.Reader + autoscalingGate *autoscalinggate.Gate } // NewCollector returns a kubeapiserver CollectorProvider that instantiates its colletor func NewCollector(deps dependencies) (workloadmeta.CollectorProvider, error) { return workloadmeta.CollectorProvider{ Collector: &collector{ - id: collectorID, - catalog: workloadmeta.ClusterAgent, - config: deps.Config, + id: collectorID, + catalog: workloadmeta.ClusterAgent, + config: deps.Config, + autoscalingGate: deps.AutoscalingGate, }, }, nil } @@ -218,8 +215,26 @@ func (c *collector) Start(ctx context.Context, wlmetaStore workloadmeta.Componen } } - for _, storeBuilder := range storeGenerators(c.config) { - reflector, store := storeBuilder(ctx, wlmetaStore, c.config, client) + if shouldHavePodStore(c.config) { + autoscalingEnabled := c.config.GetBool("autoscaling.workload.enabled") + lazyStart := !podsRequiredAtStartup(c.config) && autoscalingEnabled + + if lazyStart { + // The store is intentionally not added to objectStores. It would + // block the startup readiness check. + go c.startPodStoreOnGate(ctx, wlmetaStore, client, newPodStore) + } else { + reflector, store := newPodStore(ctx, wlmetaStore, c.config, client) + objectStores = append(objectStores, store) + go reflector.Run(ctx.Done()) + if autoscalingEnabled { + go c.markPodCollectionSyncedWhenReady(ctx, store) + } + } + } + + if shouldHaveDeploymentStore(c.config) { + reflector, store := newDeploymentStore(ctx, wlmetaStore, c.config, client) objectStores = append(objectStores, store) go reflector.Run(ctx.Done()) } @@ -241,6 +256,30 @@ func (c *collector) Start(ctx context.Context, wlmetaStore workloadmeta.Componen return nil } +// startPodStoreOnGate blocks until the autoscaling gate is enabled or the +// context is cancelled. On gate enable, it starts the pod reflector. +func (c *collector) startPodStoreOnGate(ctx context.Context, wlmetaStore workloadmeta.Component, client kubernetes.Interface, newStore storeGenerator) { + if !c.autoscalingGate.WaitForEnable(ctx) { + return + } + + log.Debug("Autoscaling gate enabled, starting workloadmeta pod reflector lazily") + reflector, store := newStore(ctx, wlmetaStore, c.config, client) + go reflector.Run(ctx.Done()) + + c.markPodCollectionSyncedWhenReady(ctx, store) +} + +// markPodCollectionSyncedWhenReady waits for the pod store's cache to sync +// then signals the autoscaling gate. The workload autoscaling stack waits on +// this signal before starting. +func (c *collector) markPodCollectionSyncedWhenReady(ctx context.Context, store *reflectorStore) { + if !cache.WaitForCacheSync(ctx.Done(), store.HasSynced) { + return + } + c.autoscalingGate.MarkPodCollectionSynced() +} + func (c *collector) Pull(_ context.Context) error { return nil } diff --git a/comp/core/workloadmeta/collectors/internal/kubeapiserver/kubeapiserver_test.go b/comp/core/workloadmeta/collectors/internal/kubeapiserver/kubeapiserver_test.go index ca8242efafc0..9ebbf950728a 100644 --- a/comp/core/workloadmeta/collectors/internal/kubeapiserver/kubeapiserver_test.go +++ b/comp/core/workloadmeta/collectors/internal/kubeapiserver/kubeapiserver_test.go @@ -8,103 +8,180 @@ package kubeapiserver import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/fx" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" fakediscovery "k8s.io/client-go/discovery/fake" fakeclientset "k8s.io/client-go/kubernetes/fake" + "github.com/DataDog/datadog-agent/comp/core" "github.com/DataDog/datadog-agent/comp/core/config" + workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" + workloadmetafxmock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx-mock" + workloadmetamock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/mock" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" ) -func TestStoreGenerators(t *testing.T) { - // Define tests +func TestShouldHaveDeploymentStore(t *testing.T) { tests := []struct { - name string - cfg map[string]interface{} - expectedStoresGenerator []storeGenerator + name string + cfg map[string]interface{} + expected bool }{ { - name: "All configurations disabled", + name: "language detection disabled", cfg: map[string]interface{}{ - "cluster_agent.collect_kubernetes_tags": false, - "language_detection.reporting.enabled": false, - "language_detection.enabled": false, + "language_detection.enabled": false, }, - expectedStoresGenerator: []storeGenerator{}, + expected: false, }, { - name: "All configurations disabled", + name: "language detection enabled but reporting disabled", cfg: map[string]interface{}{ - "cluster_agent.collect_kubernetes_tags": false, - "language_detection.reporting.enabled": false, - "language_detection.enabled": true, + "language_detection.enabled": true, + "language_detection.reporting.enabled": false, }, - expectedStoresGenerator: []storeGenerator{}, + expected: false, }, { - name: "Kubernetes tags enabled", + name: "language detection and reporting enabled", + cfg: map[string]interface{}{ + "language_detection.reporting.enabled": true, + "language_detection.enabled": true, + }, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := config.NewMockWithOverrides(t, tt.cfg) + assert.Equal(t, tt.expected, shouldHaveDeploymentStore(cfg)) + }) + } +} + +func TestShouldHavePodStore(t *testing.T) { + tests := []struct { + name string + cfg map[string]interface{} + expected bool + }{ + { + name: "no triggers", + cfg: map[string]interface{}{}, + expected: false, + }, + { + name: "kubernetes tags collection", cfg: map[string]interface{}{ "cluster_agent.collect_kubernetes_tags": true, - "language_detection.reporting.enabled": false, - "language_detection.enabled": true, }, - expectedStoresGenerator: []storeGenerator{newPodStore}, + expected: true, }, { - name: "Language detection enabled", + name: "spot scheduling", cfg: map[string]interface{}{ - "cluster_agent.collect_kubernetes_tags": false, - "language_detection.reporting.enabled": true, - "language_detection.enabled": true, + "autoscaling.cluster.spot.enabled": true, }, - expectedStoresGenerator: []storeGenerator{newDeploymentStore}, + expected: true, }, { - name: "Language detection enabled", + name: "pod labels as tags", cfg: map[string]interface{}{ - "cluster_agent.collect_kubernetes_tags": false, - "language_detection.reporting.enabled": true, - "language_detection.enabled": false, + "kubernetes_pod_labels_as_tags": map[string]string{"app": "kube_app"}, }, - expectedStoresGenerator: []storeGenerator{}, + expected: true, }, { - name: "All configurations enabled", + name: "pod annotations as tags", + cfg: map[string]interface{}{ + "kubernetes_pod_annotations_as_tags": map[string]string{"team": "kube_team"}, + }, + expected: true, + }, + { + name: "workload autoscaling enabled", + cfg: map[string]interface{}{ + "autoscaling.workload.enabled": true, + }, + expected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cfg := config.NewMockWithOverrides(t, test.cfg) + assert.Equal(t, test.expected, shouldHavePodStore(cfg)) + }) + } +} + +func TestPodsRequiredAtStartup(t *testing.T) { + tests := []struct { + name string + cfg map[string]interface{} + expected bool + }{ + { + name: "no triggers", + cfg: map[string]interface{}{}, + expected: false, + }, + { + name: "kubernetes tags collection", cfg: map[string]interface{}{ "cluster_agent.collect_kubernetes_tags": true, - "language_detection.reporting.enabled": true, - "language_detection.enabled": true, }, - expectedStoresGenerator: []storeGenerator{newPodStore, newDeploymentStore}, + expected: true, + }, + { + name: "spot scheduling", + cfg: map[string]interface{}{ + "autoscaling.cluster.spot.enabled": true, + }, + expected: true, + }, + { + name: "pod labels as tags", + cfg: map[string]interface{}{ + "kubernetes_pod_labels_as_tags": map[string]string{"app": "kube_app"}, + }, + expected: true, + }, + { + name: "pod annotations as tags", + cfg: map[string]interface{}{ + "kubernetes_pod_annotations_as_tags": map[string]string{"team": "kube_team"}, + }, + expected: true, + }, + { + name: "workload autoscaling alone is not a startup-time requirement", + cfg: map[string]interface{}{ + "autoscaling.workload.enabled": true, + }, + expected: false, }, } - // Run test for each testcase for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg := config.NewMockWithOverrides(t, tt.cfg) - expectedStores := collectResultStoreGenerator(tt.expectedStoresGenerator, cfg) - stores := collectResultStoreGenerator(storeGenerators(cfg), cfg) - - assert.Equal(t, expectedStores, stores) + assert.Equal(t, tt.expected, podsRequiredAtStartup(cfg)) }) } } -func collectResultStoreGenerator(funcs []storeGenerator, config config.Reader) []*reflectorStore { - var stores []*reflectorStore - client := fakeclientset.NewClientset() - for _, f := range funcs { - _, s := f(nil, nil, config, client) - stores = append(stores, s) - } - return stores -} - func Test_metadataCollectionGVRs_WithFunctionalDiscovery(t *testing.T) { tests := []struct { name string @@ -567,3 +644,59 @@ func TestResourcesWithMetadataCollectionEnabled(t *testing.T) { }) } } + +func TestStartPodStoreOnGate(t *testing.T) { + testPodUID := "test-pod-uid" + + client := fakeclientset.NewClientset(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + UID: types.UID(testPodUID), + }, + }) + + gate := autoscalinggate.New() + + wmeta := fxutil.Test[workloadmetamock.Mock](t, fx.Options( + core.MockBundle(), + workloadmetafxmock.MockModule(workloadmeta.NewParams()), + )) + + testCollector := &collector{ + id: collectorID, + catalog: workloadmeta.ClusterAgent, + config: wmeta.GetConfig(), + autoscalingGate: gate, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan struct{}) + go func() { + testCollector.startPodStoreOnGate(ctx, wmeta, client, newPodStoreWithTypedClient) + close(done) + }() + + // Before enabling the gate, no pod should appear in workloadmeta. + assert.Never(t, func() bool { + _, err := wmeta.GetKubernetesPod(testPodUID) + return err == nil + }, 100*time.Millisecond, 20*time.Millisecond) + + gate.Enable() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("startPodStoreOnGate did not return after gate enabled") + } + + // The pod reflector should eventually populate workloadmeta after enabling + // the gate. + require.Eventually(t, func() bool { + _, err := wmeta.GetKubernetesPod(testPodUID) + return err == nil + }, 10*time.Second, 100*time.Millisecond) +} diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_base.go b/pkg/clusteragent/admission/controllers/webhook/controller_base.go index 8ef43f183005..01c1460808b0 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_base.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_base.go @@ -43,7 +43,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/validate/datadoginstrumentation" "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/validate/kubernetesadmissionevents" clusterspot "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/cluster/spot" - "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload" "github.com/DataDog/datadog-agent/pkg/clusteragent/instrumentation" kubecommon "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -56,11 +55,11 @@ type Controller interface { } // NewController returns the adequate implementation of the Controller interface. -func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, validatingInformers admissionregistration.Interface, mutatingInformers admissionregistration.Interface, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, pp workload.PodPatcher, sh clusterspot.PodHandler, datadogConfig config.Component, demultiplexer demultiplexer.Component, filterStore workloadfilter.Component, handlers []instrumentation.Handler, informerFactory dynamicinformer.DynamicSharedInformerFactory) Controller { +func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, validatingInformers admissionregistration.Interface, mutatingInformers admissionregistration.Interface, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, sh clusterspot.PodHandler, datadogConfig config.Component, demultiplexer demultiplexer.Component, filterStore workloadfilter.Component, handlers []instrumentation.Handler, informerFactory dynamicinformer.DynamicSharedInformerFactory) Controller { if config.useAdmissionV1() { - return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pp, sh, datadogConfig, demultiplexer, filterStore, handlers, informerFactory) + return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, sh, datadogConfig, demultiplexer, filterStore, handlers, informerFactory) } - return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pp, sh, datadogConfig, demultiplexer, filterStore, handlers, informerFactory) + return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, sh, datadogConfig, demultiplexer, filterStore, handlers, informerFactory) } // Webhook represents an admission webhook @@ -96,7 +95,7 @@ type Webhook interface { // The reason is that the volume mount for the APM socket added by the configWebhook webhook // doesn't always work on Fargate (one of the envs where we use an agent sidecar), and // the agent sidecar webhook needs to remove it. -func (c *controllerBase) generateWebhooks(datadogConfig config.Component, wmeta workloadmeta.Component, demultiplexer demultiplexer.Component, pp workload.PodPatcher, sh clusterspot.PodHandler, filterStore workloadfilter.Component, handlers []instrumentation.Handler, informerFactory dynamicinformer.DynamicSharedInformerFactory) []Webhook { +func (c *controllerBase) generateWebhooks(datadogConfig config.Component, wmeta workloadmeta.Component, demultiplexer demultiplexer.Component, sh clusterspot.PodHandler, filterStore workloadfilter.Component, handlers []instrumentation.Handler, informerFactory dynamicinformer.DynamicSharedInformerFactory) []Webhook { var webhooks []Webhook var validatingWebhooks []Webhook @@ -134,8 +133,10 @@ func (c *controllerBase) generateWebhooks(datadogConfig config.Component, wmeta agentsWebhook := agentsidecar.NewWebhook(datadogConfig) webhooks = append(webhooks, agentsWebhook) - // Setup autoscaling webhook. - autoscalingWebhook := autoscaling.NewWebhook(pp, datadogConfig) + // Setup autoscaling webhook. The pod patcher is set when the workload + // autoscaling stack starts on first DPA observation, until then the webhook + // is a no-op. + autoscalingWebhook := autoscaling.NewWebhook(datadogConfig) webhooks = append(webhooks, autoscalingWebhook) // Setup spot scheduling webhook. diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_base_test.go b/pkg/clusteragent/admission/controllers/webhook/controller_base_test.go index 787ab57fa956..a07755091da8 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_base_test.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_base_test.go @@ -63,7 +63,6 @@ func TestNewController(t *testing.T) { getV1Cfg(t), wmeta, nil, - nil, datadogConfig, nil, newFilterStoreFromConfig(t, datadogConfig), @@ -84,7 +83,6 @@ func TestNewController(t *testing.T) { getV1beta1Cfg(t), wmeta, nil, - nil, datadogConfig, nil, newFilterStoreFromConfig(t, datadogConfig), diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1.go index a6571b67ff20..4c2c737cc9f7 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1.go @@ -31,7 +31,6 @@ import ( workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/common" clusterspot "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/cluster/spot" - "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload" "github.com/DataDog/datadog-agent/pkg/clusteragent/instrumentation" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/certificate" @@ -51,7 +50,7 @@ type ControllerV1 struct { } // NewControllerV1 returns a new Webhook Controller using admissionregistration/v1. -func NewControllerV1(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer, mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, pp workload.PodPatcher, sh clusterspot.PodHandler, datadogConfig config.Component, demultiplexer demultiplexer.Component, filterStore workloadfilter.Component, handlers []instrumentation.Handler, informerFactory dynamicinformer.DynamicSharedInformerFactory) *ControllerV1 { +func NewControllerV1(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer, mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, sh clusterspot.PodHandler, datadogConfig config.Component, demultiplexer demultiplexer.Component, filterStore workloadfilter.Component, handlers []instrumentation.Handler, informerFactory dynamicinformer.DynamicSharedInformerFactory) *ControllerV1 { controller := &ControllerV1{} controller.clientSet = client controller.config = config @@ -68,7 +67,7 @@ func NewControllerV1(client kubernetes.Interface, secretInformer coreinformers.S ) controller.isLeaderFunc = isLeaderFunc controller.leadershipStateNotif = leadershipStateNotif - controller.webhooks = controller.generateWebhooks(datadogConfig, wmeta, demultiplexer, pp, sh, filterStore, handlers, informerFactory) + controller.webhooks = controller.generateWebhooks(datadogConfig, wmeta, demultiplexer, sh, filterStore, handlers, informerFactory) controller.generateTemplates() if _, err := secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go index e237be130330..37c674859dc0 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go @@ -1020,7 +1020,7 @@ func TestGenerateTemplatesV1(t *testing.T) { c := &ControllerV1{} c.config = tt.configFunc(mockConfig) filterStore := newFilterStoreFromConfig(t, mockConfig) - c.webhooks = c.generateWebhooks(mockConfig, wmeta, nil, nil, nil, filterStore, nil, nil) + c.webhooks = c.generateWebhooks(mockConfig, wmeta, nil, nil, filterStore, nil, nil) c.generateTemplates() assert.EqualValues(t, tt.want(), c.mutatingWebhookTemplates) @@ -1263,7 +1263,6 @@ func (f *fixtureV1) createController() (*ControllerV1, informers.SharedInformerF getV1Cfg(f.t), wmeta, nil, - nil, datadogConfig, nil, newFilterStoreFromConfig(f.t, datadogConfig), diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go index f8af1dc35775..e2f1500d0127 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go @@ -32,7 +32,6 @@ import ( workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/common" clusterspot "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/cluster/spot" - "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload" "github.com/DataDog/datadog-agent/pkg/clusteragent/instrumentation" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/certificate" @@ -52,7 +51,7 @@ type ControllerV1beta1 struct { } // NewControllerV1beta1 returns a new Webhook Controller using admissionregistration/v1beta1. -func NewControllerV1beta1(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer, mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, pp workload.PodPatcher, sh clusterspot.PodHandler, datadogConfig config.Component, demultiplexer demultiplexer.Component, filterStore workloadfilter.Component, handlers []instrumentation.Handler, informerFactory dynamicinformer.DynamicSharedInformerFactory) *ControllerV1beta1 { +func NewControllerV1beta1(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer, mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, sh clusterspot.PodHandler, datadogConfig config.Component, demultiplexer demultiplexer.Component, filterStore workloadfilter.Component, handlers []instrumentation.Handler, informerFactory dynamicinformer.DynamicSharedInformerFactory) *ControllerV1beta1 { controller := &ControllerV1beta1{} controller.clientSet = client controller.config = config @@ -69,7 +68,7 @@ func NewControllerV1beta1(client kubernetes.Interface, secretInformer coreinform ) controller.isLeaderFunc = isLeaderFunc controller.leadershipStateNotif = leadershipStateNotif - controller.webhooks = controller.generateWebhooks(datadogConfig, wmeta, demultiplexer, pp, sh, filterStore, handlers, informerFactory) + controller.webhooks = controller.generateWebhooks(datadogConfig, wmeta, demultiplexer, sh, filterStore, handlers, informerFactory) controller.generateTemplates() if _, err := secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go index 0cf463e0bde3..9461452af26c 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go @@ -1014,7 +1014,7 @@ func TestGenerateTemplatesV1beta1(t *testing.T) { c := &ControllerV1beta1{} c.config = tt.configFunc(mockConfig) filterStore := newFilterStoreFromConfig(t, mockConfig) - c.webhooks = c.generateWebhooks(mockConfig, wmeta, nil, nil, nil, filterStore, nil, nil) + c.webhooks = c.generateWebhooks(mockConfig, wmeta, nil, nil, filterStore, nil, nil) c.generateTemplates() assert.EqualValues(t, tt.want(), c.mutatingWebhookTemplates) @@ -1257,7 +1257,6 @@ func (f *fixtureV1beta1) createController() (*ControllerV1beta1, informers.Share getV1beta1Cfg(f.t), wmeta, nil, - nil, datadogConfig, nil, newFilterStoreFromConfig(f.t, datadogConfig), diff --git a/pkg/clusteragent/admission/mutate/autoscaling/autoscaling.go b/pkg/clusteragent/admission/mutate/autoscaling/autoscaling.go index 7b10793694cd..5b0630b21964 100644 --- a/pkg/clusteragent/admission/mutate/autoscaling/autoscaling.go +++ b/pkg/clusteragent/admission/mutate/autoscaling/autoscaling.go @@ -9,6 +9,8 @@ package autoscaling import ( + "sync" + admiv1 "k8s.io/api/admission/v1" admissionregistrationv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" @@ -35,11 +37,17 @@ type Webhook struct { resources []common.WebhookResourceRule operations []admissionregistrationv1.OperationType matchConditions []admissionregistrationv1.MatchCondition - patcher workload.PodPatcher + + // the patcher is installed lazily via SetPatcher because the workload + // autoscaling stack starts lazily when the first DatadogPodAutoscaler is + // detected + patcherMutex sync.RWMutex + patcher workload.PodPatcher } -// NewWebhook returns a new Webhook -func NewWebhook(patcher workload.PodPatcher, datadogConfig config.Component) *Webhook { +// NewWebhook returns a new Webhook with no patcher installed. Call SetPatcher +// once the workload autoscaling stack is up. +func NewWebhook(datadogConfig config.Component) *Webhook { return &Webhook{ name: webhookName, isEnabled: datadogConfig.GetBool("autoscaling.workload.enabled"), @@ -47,10 +55,16 @@ func NewWebhook(patcher workload.PodPatcher, datadogConfig config.Component) *We resources: []common.WebhookResourceRule{{APIGroup: "", APIVersion: "v1", Resources: []string{"pods"}}}, operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Create}, matchConditions: []admissionregistrationv1.MatchCondition{}, - patcher: patcher, } } +// SetPatcher installs the PodPatcher used to apply recommendations. +func (w *Webhook) SetPatcher(p workload.PodPatcher) { + w.patcherMutex.Lock() + defer w.patcherMutex.Unlock() + w.patcher = p +} + // Name returns the name of the webhook func (w *Webhook) Name() string { return w.name @@ -105,11 +119,19 @@ func (w *Webhook) MatchConditions() []admissionregistrationv1.MatchCondition { // WebhookFunc returns the function that mutates the resources func (w *Webhook) WebhookFunc() admission.WebhookFunc { return func(request *admission.Request) *admiv1.AdmissionResponse { - return common.MutationResponse(mutatecommon.Mutate(request.Object, request.Namespace, w.Name(), w.updateResources, request.DynamicClient)) + w.patcherMutex.RLock() + p := w.patcher + w.patcherMutex.RUnlock() + if p == nil { + // No patcher means there's nothing to do. Skip parsing the pod to + // avoid the cost of unmarshalling and re-marshalling. + return &admiv1.AdmissionResponse{Allowed: true} + } + + mutator := func(pod *corev1.Pod, _ string, _ dynamic.Interface) (bool, error) { + return p.ApplyRecommendations(pod) + } + + return common.MutationResponse(mutatecommon.Mutate(request.Object, request.Namespace, w.Name(), mutator, request.DynamicClient)) } } - -// updateResources finds the owner of a pod, calls the recommender to retrieve the recommended CPU and Memory requests -func (w *Webhook) updateResources(pod *corev1.Pod, _ string, _ dynamic.Interface) (bool, error) { - return w.patcher.ApplyRecommendations(pod) -} diff --git a/pkg/clusteragent/admission/start.go b/pkg/clusteragent/admission/start.go index a3ed3067b26b..dc1d2173f1a8 100644 --- a/pkg/clusteragent/admission/start.go +++ b/pkg/clusteragent/admission/start.go @@ -21,7 +21,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/controllers/webhook" admprobe "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/probe" clusterspot "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/cluster/spot" - "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload" "github.com/DataDog/datadog-agent/pkg/clusteragent/instrumentation" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common/namespace" @@ -50,7 +49,7 @@ type ControllerContext struct { } // StartControllers starts the secret and webhook controllers -func StartControllers(ctx ControllerContext, datadogConfig config.Component, wmeta workloadmeta.Component, pp workload.PodPatcher, sh clusterspot.PodHandler, healthPlatform option.Option[healthplatformdef.Component]) ([]webhook.Webhook, error) { +func StartControllers(ctx ControllerContext, datadogConfig config.Component, wmeta workloadmeta.Component, sh clusterspot.PodHandler, healthPlatform option.Option[healthplatformdef.Component]) ([]webhook.Webhook, error) { var webhooks []webhook.Webhook if !datadogConfig.GetBool("admission_controller.enabled") { @@ -104,7 +103,6 @@ func StartControllers(ctx ControllerContext, datadogConfig config.Component, wme notifChanWebhook, webhookConfig, wmeta, - pp, sh, datadogConfig, ctx.Demultiplexer, diff --git a/pkg/clusteragent/autoscaling/autoscalinggate/BUILD.bazel b/pkg/clusteragent/autoscaling/autoscalinggate/BUILD.bazel new file mode 100644 index 000000000000..0c68e2f8dd86 --- /dev/null +++ b/pkg/clusteragent/autoscaling/autoscalinggate/BUILD.bazel @@ -0,0 +1,16 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "autoscalinggate", + srcs = ["gate.go"], + importpath = "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate", + visibility = ["//visibility:public"], +) + +go_test( + name = "autoscalinggate_test", + srcs = ["gate_test.go"], + embed = [":autoscalinggate"], + gotags = ["test"], + deps = ["@com_github_stretchr_testify//assert"], +) diff --git a/pkg/clusteragent/autoscaling/autoscalinggate/gate.go b/pkg/clusteragent/autoscaling/autoscalinggate/gate.go new file mode 100644 index 000000000000..f12773df0689 --- /dev/null +++ b/pkg/clusteragent/autoscaling/autoscalinggate/gate.go @@ -0,0 +1,77 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +// Package autoscalinggate provides a gate used to coordinate the lazy startup +// of the workload autoscaling stack. +// +// The goal of this gate is to enable autoscaling by default to make onboarding +// easier, but we don't want to use any resources for users that are not using +// autoscaling. Some of the dependencies of autoscaling can use a lot of +// memory. In particular, the pod collection in workloadmeta can use a lot of +// memory in large clusters. +// +// To achieve the goal, we use this gate to only start the autoscaling stack +// when there's at least one DatadogPodAutoscaler or a workload or namespace +// with an autoscaling label. +package autoscalinggate + +import ( + "context" + "sync" +) + +// Gate coordinates the lazy startup of the workload autoscaling stack. +type Gate struct { + enableCh chan struct{} + enableOnce sync.Once + syncedCh chan struct{} + syncedOnce sync.Once +} + +// New returns a new Gate. +func New() *Gate { + return &Gate{ + enableCh: make(chan struct{}), + syncedCh: make(chan struct{}), + } +} + +// Enable marks autoscaling as enabled. To be called when there's a +// DatadogPodAutoscaler or a workload or namespace with autoscaling labels. Only +// the first call has any effect. +func (g *Gate) Enable() { + g.enableOnce.Do(func() { close(g.enableCh) }) +} + +// MarkPodCollectionSynced marks the pod collection as synced. Only the first +// call has any effect. +func (g *Gate) MarkPodCollectionSynced() { + g.syncedOnce.Do(func() { close(g.syncedCh) }) +} + +// WaitForEnable blocks until Enable is called or ctx is cancelled. Returns true +// if Enable was called, false if ctx was cancelled first. +func (g *Gate) WaitForEnable(ctx context.Context) bool { + select { + case <-g.enableCh: + return true + case <-ctx.Done(): + return false + } +} + +// WaitForPodCollectionSynced blocks until MarkPodCollectionSynced is called or +// ctx is cancelled. Returns true if MarkPodCollectionSynced was called, false +// if ctx was cancelled first. +func (g *Gate) WaitForPodCollectionSynced(ctx context.Context) bool { + select { + case <-g.syncedCh: + return true + case <-ctx.Done(): + return false + } +} diff --git a/pkg/clusteragent/autoscaling/autoscalinggate/gate_test.go b/pkg/clusteragent/autoscaling/autoscalinggate/gate_test.go new file mode 100644 index 000000000000..0aaf86ac55f0 --- /dev/null +++ b/pkg/clusteragent/autoscaling/autoscalinggate/gate_test.go @@ -0,0 +1,94 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +package autoscalinggate + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const testTimeout = time.Second + +func TestEnable(t *testing.T) { + gate := New() + + gate.Enable() + gate.Enable() // can be called multiple times safely + + assert.True(t, gate.WaitForEnable(context.TODO())) +} + +func TestMarkPodCollectionSynced(t *testing.T) { + gate := New() + + gate.MarkPodCollectionSynced() + gate.MarkPodCollectionSynced() // can be called multiple times safely + + assert.True(t, gate.WaitForPodCollectionSynced(context.TODO())) +} + +func TestWaitForEnable(t *testing.T) { + t.Run("unblocks on Enable", func(t *testing.T) { + gate := New() + + done := make(chan bool, 1) + go func() { + done <- gate.WaitForEnable(context.TODO()) + }() + + gate.Enable() + + select { + case result := <-done: + assert.True(t, result) + case <-time.After(testTimeout): + t.Fatal("WaitForEnable did not unblock after Enable") + } + }) + + t.Run("returns false on cancelled context", func(t *testing.T) { + gate := New() + + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + + assert.False(t, gate.WaitForEnable(ctx)) + }) +} + +func TestWaitForPodCollectionSynced(t *testing.T) { + t.Run("unblocks on MarkPodCollectionSynced", func(t *testing.T) { + gate := New() + + done := make(chan bool, 1) + go func() { + done <- gate.WaitForPodCollectionSynced(context.TODO()) + }() + + gate.MarkPodCollectionSynced() + + select { + case result := <-done: + assert.True(t, result) + case <-time.After(testTimeout): + t.Fatal("WaitForPodCollectionSynced did not unblock after MarkPodCollectionSynced") + } + }) + + t.Run("returns false on cancelled context", func(t *testing.T) { + gate := New() + + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + + assert.False(t, gate.WaitForPodCollectionSynced(ctx)) + }) +} diff --git a/pkg/clusteragent/autoscaling/workload/controller.go b/pkg/clusteragent/autoscaling/workload/controller.go index 7a7eb83c541f..69b940f381a3 100644 --- a/pkg/clusteragent/autoscaling/workload/controller.go +++ b/pkg/clusteragent/autoscaling/workload/controller.go @@ -50,13 +50,13 @@ const ( defaultStaleTimestampThreshold = 30 * time.Minute // time to wait before considering a recommendation stale ) -var ( - podAutoscalerGVR = datadoghq.GroupVersion.WithResource("datadogpodautoscalers") - podAutoscalerMeta = metav1.TypeMeta{ - Kind: "DatadogPodAutoscaler", - APIVersion: "datadoghq.com/v1alpha2", - } -) +// PodAutoscalerGVR identifies the DatadogPodAutoscaler resource. +var PodAutoscalerGVR = datadoghq.GroupVersion.WithResource("datadogpodautoscalers") + +var podAutoscalerMeta = metav1.TypeMeta{ + Kind: "DatadogPodAutoscaler", + APIVersion: "datadoghq.com/v1alpha2", +} type ( store = autoscaling.Store[model.PodAutoscalerInternal] @@ -120,7 +120,7 @@ func NewController( }, ) - baseController, err := autoscaling.NewController(controllerID, c, dynamicClient, dynamicInformer, podAutoscalerGVR, isLeader, store, autoscalingWorkqueue) + baseController, err := autoscaling.NewController(controllerID, c, dynamicClient, dynamicInformer, PodAutoscalerGVR, isLeader, store, autoscalingWorkqueue) if err != nil { return nil, err } @@ -157,7 +157,7 @@ func NewController( // PreStart is called before the controller starts func (c *Controller) PreStart(ctx context.Context) { - autoscaling.StartLocalTelemetry(ctx, c.localSender, "workload", []string{"kube_cluster_id:" + c.clusterID, "crd_api_version:" + podAutoscalerGVR.Version}) + autoscaling.StartLocalTelemetry(ctx, c.localSender, "workload", []string{"kube_cluster_id:" + c.clusterID, "crd_api_version:" + PodAutoscalerGVR.Version}) // Start periodic metrics submission (every 30 seconds) go c.metricsStore.WriteAllPeriodically(ctx, 30*time.Second) @@ -510,7 +510,7 @@ func (c *Controller) createPodAutoscaler(ctx context.Context, podAutoscalerInter return 0, time.Time{}, err } - createdObj, err := c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).Create(ctx, obj, metav1.CreateOptions{}) + createdObj, err := c.Client.Resource(PodAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).Create(ctx, obj, metav1.CreateOptions{}) if err != nil { return 0, time.Time{}, fmt.Errorf("Unable to create PodAutoscaler: %s/%s, err: %v", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name(), err) } @@ -554,7 +554,7 @@ func (c *Controller) updatePodAutoscalerSpec(ctx context.Context, podAutoscalerI return 0, err } - updatedObj, err := c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).Update(ctx, obj, metav1.UpdateOptions{}) + updatedObj, err := c.Client.Resource(PodAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).Update(ctx, obj, metav1.UpdateOptions{}) if err != nil { return 0, fmt.Errorf("Unable to update PodAutoscaler Spec: %s/%s, err: %w", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name(), err) } @@ -581,7 +581,7 @@ func (c *Controller) updatePodAutoscalerStatus(ctx context.Context, podAutoscale return err } - _, err = c.Client.Resource(podAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).UpdateStatus(ctx, obj, metav1.UpdateOptions{}) + _, err = c.Client.Resource(PodAutoscalerGVR).Namespace(podAutoscalerInternal.Namespace()).UpdateStatus(ctx, obj, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("Unable to update PodAutoscaler Status: %s/%s, err: %w", podAutoscalerInternal.Namespace(), podAutoscalerInternal.Name(), err) } @@ -591,7 +591,7 @@ func (c *Controller) updatePodAutoscalerStatus(ctx context.Context, podAutoscale func (c *Controller) deletePodAutoscaler(ns, name string) error { log.Infof("Deleting PodAutoscaler: %s/%s", ns, name) - err := c.Client.Resource(podAutoscalerGVR).Namespace(ns).Delete(context.TODO(), name, metav1.DeleteOptions{}) + err := c.Client.Resource(PodAutoscalerGVR).Namespace(ns).Delete(context.TODO(), name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("Unable to delete PodAutoscaler: %s/%s, err: %v", ns, name, err) } diff --git a/pkg/clusteragent/autoscaling/workload/controller_test.go b/pkg/clusteragent/autoscaling/workload/controller_test.go index 5fd7759a5961..07b9fbbc116d 100755 --- a/pkg/clusteragent/autoscaling/workload/controller_test.go +++ b/pkg/clusteragent/autoscaling/workload/controller_test.go @@ -64,7 +64,7 @@ func newFixture(t *testing.T, testTime time.Time) *fixture { podWatcher := newFakePodWatcher() return &fixture{ ControllerFixture: autoscaling.NewFixture( - t, podAutoscalerGVR, + t, PodAutoscalerGVR, func(fakeClient *fake.FakeDynamicClient, informer dynamicinformer.DynamicSharedInformerFactory, isLeader func() bool) (*autoscaling.Controller, error) { c, err := NewController(clock, "cluster-id1", recorder, nil, nil, nil, fakeClient, informer, isLeader, store, podWatcher, nil, hashHeap, nil) if err != nil { diff --git a/pkg/clusteragent/autoscaling/workload/provider/lazy_start.go b/pkg/clusteragent/autoscaling/workload/provider/lazy_start.go new file mode 100644 index 000000000000..47336dde131c --- /dev/null +++ b/pkg/clusteragent/autoscaling/workload/provider/lazy_start.go @@ -0,0 +1,117 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +package provider + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" + "k8s.io/client-go/tools/cache" + + tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def" + workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" + "github.com/DataDog/datadog-agent/pkg/aggregator/sender" + "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/autoscaling" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/profile" + "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +var namespaceGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"} + +// RegisterAutoscalingGateHandlers installs informer event handlers that enable +// the gate on the first observation of: +// - a DatadogPodAutoscaler resource +// - a supported workload with the autoscaling profile label +// - a namespace with the autoscaling profile label +// +// DatadogPodAutoscalerClusterProfile is not a trigger because the autoscaling +// stack creates some OOTB profiles itself, which would make this always +// trigger once the stack has run. +func RegisterAutoscalingGateHandlers( + ctx context.Context, + dynamicInformer dynamicinformer.DynamicSharedInformerFactory, + metadataClient metadata.Interface, + workloadResources []profile.GroupVersionKindResource, + gate *autoscalinggate.Gate, +) error { + enable := func(_ any) { gate.Enable() } + handlers := cache.ResourceEventHandlerFuncs{AddFunc: enable} + + // DPA trigger + if _, err := dynamicInformer.ForResource(workload.PodAutoscalerGVR).Informer().AddEventHandler(handlers); err != nil { + return fmt.Errorf("cannot add gate handler to DatadogPodAutoscaler informer: %w", err) + } + + // Workload and namespace triggers + labelFilteredFactory := metadatainformer.NewFilteredSharedInformerFactory( + metadataClient, + 0, + metav1.NamespaceAll, + func(opts *metav1.ListOptions) { + opts.LabelSelector = model.ProfileLabelKey + }, + ) + + for _, resource := range workloadResources { + if _, err := labelFilteredFactory.ForResource(resource.GroupVersionResource).Informer().AddEventHandler(handlers); err != nil { + return fmt.Errorf("cannot add gate handler to %s informer: %w", resource.GroupVersionResource, err) + } + } + + if _, err := labelFilteredFactory.ForResource(namespaceGVR).Informer().AddEventHandler(handlers); err != nil { + return fmt.Errorf("cannot add gate handler to namespaces informer: %w", err) + } + + labelFilteredFactory.Start(ctx.Done()) + + return nil +} + +// StartWorkloadAutoscalingOnGate waits for the autoscaling gate to be enabled +// before starting the workload autoscaling stack. +func StartWorkloadAutoscalingOnGate( + ctx context.Context, + gate *autoscalinggate.Gate, + clusterID, clusterName string, + isLeader func() bool, + apiCl *apiserver.APIClient, + rcClient workload.RcClient, + wlm workloadmeta.Component, + taggerComp tagger.Component, + senderManager sender.SenderManager, + webhook *autoscaling.Webhook, +) { + if !gate.WaitForEnable(ctx) || ctx.Err() != nil { + return + } + + if !gate.WaitForPodCollectionSynced(ctx) || ctx.Err() != nil { + return + } + + log.Info("Workload autoscaling gate synced, starting autoscaling stack") + + patcher, err := StartWorkloadAutoscaling(ctx, clusterID, clusterName, isLeader, apiCl, rcClient, wlm, taggerComp, senderManager) + if err != nil { + log.Errorf("Failed to start workload autoscaling stack: %v", err) + return + } + + if webhook != nil { + webhook.SetPatcher(patcher) + } +} diff --git a/pkg/clusteragent/autoscaling/workload/provider/lazy_start_test.go b/pkg/clusteragent/autoscaling/workload/provider/lazy_start_test.go new file mode 100644 index 000000000000..987517c90a26 --- /dev/null +++ b/pkg/clusteragent/autoscaling/workload/provider/lazy_start_test.go @@ -0,0 +1,185 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +package provider + +import ( + "context" + "testing" + "time" + + datadoghqcommon "github.com/DataDog/datadog-operator/api/datadoghq/common" + datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + autoscalingv2 "k8s.io/api/autoscaling/v2" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic/dynamicinformer" + dynamicfake "k8s.io/client-go/dynamic/fake" + kubefake "k8s.io/client-go/kubernetes/fake" + kscheme "k8s.io/client-go/kubernetes/scheme" + metadatafake "k8s.io/client-go/metadata/fake" + + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate" + "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model" +) + +const ( + enableTimeout = 2 * time.Second + noEnableTimeout = 50 * time.Millisecond // Shorter because we wait for the full duration +) + +func TestRegisterAutoscalingGateHandlers(t *testing.T) { + scheme := runtime.NewScheme() + err := metav1.AddMetaToScheme(scheme) + require.NoError(t, err) + + discoveryCl := kubefake.NewSimpleClientset().Discovery() + + tests := []struct { + name string + kubernetesObjects []runtime.Object + kubernetesMetadataObjects []runtime.Object + expectedEnable bool + }{ + { + name: "enables gate on DPA", + kubernetesObjects: []runtime.Object{ + &datadoghq.DatadogPodAutoscaler{ + TypeMeta: metav1.TypeMeta{ + Kind: "DatadogPodAutoscaler", + APIVersion: "datadoghq.com/v1alpha2", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "dpa-0", + }, + Spec: datadoghq.DatadogPodAutoscalerSpec{ + TargetRef: autoscalingv2.CrossVersionObjectReference{ + Kind: "Deployment", + Name: "app", + APIVersion: "apps/v1", + }, + Owner: datadoghqcommon.DatadogPodAutoscalerLocalOwner, + }, + }, + }, + expectedEnable: true, + }, + { + name: "does not enable on DPAClusterProfile", + kubernetesObjects: []runtime.Object{ + &datadoghq.DatadogPodAutoscalerClusterProfile{ + TypeMeta: metav1.TypeMeta{ + Kind: "DatadogPodAutoscalerClusterProfile", + APIVersion: "datadoghq.com/v1alpha2", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "profile-0", + }, + }, + }, + expectedEnable: false, + }, + { + name: "enables gate on labeled Deployment", + kubernetesMetadataObjects: []runtime.Object{ + &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: appsv1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "app", + Labels: map[string]string{model.ProfileLabelKey: "high-cpu"}, + }, + }, + }, + expectedEnable: true, + }, + { + name: "enables gate on labeled StatefulSet", + kubernetesMetadataObjects: []runtime.Object{ + &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: appsv1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "app", + Labels: map[string]string{model.ProfileLabelKey: "high-cpu"}, + }, + }, + }, + expectedEnable: true, + }, + { + name: "enables gate on labeled Namespace", + kubernetesMetadataObjects: []runtime.Object{ + &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: corev1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ns-prod", + Labels: map[string]string{model.ProfileLabelKey: "high-cpu"}, + }, + }, + }, + expectedEnable: true, + }, + { + name: "does not enable on unlabeled Deployment", + kubernetesMetadataObjects: []runtime.Object{ + &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: appsv1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "app", + // No profile label + }, + }, + }, + expectedEnable: false, + }, + { + name: "does not enable without resources", + expectedEnable: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dynamicClient := dynamicfake.NewSimpleDynamicClient(kscheme.Scheme, test.kubernetesObjects...) + dynamicInformer := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) + metadataClient := metadatafake.NewSimpleMetadataClient(scheme, test.kubernetesMetadataObjects...) + + timeout := noEnableTimeout + if test.expectedEnable { + timeout = enableTimeout + } + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + workloadResources := SupportedWorkloadResources(discoveryCl) + gate := autoscalinggate.New() + require.NoError(t, RegisterAutoscalingGateHandlers(ctx, dynamicInformer, metadataClient, workloadResources, gate)) + + dynamicInformer.Start(ctx.Done()) + assert.Equal(t, test.expectedEnable, gate.WaitForEnable(ctx)) + }) + } +} diff --git a/pkg/clusteragent/autoscaling/workload/provider/provider.go b/pkg/clusteragent/autoscaling/workload/provider/provider.go index 48e59ecb63c3..cb45e04d3f90 100644 --- a/pkg/clusteragent/autoscaling/workload/provider/provider.go +++ b/pkg/clusteragent/autoscaling/workload/provider/provider.go @@ -39,6 +39,8 @@ import ( "k8s.io/client-go/discovery" ) +var argoRolloutsGVR = schema.GroupVersionResource{Group: "argoproj.io", Version: "v1alpha1", Resource: "rollouts"} + // StartWorkloadAutoscaling starts the workload autoscaling controller func StartWorkloadAutoscaling( ctx context.Context, @@ -108,16 +110,12 @@ func StartWorkloadAutoscaling( return nil, fmt.Errorf("Unable to start profile controller: %w", err) } - workloadResources := []profile.GroupVersionKindResource{ - {GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, Kind: "Deployment"}, - {GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}, Kind: "StatefulSet"}, - } - if isArgoRolloutsAvailable(apiCl.Cl.Discovery()) { - workloadResources = append(workloadResources, profile.GroupVersionKindResource{ - GroupVersionResource: schema.GroupVersionResource{Group: "argoproj.io", Version: "v1alpha1", Resource: "rollouts"}, - Kind: kubernetes.RolloutKind, - }) - log.Info("Argo Rollouts CRD detected, enabling rollout support for autoscaling profiles") + workloadResources := SupportedWorkloadResources(apiCl.Cl.Discovery()) + for _, workloadResource := range workloadResources { + if workloadResource.GroupVersionResource == argoRolloutsGVR { + log.Info("Argo Rollouts CRD detected, enabling rollout support for autoscaling profiles") + break + } } workloadWatcher := profile.NewWorkloadWatcher( @@ -160,6 +158,24 @@ func StartWorkloadAutoscaling( return podPatcher, nil } +// SupportedWorkloadResources returns the list of workload resources that +// autoscaling profiles can target. +func SupportedWorkloadResources(discoveryClient discovery.DiscoveryInterface) []profile.GroupVersionKindResource { + resources := []profile.GroupVersionKindResource{ + {GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, Kind: "Deployment"}, + {GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}, Kind: "StatefulSet"}, + } + + if isArgoRolloutsAvailable(discoveryClient) { + resources = append(resources, profile.GroupVersionKindResource{ + GroupVersionResource: argoRolloutsGVR, + Kind: kubernetes.RolloutKind, + }) + } + + return resources +} + func isArgoRolloutsAvailable(discoveryClient discovery.DiscoveryInterface) bool { resources, err := discoveryClient.ServerResourcesForGroupVersion(kubernetes.RolloutAPIVersion) if err != nil {