diff --git a/charts/flagger/README.md b/charts/flagger/README.md index c10fac924..90e4a8b29 100644 --- a/charts/flagger/README.md +++ b/charts/flagger/README.md @@ -12,20 +12,20 @@ and part of [Flux](https://fluxcd.io) family of GitOps tools. ## Prerequisites -* Kubernetes >= 1.19 +- Kubernetes >= 1.19 ## Installing the Chart Add Flagger Helm repository: ```console -$ helm repo add flagger https://flagger.app +helm repo add flagger https://flagger.app ``` Install Flagger's custom resource definitions: ```console -$ kubectl apply -f https://raw.githubusercontent.com/fluxcd/flagger/main/artifacts/flagger/crd.yaml +kubectl apply -f https://raw.githubusercontent.com/fluxcd/flagger/main/artifacts/flagger/crd.yaml ``` To install Flagger for **Istio**: @@ -58,7 +58,6 @@ $ helm upgrade -i flagger flagger/flagger \ --set metricsServer=http://appmesh-prometheus:9090 ``` - To install Flagger for **Open Service Mesh** (requires OSM to have been installed with Prometheus): ```console @@ -120,7 +119,7 @@ If you need to add labels to the flagger deployment or pods, you can pass the la helm upgrade -i flagger flagger/flagger \ \ --set podLabels.= \ ---set deploymentLabels.= +--set deploymentLabels.= ``` The [configuration](#configuration) section lists the parameters that can be configured during installation. @@ -130,7 +129,7 @@ The [configuration](#configuration) section lists the parameters that can be con To uninstall/delete the `flagger` deployment: ```console -$ helm delete flagger +helm delete flagger ``` The command removes all the Kubernetes components associated with the chart and deletes the release. @@ -140,7 +139,7 @@ The command removes all the Kubernetes components associated with the chart and The following tables lists the configurable parameters of the Flagger chart and their default values. | Parameter | Description | Default | -|--------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------| +| ------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------- | | `image.repository` | Image repository | `ghcr.io/fluxcd/flagger` | | `image.tag` | Image tag | `` | | `image.pullPolicy` | Image pull policy | `IfNotPresent` | @@ -173,7 +172,7 @@ The following tables lists the configurable parameters of the Flagger chart and | `serviceAccount.name` | The name of the service account to create or use. If not set and `serviceAccount.create` is `true`, a name is generated using the Flagger fullname | `""` | | `serviceAccount.annotations` | Annotations for service account | `{}` | | `ingressAnnotationsPrefix` | Annotations prefix for ingresses | `custom.ingress.kubernetes.io` | -| `includeLabelPrefix` | List of prefixes of labels that are copied when creating primary deployments or daemonsets. Use * to include all | `""` | +| `includeLabelPrefix` | List of prefixes of labels that are copied when creating primary deployments or daemonsets. 
Use \* to include all | `""` | | `rbac.create` | If `true`, create and use RBAC resources | `true` | | `rbac.pspEnabled` | If `true`, create and use a restricted pod security policy | `false` | | `crd.create` | If `true`, create Flagger's CRDs (should be enabled for Helm v2 only) | `false` | @@ -197,6 +196,7 @@ The following tables lists the configurable parameters of the Flagger chart and | `namespace` | When specified, Flagger will restrict itself to watching Canary objects from that namespace | `""` | | `deploymentLabels` | Labels to add to Flagger deployment | `{}` | | `podLabels` | Labels to add to pods of Flagger deployment | `{}` | +| `maxConcurrentCanaries` | Limits how many canaries can process in parallel. No limit if "0" | `0` | Specify each parameter using the `--set key=value[,key=value]` argument to `helm upgrade`. For example, diff --git a/charts/flagger/templates/deployment.yaml b/charts/flagger/templates/deployment.yaml index ee7d22b50..b6b266090 100644 --- a/charts/flagger/templates/deployment.yaml +++ b/charts/flagger/templates/deployment.yaml @@ -153,6 +153,9 @@ spec: {{- if .Values.noCrossNamespaceRefs }} - -no-cross-namespace-refs={{ .Values.noCrossNamespaceRefs }} {{- end }} + {{- if .Values.maxConcurrentCanaries }} + - -max-concurrent-canaries={{ .Values.maxConcurrentCanaries }} + {{- end }} livenessProbe: exec: command: diff --git a/charts/flagger/values.yaml b/charts/flagger/values.yaml index a619c0026..c63a6fd4c 100644 --- a/charts/flagger/values.yaml +++ b/charts/flagger/values.yaml @@ -203,6 +203,8 @@ deploymentLabels: { } noCrossNamespaceRefs: false +maxConcurrentCanaries: 0 + #Placeholder to supply additional volumes to the flagger pod additionalVolumes: {} # - name: tmpfs diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go index 6438187fd..5dd479cb9 100644 --- a/cmd/flagger/main.go +++ b/cmd/flagger/main.go @@ -86,6 +86,7 @@ var ( kubeconfigServiceMesh string clusterName string noCrossNamespaceRefs bool + maxConcurrentCanaries int ) func init() { @@ -121,6 +122,7 @@ func init() { flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.") flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.") flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.") + flag.IntVar(&maxConcurrentCanaries, "max-concurrent-canaries", 0, "Limit parallel processing canaries. 
Unlimited if set to 0, which is default") } func main() { @@ -253,6 +255,7 @@ func main() { fromEnv("EVENT_WEBHOOK_URL", eventWebhook), clusterName, noCrossNamespaceRefs, + maxConcurrentCanaries, cfg, ) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index cabed052a..cbfff562e 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -51,26 +51,28 @@ const controllerAgentName = "flagger" // Controller is managing the canary objects and schedules canary deployments type Controller struct { - kubeConfig *rest.Config - kubeClient kubernetes.Interface - flaggerClient clientset.Interface - flaggerInformers Informers - flaggerSynced cache.InformerSynced - flaggerWindow time.Duration - workqueue workqueue.RateLimitingInterface - eventRecorder record.EventRecorder - logger *zap.SugaredLogger - canaries *sync.Map - jobs map[string]CanaryJob - recorder metrics.Recorder - notifier notifier.Interface - canaryFactory *canary.Factory - routerFactory *router.Factory - observerFactory *observers.Factory - meshProvider string - eventWebhook string - clusterName string - noCrossNamespaceRefs bool + kubeConfig *rest.Config + kubeClient kubernetes.Interface + flaggerClient clientset.Interface + flaggerInformers Informers + flaggerSynced cache.InformerSynced + flaggerWindow time.Duration + workqueue workqueue.RateLimitingInterface + eventRecorder record.EventRecorder + logger *zap.SugaredLogger + canaries *sync.Map + jobs map[string]CanaryJob + recorder metrics.Recorder + notifier notifier.Interface + canaryFactory *canary.Factory + routerFactory *router.Factory + observerFactory *observers.Factory + meshProvider string + eventWebhook string + clusterName string + noCrossNamespaceRefs bool + pendingCanaries map[string]bool + maxConcurrentCanaries int } type Informers struct { @@ -94,6 +96,7 @@ func NewController( eventWebhook string, clusterName string, noCrossNamespaceRefs bool, + maxConcurrentCanaries int, kubeConfig *rest.Config, ) *Controller { logger.Debug("Creating event broadcaster") @@ -109,26 +112,28 @@ func NewController( recorder.SetInfo(version, meshProvider) ctrl := &Controller{ - kubeConfig: kubeConfig, - kubeClient: kubeClient, - flaggerClient: flaggerClient, - flaggerInformers: flaggerInformers, - flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), - eventRecorder: eventRecorder, - logger: logger, - canaries: new(sync.Map), - jobs: map[string]CanaryJob{}, - flaggerWindow: flaggerWindow, - observerFactory: observerFactory, - recorder: recorder, - notifier: notifier, - canaryFactory: canaryFactory, - routerFactory: routerFactory, - meshProvider: meshProvider, - eventWebhook: eventWebhook, - clusterName: clusterName, - noCrossNamespaceRefs: noCrossNamespaceRefs, + kubeConfig: kubeConfig, + kubeClient: kubeClient, + flaggerClient: flaggerClient, + flaggerInformers: flaggerInformers, + flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), + eventRecorder: eventRecorder, + logger: logger, + canaries: new(sync.Map), + jobs: map[string]CanaryJob{}, + flaggerWindow: flaggerWindow, + observerFactory: observerFactory, + recorder: recorder, + notifier: notifier, + canaryFactory: canaryFactory, + routerFactory: routerFactory, + meshProvider: meshProvider, + eventWebhook: eventWebhook, + clusterName: 
clusterName, + noCrossNamespaceRefs: noCrossNamespaceRefs, + pendingCanaries: map[string]bool{}, + maxConcurrentCanaries: maxConcurrentCanaries, } flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -237,7 +242,6 @@ func (c *Controller) processNextWorkItem() bool { c.workqueue.Forget(obj) return nil }(obj) - if err != nil { utilruntime.HandleError(err) return true @@ -307,7 +311,6 @@ func (c *Controller) syncHandler(key string) error { if err := c.addFinalizer(cd); err != nil { return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err) } - } c.logger.Infof("Synced %s", key) diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 6a9a2664a..e4bef05e9 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -134,6 +134,7 @@ func (c *Controller) scheduleCanaries() { for job := range c.jobs { if _, exists := current[job]; !exists { c.jobs[job].Stop() + delete(c.pendingCanaries, job) delete(c.jobs, job) } } @@ -283,11 +284,22 @@ func (c *Controller) advanceCanary(name string, namespace string) { return } + key := fmt.Sprintf("%s.%s", cd.Name, cd.Namespace) + if !shouldAdvance { + delete(c.pendingCanaries, key) c.recorder.SetStatus(cd, cd.Status.Phase) return } + if _, exists := c.pendingCanaries[key]; c.maxConcurrentCanaries > 0 && len(c.pendingCanaries) >= c.maxConcurrentCanaries && !exists { + canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseWaiting) + c.recordEventInfof(cd, "waiting with canary %v.%v %v to process, because maximum of concurrent canaries reached", cd.Name, cd.Namespace, cd.UID) + return + } + + c.pendingCanaries[key] = true + maxWeight := c.maxWeight(cd) // check primary status @@ -485,7 +497,6 @@ func (c *Controller) advanceCanary(name string, namespace string) { } c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight) } - } func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryController canary.Controller, @@ -542,7 +553,6 @@ func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryCo } return - } func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller, @@ -729,7 +739,6 @@ func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController can return } } - } func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool { @@ -853,7 +862,6 @@ func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController ca } return newCfg, nil - } func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, scalerReconciler canary.ScalerReconciler, shouldAdvance bool) bool { @@ -1010,7 +1018,6 @@ func (c *Controller) setPhaseInitializing(cd *flaggerv1.Canary) error { firstTry = false return }) - if err != nil { return fmt.Errorf("failed after retries: %w", err) } diff --git a/pkg/controller/scheduler_daemonset_fixture_test.go b/pkg/controller/scheduler_daemonset_fixture_test.go index af7e39655..f896f1227 100644 --- a/pkg/controller/scheduler_daemonset_fixture_test.go +++ b/pkg/controller/scheduler_daemonset_fixture_test.go @@ -120,6 +120,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture { recorder: metrics.NewRecorder(controllerAgentName, false), routerFactory: rf, notifier: ¬ifier.NopNotifier{}, + pendingCanaries: map[string]bool{}, } ctrl.flaggerSynced = alwaysReady ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c) diff --git 
a/pkg/controller/scheduler_deployment_fixture_test.go b/pkg/controller/scheduler_deployment_fixture_test.go index 57357567b..dbb5d6fa5 100644 --- a/pkg/controller/scheduler_deployment_fixture_test.go +++ b/pkg/controller/scheduler_deployment_fixture_test.go @@ -149,6 +149,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture { recorder: metrics.NewRecorder(controllerAgentName, false), routerFactory: rf, notifier: ¬ifier.NopNotifier{}, + pendingCanaries: map[string]bool{}, } ctrl.flaggerSynced = alwaysReady ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c) diff --git a/pkg/controller/scheduler_deployment_test.go b/pkg/controller/scheduler_deployment_test.go index 51161ec44..59a644839 100644 --- a/pkg/controller/scheduler_deployment_test.go +++ b/pkg/controller/scheduler_deployment_test.go @@ -394,6 +394,83 @@ func TestScheduler_DeploymentPromotion(t *testing.T) { assert.Equal(t, flaggerv1.CanaryPhaseSucceeded, c.Status.Phase) } +func TestScheduler_DeploymentMaxConcurrent(t *testing.T) { + mocks := newDeploymentFixture(nil) + + secondCanary := newDeploymentTestCanary() + secondCanary.Name = "podinfo2" + + mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Create(context.TODO(), secondCanary, metav1.CreateOptions{}) + mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary) + + // initializing + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo2", "default") + + // make primary ready + mocks.makePrimaryReady(t) + + // initialized + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo2", "default") + + // update + dep2 := newDeploymentTestDeploymentV2() + _, err := mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{}) + require.NoError(t, err) + + // detect pod spec changes + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo2", "default") + + // if no maxConcurrentCanaries is set, all canaries should proceed + c, err := mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase) + + c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase) + + // delete second canary and set maxConcurrency. 
Then add it again + delete(mocks.ctrl.pendingCanaries, "podinfo2.default") + mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Delete(secondCanary) + mocks.ctrl.maxConcurrentCanaries = 1 + mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary) + + mocks.ctrl.advanceCanary("podinfo2", "default") + mocks.ctrl.advanceCanary("podinfo2", "default") + _, err = mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{}) + require.NoError(t, err) + + // check if second canary is waiting now + c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{}) + mocks.ctrl.advanceCanary("podinfo2", "default") + require.NoError(t, err) + assert.Equal(t, flaggerv1.CanaryPhaseWaiting, c.Status.Phase) + + // make first deployment succeeded + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo", "default") + mocks.ctrl.advanceCanary("podinfo", "default") + + // after succeeded it should get removed from pendingCanaries + mocks.ctrl.advanceCanary("podinfo", "default") + + // second canary should start with next call + mocks.ctrl.advanceCanary("podinfo2", "default") + + // check if second canary is starting + c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase) +} + func TestScheduler_DeploymentMirroring(t *testing.T) { mocks := newDeploymentFixture(newDeploymentTestCanaryMirror())
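For reviewers, the scheduling change above amounts to a small admission gate in `advanceCanary`: each canary is keyed as `<name>.<namespace>`, recorded in `pendingCanaries` while it is being processed, removed again when it no longer needs to advance (or when its job is stopped in `scheduleCanaries`), and held back — by being set to `CanaryPhaseWaiting` — whenever `maxConcurrentCanaries` is greater than zero, the map is already at the limit, and the canary is not itself pending. The sketch below restates that rule in isolation, as a rough illustration only: the `gate` type and its `Admit`/`Release` methods are invented names and do not appear in the PR, which keeps `pendingCanaries` and `maxConcurrentCanaries` as plain fields on `Controller`.

```go
// Minimal, self-contained sketch of the admission rule added to
// Controller.advanceCanary. The gate type and method names are
// illustrative; the real controller mutates c.pendingCanaries and
// reads c.maxConcurrentCanaries directly.
package main

import "fmt"

// gate tracks canaries that are currently progressing and enforces an
// optional upper bound on how many may run at the same time.
type gate struct {
	pending map[string]bool // keyed by "<name>.<namespace>", as in the controller
	max     int             // 0 means unlimited, matching -max-concurrent-canaries
}

// Admit reports whether the canary identified by key may start or continue.
// A canary that is already pending is always admitted, so an in-flight
// analysis is never interrupted by the limit.
func (g *gate) Admit(key string) bool {
	if g.pending[key] {
		return true
	}
	if g.max > 0 && len(g.pending) >= g.max {
		return false // caller would set the canary to CanaryPhaseWaiting
	}
	g.pending[key] = true
	return true
}

// Release frees a slot once the canary has finished or no longer needs to
// advance, mirroring the delete(c.pendingCanaries, key) calls in the PR.
func (g *gate) Release(key string) {
	delete(g.pending, key)
}

func main() {
	g := &gate{pending: map[string]bool{}, max: 1}

	fmt.Println(g.Admit("podinfo.default"))  // true: a slot is available
	fmt.Println(g.Admit("podinfo2.default")) // false: limit reached, canary waits
	g.Release("podinfo.default")             // first canary finished
	fmt.Println(g.Admit("podinfo2.default")) // true: freed slot, waiting canary proceeds
}
```

Operationally, the limit is surfaced through the new chart value and CLI flag: setting for example `--set maxConcurrentCanaries=1` on `helm upgrade -i flagger flagger/flagger` should render `-max-concurrent-canaries=1` into the container args via the deployment template, while leaving the value at its default of `0` keeps the previous unlimited behaviour.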