Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,22 +474,49 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
// context deadline or if the context has been cancelled, the context's
// error will be returned. Otherwise, it succeeded.
if !option.Config.DryMode {
_, err := params.CRDSyncPromise.Await(d.ctx)
if err != nil {
return nil, restoredEndpoints, err
if option.Config.EnableK8sDegradedStart {
// In degraded mode the apiserver may be unreachable. Don't block
// startup on CRD sync: wait only briefly and then continue, so the
// agent can open its API socket and start the datapath/BGP control
// plane. The CRDSync promise keeps trying in the background and
// CRD-driven reflectors converge once the apiserver is reachable.
crdCtx, cancel := context.WithTimeout(d.ctx, degradedStartupGateTimeout)
_, err := params.CRDSyncPromise.Await(crdCtx)
cancel()
if err != nil {
d.logger.Warn("Proceeding without Cilium CRD sync (degraded start); "+
"CRD-driven features activate once the apiserver is reachable",
logfields.Error, err)
}
} else {
_, err := params.CRDSyncPromise.Await(d.ctx)
if err != nil {
return nil, restoredEndpoints, err
}
}
}

if option.Config.IPAM == ipamOption.IPAMClusterPool ||
option.Config.IPAM == ipamOption.IPAMMultiPool {
// Create the CiliumNode custom resource. This call will block until
// the custom resource has been created
// the custom resource has been created. In degraded mode it is bounded
// (~5s of retries) and non-fatal; the resource is (re)created by the
// LocalNodeStore observer once the apiserver is reachable again.
d.nodeDiscovery.UpdateCiliumNodeResource()
}

if err := agentK8s.WaitForNodeInformation(d.ctx, d.logger, params.Resources.LocalNode, params.Resources.LocalCiliumNode); err != nil {
d.logger.Error("unable to connect to get node spec from apiserver", logfields.Error, err)
return nil, nil, fmt.Errorf("unable to connect to get node spec from apiserver: %w", err)
// In degraded mode, missing node information from the apiserver must
// not abort startup; the local node was restored from the on-disk
// snapshot and is reconciled in the background once the apiserver
// becomes reachable.
if option.Config.EnableK8sDegradedStart {
d.logger.Warn("Proceeding without node information from apiserver (degraded start)",
logfields.Error, err)
} else {
d.logger.Error("unable to connect to get node spec from apiserver", logfields.Error, err)
return nil, nil, fmt.Errorf("unable to connect to get node spec from apiserver: %w", err)
}
}

// Kubernetes demands that the localhost can always reach local
Expand Down
3 changes: 3 additions & 0 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,9 @@ func InitGlobalFlags(logger *slog.Logger, cmd *cobra.Command, vp *viper.Viper) {
flags.MarkHidden(option.EnableK8sNetworkPolicy)
option.BindEnv(vp, option.EnableK8sNetworkPolicy)

flags.Bool(option.K8sDegradedStart, false, "Allow the agent to start in a degraded mode when the Kubernetes apiserver is unreachable: the k8s connection/version checks become non-fatal and the local node is restored from an on-disk snapshot. Keeps the datapath and BGP control plane able to restart during an apiserver outage.")
option.BindEnv(vp, option.K8sDegradedStart)

flags.Bool(option.EnableCiliumNetworkPolicy, defaults.EnableCiliumNetworkPolicy, "Enable support for Cilium Network Policy")
flags.MarkHidden(option.EnableCiliumNetworkPolicy)
option.BindEnv(vp, option.EnableCiliumNetworkPolicy)
Expand Down
143 changes: 143 additions & 0 deletions daemon/cmd/local_node_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package cmd

import (
"encoding/json"
"net"
"os"
"path/filepath"

k8stypes "k8s.io/apimachinery/pkg/types"

"github.com/cilium/cilium/pkg/cidr"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/node"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/source"
"github.com/cilium/cilium/pkg/time"
)

// Constants used by the degraded-start local node snapshot logic.
const (
	// localNodeSnapshotFile is the name of the file, inside the runtime state
	// directory, that holds the persisted local node. It is read back on a
	// degraded start when the Kubernetes apiserver cannot be reached.
	localNodeSnapshotFile = "local_node.state"

	// degradedLocalNodeInitTimeout is the deadline applied to the Kubernetes
	// initialization attempt made by InitLocalNode during a degraded start.
	// Once it expires the local node is restored from the on-disk snapshot
	// instead. The intent is that a healthy apiserver answers well within
	// this window, so only a real outage reaches the timeout.
	degradedLocalNodeInitTimeout = 30 * time.Second

	// degradedStartupGateTimeout is the deadline the daemon start hook
	// applies to apiserver-dependent gates (such as CRD sync) before it
	// carries on in degraded mode. It lets the agent finish booting during
	// an apiserver outage -- opening its API socket and starting the
	// datapath/BGP control plane -- rather than blocking or crashlooping
	// until the apiserver returns. Background reconcilers are expected to
	// keep retrying and converge once the apiserver is reachable.
	degradedStartupGateTimeout = 30 * time.Second
)

// localNodeSnapshot is the on-disk, JSON-serializable representation of the
// local node. The embedded nodeTypes.Node round-trips via JSON (the same
// representation used by the node kvstore), and the remaining LocalNode fields
// that are not part of Node are stored explicitly. The slog.Logger is
// intentionally omitted.
//
// Values are produced by toLocalNodeSnapshot and consumed by
// applyLocalNodeSnapshot. Every field except Node is dropped from the JSON
// output when it holds its zero value (omitempty).
type localNodeSnapshot struct {
// Node is a copy of the LocalNode's Node value.
Node nodeTypes.Node `json:"node"`
// OptOutNodeEncryption mirrors the LocalNode field of the same name.
OptOutNodeEncryption bool `json:"optOutNodeEncryption,omitempty"`
// UID mirrors the LocalNode field of the same name.
UID k8stypes.UID `json:"uid,omitempty"`
// ProviderID mirrors the LocalNode field of the same name.
ProviderID string `json:"providerID,omitempty"`
// IPv4NativeRoutingCIDR is the String() form of the LocalNode's IPv4
// native routing CIDR; it is left empty when that CIDR is nil.
IPv4NativeRoutingCIDR string `json:"ipv4NativeRoutingCIDR,omitempty"`
// IPv6NativeRoutingCIDR is the String() form of the LocalNode's IPv6
// native routing CIDR; it is left empty when that CIDR is nil.
IPv6NativeRoutingCIDR string `json:"ipv6NativeRoutingCIDR,omitempty"`
// ServiceLoopbackIPv4 is the String() form of the LocalNode's service
// loopback IPv4 address; it is left empty when that address is nil.
ServiceLoopbackIPv4 string `json:"serviceLoopbackIPv4,omitempty"`
}

// localNodeSnapshotPath returns the full path of the local node snapshot file.
// The file lives in the configured runtime directory; when no runtime
// directory is configured, /var/run/cilium is used instead.
func localNodeSnapshotPath() string {
	if runDir := option.Config.RunDir; runDir != "" {
		return filepath.Join(runDir, localNodeSnapshotFile)
	}
	return filepath.Join("/var/run/cilium", localNodeSnapshotFile)
}

// toLocalNodeSnapshot converts ln into its serializable snapshot form. Plain
// fields are copied as-is. The native routing CIDRs and the service loopback
// address are stored as strings, and are left empty when the corresponding
// value on ln is nil.
func toLocalNodeSnapshot(ln node.LocalNode) localNodeSnapshot {
	var out localNodeSnapshot

	out.Node = ln.Node
	out.OptOutNodeEncryption = ln.OptOutNodeEncryption
	out.UID = ln.UID
	out.ProviderID = ln.ProviderID

	if v4 := ln.IPv4NativeRoutingCIDR; v4 != nil {
		out.IPv4NativeRoutingCIDR = v4.String()
	}
	if v6 := ln.IPv6NativeRoutingCIDR; v6 != nil {
		out.IPv6NativeRoutingCIDR = v6.String()
	}
	if loopback := ln.ServiceLoopbackIPv4; loopback != nil {
		out.ServiceLoopbackIPv4 = loopback.String()
	}

	return out
}

// saveLocalNodeSnapshot persists the local node to disk. The JSON encoding is
// written to a sibling "<path>.tmp" file and then renamed over the snapshot
// path, so readers never observe a partially written snapshot. It is
// best-effort and only used when degraded start is enabled.
//
// It returns the first error hit while encoding, writing, or renaming. On a
// write or rename failure the temporary file is removed so no stale or
// truncated ".tmp" file is left behind in the runtime directory.
func saveLocalNodeSnapshot(ln node.LocalNode) error {
	data, err := json.Marshal(toLocalNodeSnapshot(ln))
	if err != nil {
		return err
	}

	path := localNodeSnapshotPath()
	tmp := path + ".tmp"

	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		// WriteFile may have created or truncated the temp file before
		// failing; cleanup is best-effort, the write error is what matters.
		_ = os.Remove(tmp)
		return err
	}
	if err := os.Rename(tmp, path); err != nil {
		// The previous snapshot (if any) is still intact; drop the orphan.
		_ = os.Remove(tmp)
		return err
	}
	return nil
}

// loadLocalNodeSnapshot reads and decodes the persisted local node snapshot.
//
// The boolean result reports whether a snapshot was loaded. A missing file is
// not an error: it yields (zero value, false, nil). Any other read failure, or
// a file that does not decode as JSON, yields (zero value, false, err).
func loadLocalNodeSnapshot() (localNodeSnapshot, bool, error) {
	raw, err := os.ReadFile(localNodeSnapshotPath())
	switch {
	case err == nil:
		// File read successfully; decode it below.
	case os.IsNotExist(err):
		return localNodeSnapshot{}, false, nil
	default:
		return localNodeSnapshot{}, false, err
	}

	var decoded localNodeSnapshot
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return localNodeSnapshot{}, false, err
	}
	return decoded, true, nil
}

// applyLocalNodeSnapshot overlays a restored snapshot onto the local node.
//
// n.Node is replaced wholesale by the snapshot's Node, and OptOutNodeEncryption,
// UID and ProviderID are copied from the snapshot. The Logger already set on n
// is preserved. The native routing CIDRs and the service loopback address are
// only overwritten when the snapshot holds a non-empty value that parses;
// otherwise whatever n already had is left untouched. Finally Source and
// NodeIdentity are reasserted to the local-node defaults set by InitLocalNode.
func applyLocalNodeSnapshot(n *node.LocalNode, snap localNodeSnapshot) {
	logger := n.Logger
	n.Node = snap.Node
	n.Logger = logger
	n.OptOutNodeEncryption = snap.OptOutNodeEncryption
	n.UID = snap.UID
	n.ProviderID = snap.ProviderID

	if snap.IPv4NativeRoutingCIDR != "" {
		if c, err := cidr.ParseCIDR(snap.IPv4NativeRoutingCIDR); err == nil {
			n.IPv4NativeRoutingCIDR = c
		}
	}
	if snap.IPv6NativeRoutingCIDR != "" {
		if c, err := cidr.ParseCIDR(snap.IPv6NativeRoutingCIDR); err == nil {
			n.IPv6NativeRoutingCIDR = c
		}
	}
	if snap.ServiceLoopbackIPv4 != "" {
		// net.ParseIP returns nil for malformed input. Skip the assignment in
		// that case so a corrupt snapshot cannot wipe an address that is
		// already set, matching how the CIDR fields above are handled.
		if ip := net.ParseIP(snap.ServiceLoopbackIPv4); ip != nil {
			n.ServiceLoopbackIPv4 = ip
		}
	}

	n.Source = source.Local
	n.NodeIdentity = uint32(identity.ReservedIdentityHost)
}
56 changes: 55 additions & 1 deletion daemon/cmd/local_node_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (ini *localNodeSynchronizer) InitLocalNode(ctx context.Context, n *node.Loc
return err
}

if err := ini.initFromK8s(ctx, n); err != nil {
if err := ini.initFromK8sWithDegradedFallback(ctx, n); err != nil {
return err
}

Expand All @@ -71,7 +71,61 @@ func (ini *localNodeSynchronizer) InitLocalNode(ctx context.Context, n *node.Loc
return nil
}

// initFromK8sWithDegradedFallback initializes the local node from Kubernetes.
// In the default (non-degraded) configuration, or when there is no Kubernetes
// local node resource, it simply delegates to initFromK8s with the caller's
// context.
//
// When degraded start is enabled, the Kubernetes initialization is bounded by
// degradedLocalNodeInitTimeout. If it fails within a still-live caller
// context, the local node is restored from the on-disk snapshot written by a
// prior run, so the agent (and therefore the BGP control plane) can come back
// up across a restart while the apiserver is down. A missing or unreadable
// snapshot is logged and is not fatal: the node is then left as it was.
//
// An error is returned only when the non-degraded path fails, or when the
// caller's own context has been cancelled or has expired. In the latter case
// startup is being aborted, so the failure is propagated instead of being
// masked by the fallback.
func (ini *localNodeSynchronizer) initFromK8sWithDegradedFallback(ctx context.Context, n *node.LocalNode) error {
	if !ini.Config.EnableK8sDegradedStart || ini.K8sLocalNode == nil {
		return ini.initFromK8s(ctx, n)
	}

	k8sCtx, cancel := context.WithTimeout(ctx, degradedLocalNodeInitTimeout)
	defer cancel()

	err := ini.initFromK8s(k8sCtx, n)
	if err == nil {
		return nil
	}
	// Fall back only if the bounded attempt itself failed. If the parent
	// context is done, the caller asked us to stop.
	if ctx.Err() != nil {
		return err
	}
	ini.Logger.Warn("Unable to initialize local node from Kubernetes; attempting to restore from on-disk snapshot (degraded start)",
		logfields.Error, err)

	snap, ok, err := loadLocalNodeSnapshot()
	if err != nil {
		ini.Logger.Warn("Failed to read local node snapshot during degraded start",
			logfields.Error, err)
	}
	if !ok {
		ini.Logger.Warn("No local node snapshot available; proceeding with configuration-derived local node only (degraded start)")
		return nil
	}

	applyLocalNodeSnapshot(n, snap)
	ini.Logger.Info("Restored local node from on-disk snapshot (degraded start)",
		logfields.NodeName, n.Name)
	return nil
}

func (ini *localNodeSynchronizer) SyncLocalNode(ctx context.Context, store *node.LocalNodeStore) {
// When degraded start is enabled, persist the local node on every change so
// that a subsequent restart during an apiserver outage can restore it from
// disk. This observes all updates, including those made by other components
// (e.g. IPAM setting the Cilium internal IP), not just the Kubernetes sync.
if ini.Config.EnableK8sDegradedStart {
go store.Observe(ctx, func(ln node.LocalNode) {
if err := saveLocalNodeSnapshot(ln); err != nil {
ini.Logger.Warn("Failed to persist local node snapshot",
logfields.Error, err)
}
}, func(error) {})
}

if ini.K8sLocalNode == nil {
return
}
Expand Down
35 changes: 34 additions & 1 deletion pkg/k8s/client/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
slim_clientset "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned"
k8sversion "github.com/cilium/cilium/pkg/k8s/version"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/option"
)

// client.Cell provides Clientset, a composition of clientsets to Kubernetes resources
Expand Down Expand Up @@ -237,8 +238,34 @@ func (c *compositeClientset) onStart(startCtx cell.HookContext) error {
return nil
}

degraded := option.Config.EnableK8sDegradedStart

if err := c.waitForConn(startCtx); err != nil {
return err
if !degraded {
return err
}
// Degraded start: the apiserver is unreachable. Rather than failing the
// whole agent (which would take down the datapath and the BGP control
// plane until the apiserver recovers), continue booting and let the
// heartbeat controller re-establish the connection in the background.
// Restore the previously detected apiserver version from disk so server
// capabilities stay consistent across the restart.
c.logger.Warn("Unable to connect to apiserver at startup; continuing in degraded mode (k8s-degraded-start is enabled)",
logfields.Error, err)
c.startHeartbeat()
if version, ok := loadK8sVersionSnapshot(); ok {
if ferr := k8sversion.Force(version); ferr != nil {
c.logger.Warn("Failed to restore Kubernetes apiserver version from snapshot during degraded start",
logfields.Error, ferr)
} else {
c.logger.Info("Restored Kubernetes apiserver version from on-disk snapshot for degraded start",
logfields.Version, version)
}
} else {
c.logger.Warn("No Kubernetes apiserver version snapshot available; server capabilities will use defaults until the apiserver becomes reachable")
}
c.started = true
return nil
}
c.startHeartbeat()

Expand All @@ -252,6 +279,12 @@ func (c *compositeClientset) onStart(startCtx cell.HookContext) error {
k8sversion.Version(), k8sversion.MinimalVersionConstraint)
}

// Persist the detected version so that a future degraded start (apiserver
// unreachable) can restore the server capabilities from disk.
if degraded {
saveK8sVersionSnapshot(c.logger, k8sversion.Version().String())
}

c.started = true

return nil
Expand Down
55 changes: 55 additions & 0 deletions pkg/k8s/client/degraded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package client

import (
"log/slog"
"os"
"path/filepath"
"strings"

"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/option"
)

// k8sVersionSnapshotFile is the file (relative to the runtime state directory)
// used to persist the detected apiserver version so that a degraded start can
// restore server capabilities without contacting the apiserver. The file holds
// the version string as plain text (see saveK8sVersionSnapshot); its full path
// is built by k8sVersionSnapshotPath.
const k8sVersionSnapshotFile = "k8s-version.state"

// k8sVersionSnapshotPath returns the full path of the apiserver version
// snapshot file. The file lives in the configured runtime directory; when no
// runtime directory is configured, /var/run/cilium is used instead.
func k8sVersionSnapshotPath() string {
	if runDir := option.Config.RunDir; runDir != "" {
		return filepath.Join(runDir, k8sVersionSnapshotFile)
	}
	return filepath.Join("/var/run/cilium", k8sVersionSnapshotFile)
}

// saveK8sVersionSnapshot persists the detected apiserver version. It is
// best-effort: failures are logged but not fatal.
//
// Surrounding whitespace is trimmed from version, and an empty result is
// ignored so an existing snapshot is never replaced by a blank one. The value
// is written to a sibling "<path>.tmp" file and then renamed over the snapshot
// path. An interrupted write therefore cannot leave a truncated version string
// for a later degraded start to load. The temporary file is removed if either
// step fails.
func saveK8sVersionSnapshot(logger *slog.Logger, version string) {
	version = strings.TrimSpace(version)
	if version == "" {
		return
	}

	path := k8sVersionSnapshotPath()
	tmp := path + ".tmp"

	err := os.WriteFile(tmp, []byte(version), 0o600)
	if err == nil {
		err = os.Rename(tmp, path)
	}
	if err != nil {
		// Cleanup is best-effort; the original failure is what gets logged.
		_ = os.Remove(tmp)
		logger.Warn("Failed to persist Kubernetes apiserver version snapshot",
			logfields.Path, path, logfields.Error, err)
	}
}

// loadK8sVersionSnapshot reads the apiserver version stored in the snapshot
// file and returns it with surrounding whitespace trimmed. The boolean result
// is false, and the string empty, when the file cannot be read for any reason
// or when it contains nothing but whitespace.
func loadK8sVersionSnapshot() (string, bool) {
	raw, err := os.ReadFile(k8sVersionSnapshotPath())
	if err != nil {
		return "", false
	}
	if v := strings.TrimSpace(string(raw)); v != "" {
		return v, true
	}
	return "", false
}
Loading