Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions pkg/k8s/client/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ var ClientBuilderCell = cell.Module(
)

var (
k8sHeartbeatControllerGroup = controller.NewGroup("k8s-heartbeat")
connTimeout = time.Minute
connRetryInterval = 5 * time.Second
k8sHeartbeatControllerGroup = controller.NewGroup("k8s-heartbeat")
k8sConnRecoveryControllerGroup = controller.NewGroup("k8s-conn-recovery")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This var block is not gofmt-aligned — gofmt -l flags the file. Run gofmt -w (the = columns need re-aligning after adding the longer name), otherwise the make checkpatch/gofmt CI gate will fail.

connTimeout = time.Minute
connRetryInterval = 5 * time.Second
)

// Type aliases for the clientsets to avoid name collision on 'Clientset' when composing them.
Expand Down Expand Up @@ -243,7 +244,14 @@ func (c *compositeClientset) onStart(startCtx cell.HookContext) error {
}

if err := c.waitForConn(startCtx); err != nil {
return err
if !c.config.IgnoreApiserverFailOnStart {
return err
}
c.logger.Warn("Unable to connect to k8s API server on startup; continuing in degraded state",
logfields.Error, err,
)
c.startConnRecovery()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This degraded-start feature doesn't extend to the operator, which embeds the same client.Cell and flag. When this path returns nil with the version unset, runOperator (operator/cmd/root.go:525-530) sees MinimalVersionMet == false and calls logging.Fatal, so the operator exits during the outage instead of running degraded. Not a regression (the operator already fails to start when the apiserver is unreachable), but if degraded-start is meant to cover the operator, that version gate needs the same relaxation.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good to know but it does not impact operator.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ya. I didn't even think of the operator

return nil

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the normal path k8sversion.Update() (l.259) populates the global version + capabilities before onStart returns. The degraded path returns before that runs, so the agent operates with default/empty capabilities until recovery succeeds. Is downstream code that reads k8sversion.Capabilities() safe with unset values during the degraded window?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this I should dig into more. I wonder what the default capabilities are? what would be missing without the version check succeeding. I'll check it out

}
c.startHeartbeat()

Expand Down Expand Up @@ -309,6 +317,46 @@ func (c *compositeClientset) startHeartbeat() {
})
}

// degraded state background retry
func (c *compositeClientset) startConnRecovery() {
const controllerName = "k8s-conn-recovery"
c.controller.UpdateController(controllerName,
controller.ControllerParams{
Group: k8sConnRecoveryControllerGroup,
// use the same cfg vars as onstart for timeout and retry
// allow the controller to exec the anon at interval
DoFunc: func(ctx context.Context) error {
if err := isConnReady(c); err != nil {
c.logger.Debug("k8s API server still unreachable, will retry",
logfields.IPAddr, c.restConfigManager.getConfig().Host,
logfields.Error, err,
)
return nil

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning nil while the apiserver is still unreachable makes the controller record a success every interval (growing successCount, lastError=nil), so cilium status --all-controllers and the controller metrics show zero failures and operators can't see the node is stuck degraded. Prefer return err here (and optionally attach a Health reporter) so status/metrics reflect the degraded state — note this also engages the error-backoff cadence, so pair it with MaxRetryInterval if you want to keep a steady retry rate.

}

c.logger.Info("Re-established connection to API server. Exiting degraded state",
logfields.IPAddr, c.restConfigManager.getConfig().Host,
)
// start the heartbeat as this was previously skipped
c.startHeartbeat()

// do the k8s version check. might remove
if err := k8sversion.Update(c.logger, c, c.config.EnableK8sAPIDiscovery); err != nil {
c.logger.Warn("k8s version check failed after reconnect", logfields.Error, err)
} else if !k8sversion.Capabilities().MinimalVersionMet {
c.logger.Warn("k8s version does not meet minimal standardc",
"version", k8sversion.Version(),
"minVersion", k8sversion.MinimalVersionConstraint,
)
}

c.controller.RemoveController(controllerName)
return nil
Comment on lines +337 to +354

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoveController runs unconditionally here, even when k8sversion.Update() only logged a Warn. A successful isConnReady (a kube-system GET) does not guarantee ServerVersion() succeeds, so the controller can be destroyed with the cached version still zero-valued for the process lifetime — CEP sync (endpointsynchronizer.go:~120) then stays permanently broken while the agent looks healthy.

Reorder so the version gate runs first and the controller is only torn down once the version is confirmed; otherwise return err to keep retrying. This also resolves the fatal-vs-warn asymmetry (degraded now stays degraded and retries instead of silently running on an unsupported version), starts the heartbeat only after the connection is confirmed (avoiding a premature/duplicate heartbeat), drops the leftover // might remove note, and fixes the standardc typo.

Note: return err switches the controller to its error-backoff path (errorRetries * 1s, uncapped). If you want to keep the steady 5s cadence, also set MaxRetryInterval: connRetryInterval (or an ErrorRetryBaseDuration) on the ControllerParams.

Suggested change
c.logger.Info("Re-established connection to API server. Exiting degraded state",
logfields.IPAddr, c.restConfigManager.getConfig().Host,
)
// start the heartbeat as this was previously skipped
c.startHeartbeat()
// do the k8s version check. might remove
if err := k8sversion.Update(c.logger, c, c.config.EnableK8sAPIDiscovery); err != nil {
c.logger.Warn("k8s version check failed after reconnect", logfields.Error, err)
} else if !k8sversion.Capabilities().MinimalVersionMet {
c.logger.Warn("k8s version does not meet minimal standardc",
"version", k8sversion.Version(),
"minVersion", k8sversion.MinimalVersionConstraint,
)
}
c.controller.RemoveController(controllerName)
return nil
// A successful isConnReady (kube-system GET) does not guarantee the
// version discovery call below succeeds. Only exit degraded state once
// the version is confirmed; until then stay degraded and let the
// controller retry rather than tearing it down with an unset version.
if err := k8sversion.Update(c.logger, c, c.config.EnableK8sAPIDiscovery); err != nil {
c.logger.Warn("k8s version check failed after reconnect; staying degraded", logfields.Error, err)
return err
}
if !k8sversion.Capabilities().MinimalVersionMet {
return fmt.Errorf("k8s version (%v) does not meet minimal requirement (%v); staying degraded",
k8sversion.Version(), k8sversion.MinimalVersionConstraint)
}
c.logger.Info("Re-established connection to API server. Exiting degraded state",
logfields.IPAddr, c.restConfigManager.getConfig().Host,
)
// Start the heartbeat (skipped during degraded onStart) now that the
// connection is confirmed usable, then stop retrying.
c.startHeartbeat()
c.controller.RemoveController(controllerName)
return nil

},
RunInterval: connRetryInterval,
})
}

func (c *compositeClientset) waitForConn(ctx context.Context) error {
stop := make(chan struct{})
timeout := time.NewTimer(connTimeout)
Expand Down
7 changes: 7 additions & 0 deletions pkg/k8s/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ type SharedConfig struct {

// EnableAPIDiscovery enables Kubernetes API discovery
EnableK8sAPIDiscovery bool

// IgnoreApiserverFailOnStart controls whether a failure to connect to the
// k8s API server during startup is treated as fatal. When true, the agent
// continues starting in a degraded state instead of exiting.
IgnoreApiserverFailOnStart bool
}

type ClientParams struct {
Expand Down Expand Up @@ -75,6 +80,7 @@ var defaultSharedConfig = SharedConfig{
K8sClientConnectionKeepAlive: 30 * time.Second,
K8sHeartbeatTimeout: 30 * time.Second,
EnableK8sAPIDiscovery: defaults.K8sEnableAPIDiscovery,
IgnoreApiserverFailOnStart: false,
}

func (def SharedConfig) Flags(flags *pflag.FlagSet) {
Expand All @@ -87,6 +93,7 @@ func (def SharedConfig) Flags(flags *pflag.FlagSet) {
flags.Duration(option.K8sClientConnectionKeepAlive, def.K8sClientConnectionKeepAlive, "Configures the keep alive duration of K8s client connections. K8 client is disabled if the value is set to 0")
flags.Duration(option.K8sHeartbeatTimeout, def.K8sHeartbeatTimeout, "Configures the timeout for api-server heartbeat, set to 0 to disable")
flags.Bool(option.K8sEnableAPIDiscovery, def.EnableK8sAPIDiscovery, "Enable discovery of Kubernetes API groups and resources with the discovery API")
flags.Bool(option.IgnoreApiserverFailOnStart, def.IgnoreApiserverFailOnStart, "When true, failure to connect to the k8s API server on startup is non-fatal; the agent starts in a degraded state")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New agent flag — regenerate the cmdref docs (make -C Documentation update-cmdref) or the Documentation/cmdref CI check will fail.

}

func NewClientConfig(cfg SharedConfig, params ClientParams) Config {
Expand Down
5 changes: 5 additions & 0 deletions pkg/option/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ const (
// Intended for operating cilium with CNI-compatible orchestrators other than Kubernetes. (default is true)
EnableK8s = "enable-k8s"

// IgnoreApiserverFailOnStart controls whether a failure to connect to the
// k8s API server during startup is treated as fatal. When true, the agent
// continues starting in a degraded state instead of exiting. (default is false)
IgnoreApiserverFailOnStart = "ignore-apiserver-fail-onstart"

// K8sAPIServer is the kubernetes api address server (for https use --k8s-kubeconfig-path instead)
K8sAPIServer = "k8s-api-server"

Expand Down