add maxConcurrentCanaries flag to limit concurrent progressing canaries
This adds a flag to limit the number of concurrently progressing canaries, to avoid requesting large amounts of resources at once.

The flag has no effect when set to "0", which is the default.

Closes #1069

Signed-off-by: Louis Halbritter <[email protected]>

chore: update Helm default values and README

Signed-off-by: Louis Halbritter <[email protected]>
louishalbritter committed Feb 7, 2025
1 parent 40e2802 commit 1571c75
Showing 9 changed files with 152 additions and 55 deletions.
16 changes: 8 additions & 8 deletions charts/flagger/README.md
@@ -12,20 +12,20 @@ and part of [Flux](https://fluxcd.io) family of GitOps tools.

## Prerequisites

* Kubernetes >= 1.19
- Kubernetes >= 1.19

## Installing the Chart

Add Flagger Helm repository:

```console
$ helm repo add flagger https://flagger.app
helm repo add flagger https://flagger.app
```

Install Flagger's custom resource definitions:

```console
$ kubectl apply -f https://raw.githubusercontent.com/fluxcd/flagger/main/artifacts/flagger/crd.yaml
kubectl apply -f https://raw.githubusercontent.com/fluxcd/flagger/main/artifacts/flagger/crd.yaml
```

To install Flagger for **Istio**:
@@ -58,7 +58,6 @@ $ helm upgrade -i flagger flagger/flagger \
--set metricsServer=http://appmesh-prometheus:9090
```


To install Flagger for **Open Service Mesh** (requires OSM to have been installed with Prometheus):

```console
@@ -120,7 +119,7 @@ If you need to add labels to the flagger deployment or pods, you can pass the la
helm upgrade -i flagger flagger/flagger \
<other parameters> \
--set podLabels.<labelName>=<labelValue> \
--set deploymentLabels.<labelName>=<labelValue>
--set deploymentLabels.<labelName>=<labelValue>
```

The [configuration](#configuration) section lists the parameters that can be configured during installation.
@@ -130,7 +129,7 @@ The [configuration](#configuration) section lists the parameters that can be con
To uninstall/delete the `flagger` deployment:

```console
$ helm delete flagger
helm delete flagger
```

The command removes all the Kubernetes components associated with the chart and deletes the release.
@@ -140,7 +139,7 @@ The command removes all the Kubernetes components associated with the chart and
The following table lists the configurable parameters of the Flagger chart and their default values.

| Parameter | Description | Default |
|--------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------|
| ------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------- |
| `image.repository` | Image repository | `ghcr.io/fluxcd/flagger` |
| `image.tag` | Image tag | `<VERSION>` |
| `image.pullPolicy` | Image pull policy | `IfNotPresent` |
@@ -173,7 +172,7 @@ The following tables lists the configurable parameters of the Flagger chart and
| `serviceAccount.name` | The name of the service account to create or use. If not set and `serviceAccount.create` is `true`, a name is generated using the Flagger fullname | `""` |
| `serviceAccount.annotations` | Annotations for service account | `{}` |
| `ingressAnnotationsPrefix` | Annotations prefix for ingresses | `custom.ingress.kubernetes.io` |
| `includeLabelPrefix` | List of prefixes of labels that are copied when creating primary deployments or daemonsets. Use * to include all | `""` |
| `includeLabelPrefix` | List of prefixes of labels that are copied when creating primary deployments or daemonsets. Use \* to include all | `""` |
| `rbac.create` | If `true`, create and use RBAC resources | `true` |
| `rbac.pspEnabled` | If `true`, create and use a restricted pod security policy | `false` |
| `crd.create` | If `true`, create Flagger's CRDs (should be enabled for Helm v2 only) | `false` |
@@ -197,6 +196,7 @@ The following tables lists the configurable parameters of the Flagger chart and
| `namespace` | When specified, Flagger will restrict itself to watching Canary objects from that namespace | `""` |
| `deploymentLabels` | Labels to add to Flagger deployment | `{}` |
| `podLabels` | Labels to add to pods of Flagger deployment | `{}` |
| `maxConcurrentCanaries`              | Maximum number of canaries that may progress in parallel; no limit if set to `0`                                                                   | `0`                                    |

Specify each parameter using the `--set key=value[,key=value]` argument to `helm upgrade`. For example,

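As a sketch of how the new parameter could be set in that form (the value `2` here is illustrative, not a recommended default):

```console
helm upgrade -i flagger flagger/flagger \
  --set maxConcurrentCanaries=2
```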
3 changes: 3 additions & 0 deletions charts/flagger/templates/deployment.yaml
@@ -153,6 +153,9 @@ spec:
{{- if .Values.noCrossNamespaceRefs }}
- -no-cross-namespace-refs={{ .Values.noCrossNamespaceRefs }}
{{- end }}
{{- if .Values.maxConcurrentCanaries }}
- -max-concurrent-canaries={{ .Values.maxConcurrentCanaries }}
{{- end }}
livenessProbe:
exec:
command:
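Because the template guards the argument with `{{- if .Values.maxConcurrentCanaries }}`, the default value of `0` renders no flag at all. With a non-zero value, say `maxConcurrentCanaries: 2`, the rendered container spec would include roughly the following (a sketch; other args and fields elided):

```yaml
# Rendered deployment fragment, assuming maxConcurrentCanaries: 2 (hypothetical value)
spec:
  containers:
    - name: flagger
      args:
        - -max-concurrent-canaries=2
```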
2 changes: 2 additions & 0 deletions charts/flagger/values.yaml
@@ -203,6 +203,8 @@ deploymentLabels: { }

noCrossNamespaceRefs: false

maxConcurrentCanaries: 0

#Placeholder to supply additional volumes to the flagger pod
additionalVolumes: {}
# - name: tmpfs
3 changes: 3 additions & 0 deletions cmd/flagger/main.go
@@ -86,6 +86,7 @@ var (
kubeconfigServiceMesh string
clusterName string
noCrossNamespaceRefs bool
maxConcurrentCanaries int
)

func init() {
@@ -121,6 +122,7 @@ func init() {
flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.")
flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.")
flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.")
flag.IntVar(&maxConcurrentCanaries, "max-concurrent-canaries", 0, "Limit the number of concurrently progressing canaries. Unlimited if set to 0 (default).")
}

func main() {
@@ -253,6 +255,7 @@
fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
clusterName,
noCrossNamespaceRefs,
maxConcurrentCanaries,
cfg,
)

87 changes: 45 additions & 42 deletions pkg/controller/controller.go
@@ -51,26 +51,28 @@ const controllerAgentName = "flagger"

// Controller is managing the canary objects and schedules canary deployments
type Controller struct {
kubeConfig *rest.Config
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
flaggerInformers Informers
flaggerSynced cache.InformerSynced
flaggerWindow time.Duration
workqueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
logger *zap.SugaredLogger
canaries *sync.Map
jobs map[string]CanaryJob
recorder metrics.Recorder
notifier notifier.Interface
canaryFactory *canary.Factory
routerFactory *router.Factory
observerFactory *observers.Factory
meshProvider string
eventWebhook string
clusterName string
noCrossNamespaceRefs bool
kubeConfig *rest.Config
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
flaggerInformers Informers
flaggerSynced cache.InformerSynced
flaggerWindow time.Duration
workqueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
logger *zap.SugaredLogger
canaries *sync.Map
jobs map[string]CanaryJob
recorder metrics.Recorder
notifier notifier.Interface
canaryFactory *canary.Factory
routerFactory *router.Factory
observerFactory *observers.Factory
meshProvider string
eventWebhook string
clusterName string
noCrossNamespaceRefs bool
pendingCanaries map[string]bool
maxConcurrentCanaries int
}

type Informers struct {
@@ -94,6 +96,7 @@ func NewController(
eventWebhook string,
clusterName string,
noCrossNamespaceRefs bool,
maxConcurrentCanaries int,
kubeConfig *rest.Config,
) *Controller {
logger.Debug("Creating event broadcaster")
@@ -109,26 +112,28 @@
recorder.SetInfo(version, meshProvider)

ctrl := &Controller{
kubeConfig: kubeConfig,
kubeClient: kubeClient,
flaggerClient: flaggerClient,
flaggerInformers: flaggerInformers,
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
eventRecorder: eventRecorder,
logger: logger,
canaries: new(sync.Map),
jobs: map[string]CanaryJob{},
flaggerWindow: flaggerWindow,
observerFactory: observerFactory,
recorder: recorder,
notifier: notifier,
canaryFactory: canaryFactory,
routerFactory: routerFactory,
meshProvider: meshProvider,
eventWebhook: eventWebhook,
clusterName: clusterName,
noCrossNamespaceRefs: noCrossNamespaceRefs,
kubeConfig: kubeConfig,
kubeClient: kubeClient,
flaggerClient: flaggerClient,
flaggerInformers: flaggerInformers,
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
eventRecorder: eventRecorder,
logger: logger,
canaries: new(sync.Map),
jobs: map[string]CanaryJob{},
flaggerWindow: flaggerWindow,
observerFactory: observerFactory,
recorder: recorder,
notifier: notifier,
canaryFactory: canaryFactory,
routerFactory: routerFactory,
meshProvider: meshProvider,
eventWebhook: eventWebhook,
clusterName: clusterName,
noCrossNamespaceRefs: noCrossNamespaceRefs,
pendingCanaries: map[string]bool{},
maxConcurrentCanaries: maxConcurrentCanaries,
}

flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -237,7 +242,6 @@ func (c *Controller) processNextWorkItem() bool {
c.workqueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
@@ -307,7 +311,6 @@ func (c *Controller) syncHandler(key string) error {
if err := c.addFinalizer(cd); err != nil {
return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
}

}
c.logger.Infof("Synced %s", key)

17 changes: 12 additions & 5 deletions pkg/controller/scheduler.go
@@ -134,6 +134,7 @@ func (c *Controller) scheduleCanaries() {
for job := range c.jobs {
if _, exists := current[job]; !exists {
c.jobs[job].Stop()
delete(c.pendingCanaries, job)
delete(c.jobs, job)
}
}
@@ -283,11 +284,22 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

key := fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)

if !shouldAdvance {
delete(c.pendingCanaries, key)
c.recorder.SetStatus(cd, cd.Status.Phase)
return
}

if _, exists := c.pendingCanaries[key]; c.maxConcurrentCanaries > 0 && len(c.pendingCanaries) >= c.maxConcurrentCanaries && !exists {
canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseWaiting)
c.recordEventInfof(cd, "canary %v.%v %v is waiting to be processed, the maximum number of concurrent canaries has been reached", cd.Name, cd.Namespace, cd.UID)
return
}

c.pendingCanaries[key] = true

maxWeight := c.maxWeight(cd)

// check primary status
@@ -485,7 +497,6 @@
}
c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
}

}

func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -542,7 +553,6 @@ func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryCo
}

return

}

func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -729,7 +739,6 @@ func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController can
return
}
}

}

func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool {
@@ -853,7 +862,6 @@ func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController ca
}

return newCfg, nil

}

func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, scalerReconciler canary.ScalerReconciler, shouldAdvance bool) bool {
@@ -1010,7 +1018,6 @@ func (c *Controller) setPhaseInitializing(cd *flaggerv1.Canary) error {
firstTry = false
return
})

if err != nil {
return fmt.Errorf("failed after retries: %w", err)
}
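The admission check added to `advanceCanary` above can be sketched as a standalone program. This is a simplified model of the gate (the helper name `canStart` and the sample canary keys are illustrative, not part of the commit): an already-pending canary is never blocked, a limit of `0` disables the gate, and otherwise a new canary may only start while fewer than `maxConcurrentCanaries` are pending.

```go
package main

import "fmt"

// canStart models the admission check: a canary proceeds if it is already
// pending, if the limit is disabled (0), or if fewer canaries than the
// limit are currently pending.
func canStart(pending map[string]bool, limit int, key string) bool {
	if pending[key] {
		return true // already admitted; never blocked by the limit
	}
	if limit > 0 && len(pending) >= limit {
		return false // limit reached; the canary would be set to Waiting
	}
	return true
}

func main() {
	pending := map[string]bool{}
	limit := 2

	for _, key := range []string{"podinfo.test", "app.prod", "web.stage"} {
		if canStart(pending, limit, key) {
			pending[key] = true
			fmt.Printf("%s advancing\n", key)
		} else {
			fmt.Printf("%s waiting: concurrency limit reached\n", key)
		}
	}

	// When a canary finishes, or no longer needs to advance, its key is
	// deleted from the pending set, freeing a slot for the next canary.
	delete(pending, "podinfo.test")
	fmt.Println(canStart(pending, limit, "web.stage")) // true
}
```

Keeping the key in the map while a canary is in flight is what makes the check re-entrant: the same canary can be advanced on every scheduler tick without counting against the limit twice.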
1 change: 1 addition & 0 deletions pkg/controller/scheduler_daemonset_fixture_test.go
@@ -120,6 +120,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
recorder: metrics.NewRecorder(controllerAgentName, false),
routerFactory: rf,
notifier: &notifier.NopNotifier{},
pendingCanaries: map[string]bool{},
}
ctrl.flaggerSynced = alwaysReady
ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
1 change: 1 addition & 0 deletions pkg/controller/scheduler_deployment_fixture_test.go
@@ -149,6 +149,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
recorder: metrics.NewRecorder(controllerAgentName, false),
routerFactory: rf,
notifier: &notifier.NopNotifier{},
pendingCanaries: map[string]bool{},
}
ctrl.flaggerSynced = alwaysReady
ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)