Skip to content
Open
9 changes: 8 additions & 1 deletion cmd/cluster-agent/subcommands/check/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/DataDog/datadog-agent/cmd/cluster-agent/command"
wmcatalog "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/catalog-clusteragent"
"github.com/DataDog/datadog-agent/pkg/cli/subcommands/check"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
pkgcommon "github.com/DataDog/datadog-agent/pkg/util/common"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"

"github.com/spf13/cobra"
"go.uber.org/fx"
)

// Commands returns a slice of subcommands for the 'cluster-agent' command.
Expand All @@ -33,7 +35,12 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
ConfigName: command.ConfigName,
LoggerName: command.LoggerName,
}
}, wmcatalog.GetCatalog())
}, fx.Options(
wmcatalog.GetCatalog(),
// Required by the kubeapiserver collector, but never enabled in the
// "check" command because autoscaling doesn't run.
fx.Supply(autoscalinggate.New()),
))

return []*cobra.Command{cmd}
}
45 changes: 37 additions & 8 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ import (
metricscompressionfx "github.com/DataDog/datadog-agent/comp/serializer/metricscompression/fx"
"github.com/DataDog/datadog-agent/pkg/clusteragent"
admissionpkg "github.com/DataDog/datadog-agent/pkg/clusteragent/admission"
"github.com/DataDog/datadog-agent/pkg/clusteragent/admission/controllers/webhook"
admissionautoscaling "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/mutate/autoscaling"
admissionpatch "github.com/DataDog/datadog-agent/pkg/clusteragent/admission/patch"
apidca "github.com/DataDog/datadog-agent/pkg/clusteragent/api"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/cluster"
clusterspot "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/cluster/spot"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/provider"
pkgclusterchecks "github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks"
instrumentationhandlers "github.com/DataDog/datadog-agent/pkg/clusteragent/instrumentation/handlers"
Expand Down Expand Up @@ -161,6 +163,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
// `fxutil.OneShot`.
return fxutil.OneShot(start,
fx.Supply(globalParams),
fx.Supply(autoscalinggate.New()),
fx.Supply(core.BundleParams{
ConfigParams: config.NewClusterAgentParams(globalParams.ConfFilePath, config.WithExtraConfFiles(globalParams.ExtraConfFilePath)),
LogParams: log.ForDaemon(command.LoggerName, "log_file", defaultpaths.DCALogFile),
Expand Down Expand Up @@ -289,6 +292,7 @@ func start(log log.Component,
tracerouteComp traceroute.Component,
eventPlatform eventplatform.Component,
healthPlatform option.Option[healthplatformdef.Component],
autoscalingGate *autoscalinggate.Gate,
) error {
stopCh := make(chan struct{})
validatingStopCh := make(chan struct{})
Expand Down Expand Up @@ -548,7 +552,6 @@ func start(log log.Component,
}

// Autoscaling Product
var pp workload.PodPatcher
if config.GetBool("autoscaling.workload.enabled") {
if rcClient == nil {
return errors.New("Remote config is disabled or failed to initialize, remote config is a required dependency for autoscaling")
Expand All @@ -558,11 +561,14 @@ func start(log log.Component,
log.Error("Admission controller is disabled, vertical autoscaling requires the admission controller to be enabled. Vertical scaling will be disabled.")
}

if patcher, err := provider.StartWorkloadAutoscaling(mainCtx, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, taggerComp, demultiplexer); err == nil {
pp = patcher
} else {
return fmt.Errorf("Error while starting workload autoscaling: %v", err)
// Register the gate handlers that allow the lazy startup of the
// autoscaling stack. It stays off until a DatadogPodAutoscaler or a
// workload/namespace with the autoscaling profile label is observed.
workloadResources := provider.SupportedWorkloadResources(apiCl.Cl.Discovery())
if err := provider.RegisterAutoscalingGateHandlers(mainCtx, apiCl.DynamicInformerFactory, apiCl.MetadataInformerCl, workloadResources, autoscalingGate); err != nil {
return fmt.Errorf("Error while registering autoscaling gate handlers: %w", err)
}
apiCl.DynamicInformerFactory.Start(mainCtx.Done())
}

if config.GetBool("autoscaling.cluster.enabled") {
Expand Down Expand Up @@ -634,7 +640,10 @@ func start(log log.Component,
}
}

if config.GetBool("admission_controller.enabled") {
var autoscalingWebhook *admissionautoscaling.Webhook
admissionEnabled := config.GetBool("admission_controller.enabled")

if admissionEnabled {
if config.GetBool("admission_controller.auto_instrumentation.patcher.enabled") {
patchCtx := admissionpatch.ControllerContext{
LeadershipStateSubscribeFunc: le.Subscribe,
Expand Down Expand Up @@ -666,7 +675,7 @@ func start(log log.Component,
InstrumentationHandlers: instrHandlers,
}

webhooks, err := admissionpkg.StartControllers(admissionCtx, datadogConfig, wmeta, pp, sh, healthPlatform)
webhooks, err := admissionpkg.StartControllers(admissionCtx, datadogConfig, wmeta, sh, healthPlatform)
// Ignore the error if it's related to the validatingwebhookconfigurations.
var syncInformerError *apiserver.SyncInformersError
if err != nil && !(errors.As(err, &syncInformerError) && syncInformerError.Name == apiserver.ValidatingWebhooksInformer) {
Expand Down Expand Up @@ -696,11 +705,22 @@ func start(log log.Component,
pkglog.Errorf("Error in the Admission Controller Webhook Server: %v", errServ)
}
}()

autoscalingWebhook = findAutoscalingWebhook(webhooks)
}
} else {
pkglog.Info("Admission controller is disabled")
}

// Some autoscaling features don't require the admission webhook, so start
// the autoscaling stack even without it.
if config.GetBool("autoscaling.workload.enabled") {
if admissionEnabled && autoscalingWebhook == nil {
pkglog.Errorf("Autoscaling webhook not available. Vertical autoscaling will not apply recommendations to new pods")
}
go provider.StartWorkloadAutoscalingOnGate(mainCtx, autoscalingGate, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, taggerComp, demultiplexer, autoscalingWebhook)
}

if config.GetBool("cluster_agent.mcp.enabled") {
// Get MCP configured endpoint
mcpEndpoint := config.GetString("cluster_agent.mcp.endpoint")
Expand Down Expand Up @@ -753,6 +773,15 @@ func start(log log.Component,
return nil
}

// findAutoscalingWebhook scans the started admission webhooks and returns the
// autoscaling one, or nil when it is not present in the list.
func findAutoscalingWebhook(webhooks []webhook.Webhook) *admissionautoscaling.Webhook {
	for _, w := range webhooks {
		asWebhook, ok := w.(*admissionautoscaling.Webhook)
		if ok {
			return asWebhook
		}
	}
	return nil
}

func setupClusterCheck(ctx context.Context, ac autodiscovery.Component, tagger tagger.Component) (*pkgclusterchecks.Handler, error) {
handler, err := pkgclusterchecks.NewHandler(ac, tagger)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"strings"
"time"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common"
"go.uber.org/fx"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
Expand All @@ -22,9 +21,11 @@ import (

"github.com/DataDog/datadog-agent/comp/core/config"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/autoscalinggate"
configutils "github.com/DataDog/datadog-agent/pkg/config/utils"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand All @@ -37,18 +38,26 @@ const (
type dependencies struct {
fx.In

Config config.Component
Config config.Component
AutoscalingGate *autoscalinggate.Gate
}

// storeGenerator returns a new store specific to a given resource
type storeGenerator func(context.Context, workloadmeta.Component, config.Reader, kubernetes.Interface) (*cache.Reflector, *reflectorStore)

func shouldHavePodStore(cfg config.Reader) bool {
metadataAsTags := configutils.GetMetadataAsTags(cfg)
hasPodLabelsAsTags := len(metadataAsTags.GetPodLabelsAsTags()) > 0
hasPodAnnotationsAsTags := len(metadataAsTags.GetPodAnnotationsAsTags()) > 0
return podsRequiredAtStartup(cfg) || cfg.GetBool("autoscaling.workload.enabled")
}

return cfg.GetBool("cluster_agent.collect_kubernetes_tags") || cfg.GetBool("autoscaling.workload.enabled") || cfg.GetBool("autoscaling.cluster.spot.enabled") || hasPodLabelsAsTags || hasPodAnnotationsAsTags
// podsRequiredAtStartup reports whether any enabled feature needs the pod
// store to exist from startup. Workload autoscaling is deliberately excluded:
// it can defer the pod store start through the autoscaling gate.
func podsRequiredAtStartup(cfg config.Reader) bool {
	metadataAsTags := configutils.GetMetadataAsTags(cfg)
	switch {
	case cfg.GetBool("cluster_agent.collect_kubernetes_tags"):
		return true
	case cfg.GetBool("autoscaling.cluster.spot.enabled"):
		return true
	default:
		// Pod labels/annotations mapped to tags also require the store.
		return len(metadataAsTags.GetPodLabelsAsTags()) > 0 ||
			len(metadataAsTags.GetPodAnnotationsAsTags()) > 0
	}
}

func shouldHaveDeploymentStore(cfg config.Reader) bool {
Expand All @@ -59,20 +68,6 @@ func shouldHaveDeploymentStore(cfg config.Reader) bool {
return cfg.GetBool("language_detection.enabled") && cfg.GetBool("language_detection.reporting.enabled") || hasDeploymentsLabelsAsTags || hasDeploymentsAnnotationsAsTags
}

// storeGenerators returns the store constructors to run for this cluster
// agent, selected from the features enabled in the configuration.
func storeGenerators(cfg config.Reader) []storeGenerator {
	var gens []storeGenerator

	if shouldHavePodStore(cfg) {
		gens = append(gens, newPodStore)
	}
	if shouldHaveDeploymentStore(cfg) {
		gens = append(gens, newDeploymentStore)
	}

	return gens
}

// metadataCollectionGVRs resolves the GroupVersionResources for every
// resource with metadata collection enabled in the configuration.
func metadataCollectionGVRs(cfg config.Reader, discoveryClient discovery.DiscoveryInterface) ([]schema.GroupVersionResource, error) {
	requested := resourcesWithMetadataCollectionEnabled(cfg)
	return getGVRsForRequestedResources(discoveryClient, requested)
}
Expand Down Expand Up @@ -170,18 +165,20 @@ func resourcesForAPMConfig(cfg config.Reader) []string {
}

type collector struct {
id string
catalog workloadmeta.AgentType
config config.Reader
id string
catalog workloadmeta.AgentType
config config.Reader
autoscalingGate *autoscalinggate.Gate
}

// NewCollector returns a kubeapiserver CollectorProvider that instantiates its collector
func NewCollector(deps dependencies) (workloadmeta.CollectorProvider, error) {
return workloadmeta.CollectorProvider{
Collector: &collector{
id: collectorID,
catalog: workloadmeta.ClusterAgent,
config: deps.Config,
id: collectorID,
catalog: workloadmeta.ClusterAgent,
config: deps.Config,
autoscalingGate: deps.AutoscalingGate,
},
}, nil
}
Expand Down Expand Up @@ -218,8 +215,26 @@ func (c *collector) Start(ctx context.Context, wlmetaStore workloadmeta.Componen
}
}

for _, storeBuilder := range storeGenerators(c.config) {
reflector, store := storeBuilder(ctx, wlmetaStore, c.config, client)
if shouldHavePodStore(c.config) {
autoscalingEnabled := c.config.GetBool("autoscaling.workload.enabled")
lazyStart := !podsRequiredAtStartup(c.config) && autoscalingEnabled

if lazyStart {
// The store is intentionally not added to objectStores. It would
// block the startup readiness check.
go c.startPodStoreOnGate(ctx, wlmetaStore, client, newPodStore)
} else {
reflector, store := newPodStore(ctx, wlmetaStore, c.config, client)
objectStores = append(objectStores, store)
go reflector.Run(ctx.Done())
if autoscalingEnabled {
go c.markPodCollectionSyncedWhenReady(ctx, store)
}
}
}

if shouldHaveDeploymentStore(c.config) {
reflector, store := newDeploymentStore(ctx, wlmetaStore, c.config, client)
objectStores = append(objectStores, store)
go reflector.Run(ctx.Done())
}
Expand All @@ -241,6 +256,30 @@ func (c *collector) Start(ctx context.Context, wlmetaStore workloadmeta.Componen
return nil
}

// startPodStoreOnGate waits for the autoscaling gate to be enabled (or for the
// context to be cancelled) and, once enabled, starts the pod reflector lazily.
func (c *collector) startPodStoreOnGate(ctx context.Context, wlmetaStore workloadmeta.Component, client kubernetes.Interface, newStore storeGenerator) {
	enabled := c.autoscalingGate.WaitForEnable(ctx)
	if !enabled {
		// Context cancelled before the gate opened; nothing to start.
		return
	}

	log.Debug("Autoscaling gate enabled, starting workloadmeta pod reflector lazily")
	podReflector, podStore := newStore(ctx, wlmetaStore, c.config, client)
	go podReflector.Run(ctx.Done())

	c.markPodCollectionSyncedWhenReady(ctx, podStore)
}

// markPodCollectionSyncedWhenReady blocks until the pod store's cache has
// synced, then signals the autoscaling gate that pod collection is ready.
// The workload autoscaling stack waits on that signal before starting.
func (c *collector) markPodCollectionSyncedWhenReady(ctx context.Context, store *reflectorStore) {
	synced := cache.WaitForCacheSync(ctx.Done(), store.HasSynced)
	if !synced {
		// Context cancelled before the cache synced; do not signal the gate.
		return
	}
	c.autoscalingGate.MarkPodCollectionSynced()
}

// Pull is a no-op for this collector: data flows into workloadmeta via the
// reflectors started in Start, so there is nothing to fetch on demand.
func (c *collector) Pull(_ context.Context) error {
	return nil
}
Expand Down
Loading
Loading