16 changes: 14 additions & 2 deletions vertical-pod-autoscaler/pkg/admission-controller/main.go
@@ -38,6 +38,7 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/recommendation"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/vpa"
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
vpa_informers "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/informers/externalversions"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
@@ -99,10 +100,12 @@ func main() {

	config := common.CreateKubeConfigOrDie(commonFlags.KubeConfig, float32(commonFlags.KubeApiQps), int(commonFlags.KubeApiBurst))

	vpaClient := vpa_clientset.NewForConfigOrDie(config)
	vpaLister := vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), commonFlags.VpaObjectNamespace)
	kubeClient := kube_client.NewForConfigOrDie(config)
	factory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod)
Member Author: I'm wondering if this factory should be passed the WithNamespace(commonFlags.VpaObjectNamespace) option

Member Author: Thinking about this more, I think it does make sense to pass the namespace in to the informer; however, I'm nervous about making a change like this without e2e tests. So I'll first write e2e tests covering the namespace command-line args before I proceed with this.

Member: sounds good
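For reference, the suggested change would look roughly like the sketch below — this is only the reviewer's idea, not code from this PR, and it assumes the existing kubeClient, defaultResyncPeriod, and commonFlags from this file:

	// Hypothetical: scope the kube informer factory to the VPA namespace.
	// client-go treats WithNamespace("") as metav1.NamespaceAll, so an empty
	// --vpa-object-namespace would still watch the whole cluster.
	factory := informers.NewSharedInformerFactoryWithOptions(
		kubeClient,
		defaultResyncPeriod,
		informers.WithNamespace(commonFlags.VpaObjectNamespace),
	)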


	vpaClient := vpa_clientset.NewForConfigOrDie(config)
	vpaFactory := vpa_informers.NewSharedInformerFactoryWithOptions(vpaClient, 1*time.Hour, vpa_informers.WithNamespace(commonFlags.VpaObjectNamespace))

	targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(config, kubeClient, factory)
	controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
	podPreprocessor := pod.NewDefaultPreProcessor()
@@ -114,18 +117,27 @@ func main() {
		limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
	}
	recommendationProvider := recommendation.NewProvider(limitRangeCalculator, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator))
	vpaLister := vpa_api_util.NewVpasListerFromFactory(vpaFactory)
	vpaMatcher := vpa.NewMatcher(vpaLister, targetSelectorFetcher, controllerFetcher)

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	vpaFactory.Start(stopCh)
	informerMap := factory.WaitForCacheSync(stopCh)
	for kind, synced := range informerMap {
		if !synced {
			klog.ErrorS(nil, fmt.Sprintf("Could not sync cache for the %s informer", kind.String()))
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}
	vpaInformerMap := vpaFactory.WaitForCacheSync(stopCh)
	for kind, synced := range vpaInformerMap {
		if !synced {
			klog.ErrorS(nil, fmt.Sprintf("Could not sync VPA cache for the %s informer", kind.String()))
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}

	hostname, err := os.Hostname()
	if err != nil {
23 changes: 20 additions & 3 deletions vertical-pod-autoscaler/pkg/recommender/main.go
@@ -41,6 +41,7 @@ import (

"k8s.io/autoscaler/vertical-pod-autoscaler/common"
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
vpa_informers "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/informers/externalversions"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/checkpoint"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input"
@@ -244,20 +245,36 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
	stopCh := make(chan struct{})
	defer close(stopCh)
	config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst))
	kubeClient := kube_client.NewForConfigOrDie(config)
	clusterState := model.NewClusterState(aggregateContainerStateGCInterval)

	kubeClient := kube_client.NewForConfigOrDie(config)
	factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(commonFlag.VpaObjectNamespace))

	vpaClient := vpa_clientset.NewForConfigOrDie(config)
	vpaFactory := vpa_informers.NewSharedInformerFactoryWithOptions(vpaClient, 1*time.Hour, vpa_informers.WithNamespace(commonFlag.VpaObjectNamespace))

	vpaLister := vpa_api_util.NewVpasListerFromFactory(vpaFactory)
	vpacheckpointLister := vpa_api_util.NewVpaCheckpointListerFromFactory(vpaFactory)

	controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
	podLister, oomObserver := input.NewPodListerAndOOMObserver(ctx, kubeClient, commonFlag.VpaObjectNamespace, stopCh)

	factory.Start(stopCh)
	vpaFactory.Start(stopCh)
	informerMap := factory.WaitForCacheSync(stopCh)
	for kind, synced := range informerMap {
		if !synced {
			klog.ErrorS(nil, fmt.Sprintf("Could not sync cache for the %s informer", kind.String()))
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}
	vpaInformerMap := vpaFactory.WaitForCacheSync(stopCh)
	for kind, synced := range vpaInformerMap {
		if !synced {
			klog.ErrorS(nil, fmt.Sprintf("Could not sync VPA cache for the %s informer", kind.String()))
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}

	model.InitializeAggregationsConfig(model.NewAggregationsConfig(*memoryAggregationInterval, *memoryAggregationIntervalCount, *memoryHistogramDecayHalfLife, *cpuHistogramDecayHalfLife, *oomBumpUpRatio, *oomMinBumpUp))

@@ -296,8 +313,8 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
		KubeClient: kubeClient,
		MetricsClient: input_metrics.NewMetricsClient(source, commonFlag.VpaObjectNamespace, "default-metrics-client"),
		VpaCheckpointClient: vpa_clientset.NewForConfigOrDie(config).AutoscalingV1(),
		VpaLister: vpa_api_util.NewVpasLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace),
		VpaCheckpointLister: vpa_api_util.NewVpaCheckpointLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace),
		VpaLister: vpaLister,
		VpaCheckpointLister: vpacheckpointLister,
		ClusterState: clusterState,
		SelectorFetcher: target.NewVpaTargetSelectorFetcher(config, kubeClient, factory),
		MemorySaveMode: *memorySaver,
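The new vpa_api_util helpers NewVpasListerFromFactory and NewVpaCheckpointListerFromFactory are called above but their bodies aren't shown in this diff. Presumably they return listers backed by the shared factory's informers; a minimal sketch of what such helpers might look like, assuming the generated externalversions factory and listers packages (hypothetical, not from this PR):

	// Hypothetical sketches; the real helpers live in vpa_api_util.
	func NewVpasListerFromFactory(factory vpa_informers.SharedInformerFactory) vpa_lister.VerticalPodAutoscalerLister {
		// Requesting the lister registers the VPA informer with the factory;
		// it is started and synced later by factory.Start / WaitForCacheSync.
		return factory.Autoscaling().V1().VerticalPodAutoscalers().Lister()
	}

	func NewVpaCheckpointListerFromFactory(factory vpa_informers.SharedInformerFactory) vpa_lister.VerticalPodAutoscalerCheckpointLister {
		return factory.Autoscaling().V1().VerticalPodAutoscalerCheckpoints().Lister()
	}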
12 changes: 6 additions & 6 deletions vertical-pod-autoscaler/pkg/updater/logic/updater.go
@@ -18,14 +18,14 @@ package logic

import (
"context"
"fmt"
"slices"
"time"

"golang.org/x/time/rate"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
corescheme "k8s.io/client-go/kubernetes/scheme"
@@ -88,6 +88,8 @@ type updater struct {
func NewUpdater(
	kubeClient kube_client.Interface,
	vpaClient *vpa_clientset.Clientset,
	kubeInformerFactory informers.SharedInformerFactory,
	vpaLister vpa_lister.VerticalPodAutoscalerLister,
	minReplicasForEviction int,
	evictionRateLimit float64,
	evictionRateBurst int,
@@ -106,18 +108,16 @@
	evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst)
	// TODO: Create in-place rate limits for the in-place rate limiter
	inPlaceRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst)
	factory, err := restriction.NewPodsRestrictionFactory(
	factory := restriction.NewPodsRestrictionFactory(
		kubeClient,
		kubeInformerFactory,
		minReplicasForEviction,
		evictionToleranceFraction,
		patchCalculators,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create restriction factory: %v", err)
	}

	return &updater{
		vpaLister: vpa_api_util.NewVpasLister(vpaClient, make(chan struct{}), namespace),
		vpaLister: vpaLister,
		podLister: newPodLister(kubeClient, namespace),
		eventRecorder: newEventRecorder(kubeClient),
		restrictionFactory: factory,
34 changes: 25 additions & 9 deletions vertical-pod-autoscaler/pkg/updater/main.go
@@ -40,6 +40,7 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/recommendation"
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
vpa_informers "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/informers/externalversions"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
@@ -178,21 +179,35 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {
	config := common.CreateKubeConfigOrDie(commonFlag.KubeConfig, float32(commonFlag.KubeApiQps), int(commonFlag.KubeApiBurst))
	kubeClient := kube_client.NewForConfigOrDie(config)
	vpaClient := vpa_clientset.NewForConfigOrDie(config)
	factory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod)
	targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(config, kubeClient, factory)
	controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)

	kubeFactory := informers.NewSharedInformerFactory(kubeClient, defaultResyncPeriod)
Member Author: I'm wondering if this factory should be passed the WithNamespace(commonFlags.VpaObjectNamespace) option
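As in the admission controller, that suggestion would look roughly like the following sketch (reviewer's idea only, not part of this PR; note this file's flags variable is commonFlag):

	kubeFactory := informers.NewSharedInformerFactoryWithOptions(
		kubeClient, defaultResyncPeriod, informers.WithNamespace(commonFlag.VpaObjectNamespace))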

	vpaFactory := vpa_informers.NewSharedInformerFactoryWithOptions(vpaClient, 1*time.Hour, vpa_informers.WithNamespace(commonFlag.VpaObjectNamespace))
	vpaLister := vpa_api_util.NewVpasListerFromFactory(vpaFactory)

	targetSelectorFetcher := target.NewVpaTargetSelectorFetcher(config, kubeClient, kubeFactory)
	controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, kubeFactory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
	var limitRangeCalculator limitrange.LimitRangeCalculator
	limitRangeCalculator, err := limitrange.NewLimitsRangeCalculator(factory)
	limitRangeCalculator, err := limitrange.NewLimitsRangeCalculator(kubeFactory)
	if err != nil {
		klog.ErrorS(err, "Failed to create limitRangeCalculator, falling back to not checking limits")
		limitRangeCalculator = limitrange.NewNoopLimitsCalculator()
	}

	factory.Start(stopCh)
	informerMap := factory.WaitForCacheSync(stopCh)
	for kind, synced := range informerMap {
	kubeFactory.Start(stopCh)
	vpaFactory.Start(stopCh)

	kubeInformerMap := kubeFactory.WaitForCacheSync(stopCh)
	for kind, synced := range kubeInformerMap {
		if !synced {
			klog.ErrorS(nil, fmt.Sprintf("Could not sync Kubernetes cache for the %s informer", kind.String()))
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}

	vpaInformerMap := vpaFactory.WaitForCacheSync(stopCh)
	for kind, synced := range vpaInformerMap {
		if !synced {
			klog.ErrorS(nil, fmt.Sprintf("Could not sync cache for the %s informer", kind.String()))
			klog.ErrorS(nil, fmt.Sprintf("Could not sync VPA cache for the %s informer", kind.String()))
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}
@@ -208,10 +223,11 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) {

	calculators := []patch.Calculator{inplace.NewResourceInPlaceUpdatesCalculator(recommendationProvider), inplace.NewInPlaceUpdatedCalculator()}

	// TODO: use SharedInformerFactory in updater
	updater, err := updater.NewUpdater(
		kubeClient,
		vpaClient,
		kubeFactory,
		vpaLister,
		*minReplicas,
		*evictionRateLimit,
		*evictionRateBurst,
vertical-pod-autoscaler/pkg/updater/restriction/pods_restriction_factory.go
@@ -23,21 +23,15 @@ import (
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
appsinformer "k8s.io/client-go/informers/apps/v1"
coreinformer "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
)

const (
	resyncPeriod time.Duration = 1 * time.Minute
)

// ControllerKind is the type of controller that can manage a pod.
type controllerKind string

@@ -65,10 +59,7 @@ type PodsRestrictionFactory interface {
// PodsRestrictionFactoryImpl is the implementation of the PodsRestrictionFactory interface.
type PodsRestrictionFactoryImpl struct {
	client kube_client.Interface
	rcInformer cache.SharedIndexInformer // informer for Replication Controllers
	ssInformer cache.SharedIndexInformer // informer for Stateful Sets
	rsInformer cache.SharedIndexInformer // informer for Replica Sets
	dsInformer cache.SharedIndexInformer // informer for Daemon Sets
	informerFactory informers.SharedInformerFactory
	minReplicas int
	evictionToleranceFraction float64
	clock clock.Clock
@@ -77,41 +68,22 @@
}

// NewPodsRestrictionFactory creates a new PodsRestrictionFactory.
func NewPodsRestrictionFactory(client kube_client.Interface, minReplicas int, evictionToleranceFraction float64, patchCalculators []patch.Calculator) (PodsRestrictionFactory, error) {
	rcInformer, err := setupInformer(client, replicationController)
	if err != nil {
		return nil, fmt.Errorf("failed to create rcInformer: %v", err)
	}
	ssInformer, err := setupInformer(client, statefulSet)
	if err != nil {
		return nil, fmt.Errorf("failed to create ssInformer: %v", err)
	}
	rsInformer, err := setupInformer(client, replicaSet)
	if err != nil {
		return nil, fmt.Errorf("failed to create rsInformer: %v", err)
	}
	dsInformer, err := setupInformer(client, daemonSet)
	if err != nil {
		return nil, fmt.Errorf("failed to create dsInformer: %v", err)
	}
func NewPodsRestrictionFactory(client kube_client.Interface, informerFactory informers.SharedInformerFactory, minReplicas int, evictionToleranceFraction float64, patchCalculators []patch.Calculator) PodsRestrictionFactory {
	return &PodsRestrictionFactoryImpl{
		client: client,
		rcInformer: rcInformer, // informer for Replication Controllers
		ssInformer: ssInformer, // informer for Stateful Sets
		rsInformer: rsInformer, // informer for Replica Sets
		dsInformer: dsInformer, // informer for Daemon Sets
		informerFactory: informerFactory,
		minReplicas: minReplicas,
		evictionToleranceFraction: evictionToleranceFraction,
		clock: &clock.RealClock{},
		lastInPlaceAttemptTimeMap: make(map[string]time.Time),
		patchCalculators: patchCalculators,
	}, nil
	}
}

func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator) (int, error) {
	switch creator.Kind {
	case replicationController:
		rcObj, exists, err := f.rcInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		rcObj, exists, err := f.informerFactory.Core().V1().ReplicationControllers().Informer().GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		if err != nil {
			return 0, fmt.Errorf("replication controller %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
		}
@@ -127,7 +99,7 @@ func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator)
		}
		return int(*rc.Spec.Replicas), nil
	case replicaSet:
		rsObj, exists, err := f.rsInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		rsObj, exists, err := f.informerFactory.Apps().V1().ReplicaSets().Informer().GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		if err != nil {
			return 0, fmt.Errorf("replica set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
		}
@@ -143,7 +115,7 @@ func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator)
		}
		return int(*rs.Spec.Replicas), nil
	case statefulSet:
		ssObj, exists, err := f.ssInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		ssObj, exists, err := f.informerFactory.Apps().V1().StatefulSets().Informer().GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		if err != nil {
			return 0, fmt.Errorf("stateful set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
		}
@@ -159,7 +131,7 @@ func (f *PodsRestrictionFactoryImpl) getReplicaCount(creator podReplicaCreator)
		}
		return int(*ss.Spec.Replicas), nil
	case daemonSet:
		dsObj, exists, err := f.dsInformer.GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		dsObj, exists, err := f.informerFactory.Apps().V1().DaemonSets().Informer().GetStore().GetByKey(creator.Namespace + "/" + creator.Name)
		if err != nil {
			return 0, fmt.Errorf("daemon set %s/%s is not available, err: %v", creator.Namespace, creator.Name, err)
		}
@@ -299,33 +271,6 @@ func managingControllerRef(pod *apiv1.Pod) *metav1.OwnerReference {
	return &managingController
}

func setupInformer(kubeClient kube_client.Interface, kind controllerKind) (cache.SharedIndexInformer, error) {
	var informer cache.SharedIndexInformer
	switch kind {
	case replicationController:
		informer = coreinformer.NewReplicationControllerInformer(kubeClient, apiv1.NamespaceAll,
			resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	case replicaSet:
		informer = appsinformer.NewReplicaSetInformer(kubeClient, apiv1.NamespaceAll,
			resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	case statefulSet:
		informer = appsinformer.NewStatefulSetInformer(kubeClient, apiv1.NamespaceAll,
			resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	case daemonSet:
		informer = appsinformer.NewDaemonSetInformer(kubeClient, apiv1.NamespaceAll,
			resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	default:
		return nil, fmt.Errorf("unknown controller kind: %v", kind)
	}
	stopCh := make(chan struct{})
	go informer.Run(stopCh)
	synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
	if !synced {
		return nil, fmt.Errorf("failed to sync %v cache", kind)
	}
	return informer, nil
}

type singleGroupStats struct {
	configured int
	pending int
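One lifecycle note on this file: the deleted setupInformer started and synced each informer itself, whereas the factory-based constructor leaves starting and syncing to the caller (the updater's main, shown earlier). A minimal wiring sketch under that assumption — minReplicas, evictionToleranceFraction, and calculators are placeholder identifiers:

	factory := informers.NewSharedInformerFactory(kubeClient, time.Minute)
	restrictionFactory := restriction.NewPodsRestrictionFactory(kubeClient, factory, minReplicas, evictionToleranceFraction, calculators)

	// A shared factory only runs informers that were requested before Start,
	// so touch the ones getReplicaCount reads from.
	factory.Core().V1().ReplicationControllers().Informer()
	factory.Apps().V1().ReplicaSets().Informer()
	factory.Apps().V1().StatefulSets().Informer()
	factory.Apps().V1().DaemonSets().Informer()

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	for kind, synced := range factory.WaitForCacheSync(stopCh) {
		if !synced {
			klog.ErrorS(nil, fmt.Sprintf("Could not sync cache for the %s informer", kind.String()))
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}
	}
	// restrictionFactory is then handed to the updater's eviction/in-place logic.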