diff --git a/cmd/aws-cloud-controller-manager/main.go b/cmd/aws-cloud-controller-manager/main.go index 42a44cca6b6..ee25e0368d7 100644 --- a/cmd/aws-cloud-controller-manager/main.go +++ b/cmd/aws-cloud-controller-manager/main.go @@ -34,7 +34,6 @@ import ( cloudprovider "k8s.io/cloud-provider" "k8s.io/cloud-provider-aws/pkg/controllers/nodeipam" "k8s.io/cloud-provider-aws/pkg/controllers/tagging" - awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1" awsv2 "k8s.io/cloud-provider-aws/pkg/providers/v2" "k8s.io/cloud-provider/app" @@ -76,6 +75,12 @@ func main() { Constructor: taggingControllerWrapper.StartTaggingControllerWrapper, } + controllerInitializers[tagging.TaggingControllerKey] = taggingControllerConstructor + app.ControllersDisabledByDefault.Insert(tagging.TaggingControllerKey) + + controllerAliases := names.CCMControllerAliases() + controllerAliases[tagging.TaggingControllerKey] = tagging.TaggingControllerKey + nodeIpamControllerWrapper := nodeipam.ControllerWrapper{} nodeIpamControllerWrapper.Options.AddFlags(fss.FlagSet("nodeipam controller")) @@ -86,20 +91,14 @@ func main() { Constructor: nodeIpamControllerWrapper.StartNodeIpamControllerWrapper, } - controllerInitializers[tagging.TaggingControllerKey] = taggingControllerConstructor - app.ControllersDisabledByDefault.Insert(tagging.TaggingControllerKey) - - controllerAliases := names.CCMControllerAliases() - controllerAliases[tagging.TaggingControllerKey] = tagging.TaggingControllerKey - controllerInitializers[nodeipam.NodeIpamControllerKey] = nodeIpamControllerConstructor app.ControllersDisabledByDefault.Insert(nodeipam.NodeIpamControllerKey) + command := app.NewCloudControllerManagerCommand(opts, cloudInitializer, controllerInitializers, controllerAliases, fss, wait.NeverStop) if err := command.Execute(); err != nil { klog.Fatalf("unable to execute command: %v", err) } - } func cloudInitializer(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface { diff --git a/pkg/controllers/nodeipam/config/types.go b/pkg/controllers/nodeipam/config/types.go new file mode 100644 index 00000000000..2e1c7e90f46 --- /dev/null +++ b/pkg/controllers/nodeipam/config/types.go @@ -0,0 +1,30 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import "net" + +// NodeIPAMControllerConfiguration contains elements describing NodeIPAMController. +type NodeIPAMControllerConfiguration struct { + RateLimit float64 + BurstLimit int + DualStack bool + ClusterCIDRs []*net.IPNet + // NodeCIDRMaskSize is the mask size for node cidr in single-stack cluster. + // This can be used only with single stack clusters and is incompatible with dual stack clusters. + NodeCIDRMaskSize int32 +} diff --git a/pkg/controllers/nodeipam/ipam/cidr_allocator.go b/pkg/controllers/nodeipam/ipam/cidr_allocator.go new file mode 100644 index 00000000000..f825b67ba0d --- /dev/null +++ b/pkg/controllers/nodeipam/ipam/cidr_allocator.go @@ -0,0 +1,478 @@ +/* +Copyright 2023 The Kubernetes Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipam
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/ec2"
+	"k8s.io/klog/v2"
+
+	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	informers "k8s.io/client-go/informers/core/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	cidrset "k8s.io/cloud-provider-aws/pkg/controllers/nodeipam/ipam/cidrset"
+	awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1"
+	nodeutil "k8s.io/cloud-provider-aws/pkg/util"
+)
+
+// NodeReservedCIDRs holds the reservation info for a node:
+// CIDRs are reserved first, then the node resource is patched with them.
+type NodeReservedCIDRs struct {
+	allocatedCIDRs []*net.IPNet
+	nodeName       string
+}
+
+// TODO: figure out good settings for these constants.
+const (
+	// The amount of time the nodecontroller polls on the list nodes endpoint.
+	apiserverStartupGracePeriod = 10 * time.Minute
+
+	// The no. of NodeSpec updates NC can process concurrently.
+	cidrUpdateWorkers = 30
+
+	// The max no. of NodeSpec updates that can be enqueued.
+	CidrUpdateQueueSize = 5000
+
+	// cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
+	cidrUpdateRetries = 3
+
+	// updateRetryTimeout is the time to wait before requeuing a failed node for retry.
+	updateRetryTimeout = 250 * time.Millisecond
+
+	// maxUpdateRetryTimeout is the maximum amount of time between timeouts.
+	maxUpdateRetryTimeout = 5 * time.Second
+
+	// updateMaxRetries is the max retries for a failed node.
+	updateMaxRetries = 10
+)
+
+// nodePollInterval is used in listing nodes.
+// This is a variable instead of a const to enable testing.
+var nodePollInterval = 10 * time.Second
+
+// CIDRAllocator is an interface implemented by things that know how
+// to allocate/occupy/recycle CIDR for nodes.
+type CIDRAllocator interface {
+	// AllocateOrOccupyCIDR looks at the given node, assigns it a valid
+	// CIDR if it doesn't currently have one, or marks the CIDR as used if
+	// the node already has one.
+	AllocateOrOccupyCIDR(node *v1.Node) error
+	// ReleaseCIDR releases the CIDR of the removed node
+	ReleaseCIDR(node *v1.Node) error
+	// Run starts all the working logic of the allocator.
+	Run(stopCh <-chan struct{})
+}
+
+// CIDRAllocatorParams contains the parameters required for creating a new
+// CIDR range allocator.
+type CIDRAllocatorParams struct { + // ClusterCIDRs is list of cluster cidrs + ClusterCIDRs []*net.IPNet + // NodeCIDRMaskSizes is list of node cidr mask sizes + NodeCIDRMaskSizes []int +} + +func listNodes(kubeClient clientset.Interface) (*v1.NodeList, error) { + var nodeList *v1.NodeList + // We must poll because apiserver might not be up. This error causes + // controller manager to restart. + if pollErr := wait.Poll(nodePollInterval, apiserverStartupGracePeriod, func() (bool, error) { + var err error + nodeList, err = kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.Everything().String(), + LabelSelector: labels.Everything().String(), + }) + if err != nil { + klog.Errorf("Failed to list all nodes: %v", err) + return false, nil + } + return true, nil + }); pollErr != nil { + return nil, fmt.Errorf("failed to list all nodes in %v, cannot proceed without updating CIDR map", + apiserverStartupGracePeriod) + } + return nodeList, nil +} + +type rangeAllocator struct { + client clientset.Interface + // cluster cidrs as passed in during controller creation + clusterCIDRs []*net.IPNet + // for each entry in clusterCIDRs we maintain a list of what is used and what is not + cidrSets []*cidrset.CidrSet + // nodeLister is able to list/get nodes and is populated by the shared informer passed to controller + nodeLister corelisters.NodeLister + // nodesSynced returns true if the node shared informer has been synced at least once. + nodesSynced cache.InformerSynced + // Channel that is used to pass updating Nodes and their reserved CIDRs to the background + // This increases a throughput of CIDR assignment by not blocking on long operations. + nodeCIDRUpdateChannel chan NodeReservedCIDRs + recorder record.EventRecorder + // Keep a set of nodes that are currently being processed to avoid races in CIDR allocation + lock sync.Mutex + nodesInProcessing sets.String + cloud *awsv1.Cloud +} + +// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for node (one from each of clusterCIDRs) +// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size. +// Caller must always pass in a list of existing nodes so the new allocator. +// can initialize its CIDR map. NodeList is only nil in testing. +func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.NodeInformer, awsCloud *awsv1.Cloud, allocatorParams CIDRAllocatorParams, nodeList *v1.NodeList) (CIDRAllocator, error) { + if client == nil { + klog.Fatalf("kubeClient is nil when starting NodeController") + } + + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"}) + eventBroadcaster.StartStructuredLogging(0) + klog.V(0).Infof("Sending events to api server.") + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}) + + // create a cidrSet for each CIDR we operate on. 
+ // cidrSet are mapped to clusterCIDR by index + cidrSets := make([]*cidrset.CidrSet, len(allocatorParams.ClusterCIDRs)) + for idx, cidr := range allocatorParams.ClusterCIDRs { + cidrSet, err := cidrset.NewCIDRSet(cidr, allocatorParams.NodeCIDRMaskSizes[idx]) + if err != nil { + return nil, err + } + cidrSets[idx] = cidrSet + } + + ra := &rangeAllocator{ + client: client, + clusterCIDRs: allocatorParams.ClusterCIDRs, + cloud: awsCloud, + cidrSets: cidrSets, + nodeLister: nodeInformer.Lister(), + nodesSynced: nodeInformer.Informer().HasSynced, + nodeCIDRUpdateChannel: make(chan NodeReservedCIDRs, CidrUpdateQueueSize), + recorder: recorder, + nodesInProcessing: sets.NewString(), + } + + if nodeList != nil { + for _, node := range nodeList.Items { + if len(node.Spec.PodCIDRs) == 0 { + klog.V(4).Infof("Node %v has no CIDR, ignoring", node.Name) + continue + } + klog.V(4).Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR) + if err := ra.occupyCIDRs(&node); err != nil { + // This will happen if: + // 1. We find garbage in the podCIDRs field. Retrying is useless. + // 2. CIDR out of range: This means a node CIDR has changed. + // This error will keep crashing controller-manager. + return nil, err + } + } + } + + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR), + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + // If the PodCIDRs list is not empty we either: + // - already processed a Node that already had CIDRs after NC restarted + // (cidr is marked as used), + // - already processed a Node successfully and allocated CIDRs for it + // (cidr is marked as used), + // - already processed a Node but we did saw a "timeout" response and + // request eventually got through in this case we haven't released + // the allocated CIDRs (cidr is still marked as used). + // There's a possible error here: + // - NC sees a new Node and assigns CIDRs X,Y.. to it, + // - Update Node call fails with a timeout, + // - Node is updated by some other component, NC sees an update and + // assigns CIDRs A,B.. to the Node, + // - Both CIDR X,Y.. and CIDR A,B.. are marked as used in the local cache, + // even though Node sees only CIDR A,B.. + // The problem here is that in in-memory cache we see CIDR X,Y.. as marked, + // which prevents it from being assigned to any new node. The cluster + // state is correct. + // Restart of NC fixes the issue. + if len(newNode.Spec.PodCIDRs) == 0 { + return ra.AllocateOrOccupyCIDR(newNode) + } + return nil + }), + DeleteFunc: nodeutil.CreateDeleteNodeHandler(ra.ReleaseCIDR), + }) + + return ra, nil +} + +func (r *rangeAllocator) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + klog.Infof("Starting range CIDR allocator") + defer klog.Infof("Shutting down range CIDR allocator") + + if !cache.WaitForNamedCacheSync("cidrallocator", stopCh, r.nodesSynced) { + return + } + + for i := 0; i < cidrUpdateWorkers; i++ { + go r.worker(stopCh) + } + + <-stopCh +} + +func (r *rangeAllocator) worker(stopChan <-chan struct{}) { + for { + select { + case workItem, ok := <-r.nodeCIDRUpdateChannel: + if !ok { + klog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed") + return + } + if err := r.updateCIDRsAllocation(workItem); err != nil { + // Requeue the failed node for update again. 
+ r.nodeCIDRUpdateChannel <- workItem + } + case <-stopChan: + return + } + } +} + +func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool { + r.lock.Lock() + defer r.lock.Unlock() + if r.nodesInProcessing.Has(nodeName) { + return false + } + r.nodesInProcessing.Insert(nodeName) + return true +} + +func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) { + r.lock.Lock() + defer r.lock.Unlock() + r.nodesInProcessing.Delete(nodeName) +} + +// marks node.PodCIDRs[...] as used in allocator's tracked cidrSet +func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error { + defer r.removeNodeFromProcessing(node.Name) + if len(node.Spec.PodCIDRs) == 0 { + return nil + } + for idx, cidr := range node.Spec.PodCIDRs { + _, podCIDR, err := net.ParseCIDR(cidr) + if err != nil { + return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR) + } + // If node has a pre allocate cidr that does not exist in our cidrs. + // This will happen if cluster went from dualstack(multi cidrs) to non-dualstack + // then we have now way of locking it + if idx >= len(r.cidrSets) { + return fmt.Errorf("node:%s has an allocated cidr: %v at index:%v that does not exist in cluster cidrs configuration", node.Name, cidr, idx) + } + + if err := r.cidrSets[idx].Occupy(podCIDR); err != nil { + return fmt.Errorf("failed to mark cidr[%v] at idx [%v] as occupied for node: %v: %v", podCIDR, idx, node.Name, err) + } + } + return nil +} + +// WARNING: If you're adding any return calls or defer any more work from this +// function you have to make sure to update nodesInProcessing properly with the +// disposition of the node when the work is done. +func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error { + if node == nil { + return nil + } + if !r.insertNodeToProcessing(node.Name) { + klog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name) + return nil + } + + if len(node.Spec.PodCIDRs) > 0 { + return r.occupyCIDRs(node) + } + // allocate and queue the assignment + allocated := NodeReservedCIDRs{ + nodeName: node.Name, + allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)), + } + + for idx := range r.cidrSets { + podCIDR, err := r.cidrSets[idx].AllocateNext() + if err != nil { + r.removeNodeFromProcessing(node.Name) + nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") + return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err) + } + allocated.allocatedCIDRs[idx] = podCIDR + } + + //queue the assignment + klog.V(4).Infof("Putting node %s with CIDR %v into the work queue", node.Name, allocated.allocatedCIDRs) + r.nodeCIDRUpdateChannel <- allocated + return nil +} + +// ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets +func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error { + if node == nil || len(node.Spec.PodCIDRs) == 0 { + return nil + } + + for idx, cidr := range node.Spec.PodCIDRs { + _, podCIDR, err := net.ParseCIDR(cidr) + if err != nil { + return fmt.Errorf("failed to parse CIDR %s on Node %v: %v", cidr, node.Name, err) + } + + // If node has a pre allocate cidr that does not exist in our cidrs. 
+ // This will happen if cluster went from dualstack(multi cidrs) to non-dualstack + // then we have now way of locking it + if idx >= len(r.cidrSets) { + return fmt.Errorf("node:%s has an allocated cidr: %v at index:%v that does not exist in cluster cidrs configuration", node.Name, cidr, idx) + } + + klog.V(4).Infof("release CIDR %s for node:%v", cidr, node.Name) + if err = r.cidrSets[idx].Release(podCIDR); err != nil { + return fmt.Errorf("error when releasing CIDR %v: %v", cidr, err) + } + } + return nil +} + +// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server. +func (r *rangeAllocator) updateCIDRsAllocation(data NodeReservedCIDRs) error { + var err error + var node *v1.Node + defer r.removeNodeFromProcessing(data.nodeName) + cidrsString := cidrsAsString(data.allocatedCIDRs) + node, err = r.nodeLister.Get(data.nodeName) + if err != nil { + klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDRs: %v", data.nodeName, err) + return err + } + + // if cidr list matches the proposed. + // then we possibly updated this node + // and just failed to ack the success. + if len(node.Spec.PodCIDRs) == len(data.allocatedCIDRs) { + match := true + for idx, cidr := range cidrsString { + if node.Spec.PodCIDRs[idx] != cidr { + match = false + break + } + } + if match { + klog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, data.allocatedCIDRs) + return nil + } + } + + // node has cidrs, release the reserved + if len(node.Spec.PodCIDRs) != 0 { + klog.Errorf("Node %v already has a CIDR allocated %v. Releasing the new one.", node.Name, node.Spec.PodCIDRs) + for idx, cidr := range data.allocatedCIDRs { + if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil { + klog.Errorf("Error when releasing CIDR idx:%v value: %v err:%v", idx, cidr, releaseErr) + } + } + return nil + } + + // fetch ipv6 cidr address + if node.Spec.ProviderID == "" { + klog.Infof("Node %q has empty provider ID", node.Name) + return nil + } + + // aws:///eu-central-1a/i-07577a7bcf3e576f2 + instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID() + eni, err := r.cloud.DescribeNetworkInterfaces( + &ec2.DescribeNetworkInterfacesInput{ + Filters: []*ec2.Filter{ + { + Name: ptrTo("attachment.instance-id"), + Values: []*string{ + ptrTo(string(instanceID)), + }, + }, + }, + }) + if err != nil { + return err + } + + if len(eni.Ipv6Prefixes) != 1 { + return fmt.Errorf("unexpected amount of ipv6 prefixes on interface %q: %v", *eni.NetworkInterfaceId, len(eni.Ipv6Prefixes)) + } + + ipv6Address := aws.StringValue(eni.Ipv6Prefixes[0].Ipv6Prefix) + cidrsString = append(cidrsString, ipv6Address) + + // If we reached here, it means that the node has no CIDR currently assigned. So we set it. + for i := 0; i < cidrUpdateRetries; i++ { + if err = PatchNodePodCIDRs(r.client, node, cidrsString); err == nil { + klog.Infof("Set node %v PodCIDR to %v", node.Name, cidrsString) + return nil + } + } + // failed release back to the pool + klog.Errorf("Failed to update node %v PodCIDR to %v after multiple attempts: %v", node.Name, cidrsString, err) + nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") + // We accept the fact that we may leak CIDRs here. This is safer than releasing + // them in case when we don't know if request went through. + // NodeController restart will return all falsely allocated CIDRs to the pool. 
+	if !apierrors.IsServerTimeout(err) {
+		klog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", node.Name, err)
+		for idx, cidr := range data.allocatedCIDRs {
+			if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
+				klog.Errorf("Error releasing allocated CIDR for node %v: %v", node.Name, releaseErr)
+			}
+		}
+	}
+	return err
+}
+
+// cidrsAsString converts a slice of CIDRs into a slice of their string representations.
+func cidrsAsString(inCIDRs []*net.IPNet) []string {
+	outCIDRs := make([]string, len(inCIDRs))
+	for idx, inCIDR := range inCIDRs {
+		outCIDRs[idx] = inCIDR.String()
+	}
+	return outCIDRs
+}
diff --git a/pkg/controllers/nodeipam/ipam/cidrset/cidrset.go b/pkg/controllers/nodeipam/ipam/cidrset/cidrset.go
new file mode 100644
index 00000000000..b31ef6b78c9
--- /dev/null
+++ b/pkg/controllers/nodeipam/ipam/cidrset/cidrset.go
@@ -0,0 +1,295 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cidrset
+
+import (
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math/big"
+	"math/bits"
+	"net"
+	"sync"
+)
+
+// CidrSet manages a set of CIDR ranges from which blocks of IPs can
+// be allocated.
+type CidrSet struct {
+	sync.Mutex
+	// clusterCIDR is the CIDR assigned to the cluster
+	clusterCIDR *net.IPNet
+	// clusterMaskSize is the mask size, in bits, assigned to the cluster
+	// caches the mask size to avoid the penalty of calling clusterCIDR.Mask.Size()
+	clusterMaskSize int
+	// nodeMask is the network mask assigned to the nodes
+	nodeMask net.IPMask
+	// nodeMaskSize is the mask size, in bits, assigned to the nodes
+	// caches the mask size to avoid the penalty of calling nodeMask.Size()
+	nodeMaskSize int
+	// maxCIDRs is the maximum number of CIDRs that can be allocated
+	maxCIDRs int
+	// allocatedCIDRs counts the number of CIDRs allocated
+	allocatedCIDRs int
+	// nextCandidate points to the next CIDR that should be free
+	nextCandidate int
+	// used is a bitmap used to track the CIDRs allocated
+	used big.Int
+	// label is used to identify the metrics
+	label string
+}
+
+const (
+	// The subnet mask size cannot be greater than 16 more than the cluster mask size
+	// TODO: https://github.com/kubernetes/kubernetes/issues/44918
+	// clusterSubnetMaxDiff limited to 16 due to the uncompressed bitmap
+	// Due to this limitation the subnet mask for IPv6 cluster cidr needs to be >= 48
+	// as default mask size for IPv6 is 64.
+	clusterSubnetMaxDiff = 16
+	// halfIPv6Len is half of the IPv6 length
+	halfIPv6Len = net.IPv6len / 2
+)
+
+var (
+	// ErrCIDRRangeNoCIDRsRemaining occurs when there is no more space
+	// to allocate CIDR ranges.
+	ErrCIDRRangeNoCIDRsRemaining = errors.New(
+		"CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
+	// ErrCIDRSetSubNetTooBig occurs when the subnet mask size is too
+	// big compared to the CIDR mask size.
+	ErrCIDRSetSubNetTooBig = errors.New(
+		"New CIDR set failed; the node CIDR size is too big")
+)
+
+// NewCIDRSet creates a new CidrSet.
+func NewCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) (*CidrSet, error) { + clusterMask := clusterCIDR.Mask + clusterMaskSize, bits := clusterMask.Size() + + var maxCIDRs int + if (clusterCIDR.IP.To4() == nil) && (subNetMaskSize-clusterMaskSize > clusterSubnetMaxDiff) { + return nil, ErrCIDRSetSubNetTooBig + } + + // register CidrSet metrics + registerCidrsetMetrics() + + maxCIDRs = 1 << uint32(subNetMaskSize-clusterMaskSize) + return &CidrSet{ + clusterCIDR: clusterCIDR, + nodeMask: net.CIDRMask(subNetMaskSize, bits), + clusterMaskSize: clusterMaskSize, + maxCIDRs: maxCIDRs, + nodeMaskSize: subNetMaskSize, + label: clusterCIDR.String(), + }, nil +} + +func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet { + var ip []byte + switch /*v4 or v6*/ { + case s.clusterCIDR.IP.To4() != nil: + { + j := uint32(index) << uint32(32-s.nodeMaskSize) + ipInt := (binary.BigEndian.Uint32(s.clusterCIDR.IP)) | j + ip = make([]byte, net.IPv4len) + binary.BigEndian.PutUint32(ip, ipInt) + } + case s.clusterCIDR.IP.To16() != nil: + { + // leftClusterIP | rightClusterIP + // 2001:0DB8:1234:0000:0000:0000:0000:0000 + const v6NBits = 128 + const halfV6NBits = v6NBits / 2 + leftClusterIP := binary.BigEndian.Uint64(s.clusterCIDR.IP[:halfIPv6Len]) + rightClusterIP := binary.BigEndian.Uint64(s.clusterCIDR.IP[halfIPv6Len:]) + + ip = make([]byte, net.IPv6len) + + if s.nodeMaskSize <= halfV6NBits { + // We only care about left side IP + leftClusterIP |= uint64(index) << uint(halfV6NBits-s.nodeMaskSize) + } else { + if s.clusterMaskSize < halfV6NBits { + // see how many bits are needed to reach the left side + btl := uint(s.nodeMaskSize - halfV6NBits) + indexMaxBit := uint(64 - bits.LeadingZeros64(uint64(index))) + if indexMaxBit > btl { + leftClusterIP |= uint64(index) >> btl + } + } + // the right side will be calculated the same way either the + // subNetMaskSize affects both left and right sides + rightClusterIP |= uint64(index) << uint(v6NBits-s.nodeMaskSize) + } + binary.BigEndian.PutUint64(ip[:halfIPv6Len], leftClusterIP) + binary.BigEndian.PutUint64(ip[halfIPv6Len:], rightClusterIP) + } + } + return &net.IPNet{ + IP: ip, + Mask: s.nodeMask, + } +} + +// AllocateNext allocates the next free CIDR range. This will set the range +// as occupied and return the allocated range. 
+func (s *CidrSet) AllocateNext() (*net.IPNet, error) { + s.Lock() + defer s.Unlock() + + if s.allocatedCIDRs == s.maxCIDRs { + return nil, ErrCIDRRangeNoCIDRsRemaining + } + candidate := s.nextCandidate + var i int + for i = 0; i < s.maxCIDRs; i++ { + if s.used.Bit(candidate) == 0 { + break + } + candidate = (candidate + 1) % s.maxCIDRs + } + + s.nextCandidate = (candidate + 1) % s.maxCIDRs + s.used.SetBit(&s.used, candidate, 1) + s.allocatedCIDRs++ + // Update metrics + cidrSetAllocations.WithLabelValues(s.label).Inc() + cidrSetAllocationTriesPerRequest.WithLabelValues(s.label).Observe(float64(i)) + cidrSetUsage.WithLabelValues(s.label).Set(float64(s.allocatedCIDRs) / float64(s.maxCIDRs)) + + return s.indexToCIDRBlock(candidate), nil +} + +func (s *CidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) { + if cidr == nil { + return -1, -1, fmt.Errorf("error getting indices for cluster cidr %v, cidr is nil", s.clusterCIDR) + } + begin, end = 0, s.maxCIDRs-1 + cidrMask := cidr.Mask + maskSize, _ := cidrMask.Size() + var ipSize int + + if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) { + return -1, -1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR) + } + + if s.clusterMaskSize < maskSize { + + ipSize = net.IPv4len + if cidr.IP.To4() == nil { + ipSize = net.IPv6len + } + begin, err = s.getIndexForCIDR(&net.IPNet{ + IP: cidr.IP.Mask(s.nodeMask), + Mask: s.nodeMask, + }) + if err != nil { + return -1, -1, err + } + ip := make([]byte, ipSize) + if cidr.IP.To4() != nil { + ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask)) + binary.BigEndian.PutUint32(ip, ipInt) + } else { + // ipIntLeft | ipIntRight + // 2001:0DB8:1234:0000:0000:0000:0000:0000 + ipIntLeft := binary.BigEndian.Uint64(cidr.IP[:net.IPv6len/2]) | (^binary.BigEndian.Uint64(cidr.Mask[:net.IPv6len/2])) + ipIntRight := binary.BigEndian.Uint64(cidr.IP[net.IPv6len/2:]) | (^binary.BigEndian.Uint64(cidr.Mask[net.IPv6len/2:])) + binary.BigEndian.PutUint64(ip[:net.IPv6len/2], ipIntLeft) + binary.BigEndian.PutUint64(ip[net.IPv6len/2:], ipIntRight) + } + end, err = s.getIndexForCIDR(&net.IPNet{ + IP: net.IP(ip).Mask(s.nodeMask), + Mask: s.nodeMask, + }) + if err != nil { + return -1, -1, err + } + } + return begin, end, nil +} + +// Release releases the given CIDR range. +func (s *CidrSet) Release(cidr *net.IPNet) error { + begin, end, err := s.getBeginingAndEndIndices(cidr) + if err != nil { + return err + } + s.Lock() + defer s.Unlock() + for i := begin; i <= end; i++ { + // Only change the counters if we change the bit to prevent + // double counting. + if s.used.Bit(i) != 0 { + s.used.SetBit(&s.used, i, 0) + s.allocatedCIDRs-- + cidrSetReleases.WithLabelValues(s.label).Inc() + } + } + + cidrSetUsage.WithLabelValues(s.label).Set(float64(s.allocatedCIDRs) / float64(s.maxCIDRs)) + return nil +} + +// Occupy marks the given CIDR range as used. Occupy succeeds even if the CIDR +// range was previously used. +func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) { + begin, end, err := s.getBeginingAndEndIndices(cidr) + if err != nil { + return err + } + s.Lock() + defer s.Unlock() + for i := begin; i <= end; i++ { + // Only change the counters if we change the bit to prevent + // double counting. 
+		if s.used.Bit(i) == 0 {
+			s.used.SetBit(&s.used, i, 1)
+			s.allocatedCIDRs++
+			cidrSetAllocations.WithLabelValues(s.label).Inc()
+		}
+	}
+
+	cidrSetUsage.WithLabelValues(s.label).Set(float64(s.allocatedCIDRs) / float64(s.maxCIDRs))
+	return nil
+}
+
+func (s *CidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) {
+	return s.getIndexForIP(cidr.IP)
+}
+
+func (s *CidrSet) getIndexForIP(ip net.IP) (int, error) {
+	if ip.To4() != nil {
+		cidrIndex := (binary.BigEndian.Uint32(s.clusterCIDR.IP) ^ binary.BigEndian.Uint32(ip.To4())) >> uint32(32-s.nodeMaskSize)
+		if cidrIndex >= uint32(s.maxCIDRs) {
+			return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.nodeMaskSize)
+		}
+		return int(cidrIndex), nil
+	}
+	if ip.To16() != nil {
+		bigIP := big.NewInt(0).SetBytes(s.clusterCIDR.IP)
+		bigIP = bigIP.Xor(bigIP, big.NewInt(0).SetBytes(ip))
+		cidrIndexBig := bigIP.Rsh(bigIP, uint(net.IPv6len*8-s.nodeMaskSize))
+		cidrIndex := cidrIndexBig.Uint64()
+		if cidrIndex >= uint64(s.maxCIDRs) {
+			return 0, fmt.Errorf("CIDR: %v/%v is out of the range of CIDR allocator", ip, s.nodeMaskSize)
+		}
+		return int(cidrIndex), nil
+	}
+
+	return 0, fmt.Errorf("invalid IP: %v", ip)
+}
diff --git a/pkg/controllers/nodeipam/ipam/cidrset/metrics.go b/pkg/controllers/nodeipam/ipam/cidrset/metrics.go
new file mode 100644
index 00000000000..9bc2b2c17b1
--- /dev/null
+++ b/pkg/controllers/nodeipam/ipam/cidrset/metrics.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cidrset
+
+import (
+	"sync"
+
+	"k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+const nodeIpamSubsystem = "node_ipam_controller"
+
+var (
+	cidrSetAllocations = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem:      nodeIpamSubsystem,
+			Name:           "cidrset_cidrs_allocations_total",
+			Help:           "Counter measuring total number of CIDR allocations.",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"clusterCIDR"},
+	)
+	cidrSetReleases = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem:      nodeIpamSubsystem,
+			Name:           "cidrset_cidrs_releases_total",
+			Help:           "Counter measuring total number of CIDR releases.",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"clusterCIDR"},
+	)
+	cidrSetUsage = metrics.NewGaugeVec(
+		&metrics.GaugeOpts{
+			Subsystem:      nodeIpamSubsystem,
+			Name:           "cidrset_usage_cidrs",
+			Help:           "Gauge measuring percentage of allocated CIDRs.",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"clusterCIDR"},
+	)
+	cidrSetAllocationTriesPerRequest = metrics.NewHistogramVec(
+		&metrics.HistogramOpts{
+			Subsystem:      nodeIpamSubsystem,
+			Name:           "cidrset_allocation_tries_per_request",
+			Help:           "Histogram measuring the number of tries per CIDR allocation request.",
+			StabilityLevel: metrics.ALPHA,
+			Buckets:        metrics.ExponentialBuckets(1, 5, 5),
+		},
+		[]string{"clusterCIDR"},
+	)
+)
+
+var registerMetrics sync.Once
+
+// registerCidrsetMetrics registers the metrics that are to be monitored.
+func registerCidrsetMetrics() { + registerMetrics.Do(func() { + legacyregistry.MustRegister(cidrSetAllocations) + legacyregistry.MustRegister(cidrSetReleases) + legacyregistry.MustRegister(cidrSetUsage) + legacyregistry.MustRegister(cidrSetAllocationTriesPerRequest) + }) +} diff --git a/pkg/controllers/nodeipam/ipam/ipv6_allocator.go b/pkg/controllers/nodeipam/ipam/ipv6_allocator.go new file mode 100644 index 00000000000..29abda40f8f --- /dev/null +++ b/pkg/controllers/nodeipam/ipam/ipv6_allocator.go @@ -0,0 +1,282 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipam + +import ( + "fmt" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + v1 "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + informers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1" + "k8s.io/klog/v2" +) + +const ( + maxRequeuingCount = 9 + + // The label for depicting total number of errors a work item encounter and succeed + totalErrorsWorkItemErrorMetric = "total_errors" + + // The label for depicting total time when work item gets queued to processed + workItemProcessingTimeWorkItemMetric = "work_item_processing_time" + + // The label for depicting total time when work item gets queued to dequeued + workItemDequeuingTimeWorkItemMetric = "work_item_dequeuing_time" + + // The label for depicting total number of errors a work item encounter and fail + errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted" +) + +// IPv6CIDRAllocator is an interface implemented by things that know how +// to allocate CIDR for nodes. +type IPv6CIDRAllocator interface { + Run(stopCh <-chan struct{}) +} + +type IPv6RangeAllocator struct { + nodeInformer coreinformers.NodeInformer + kubeClient clientset.Interface + cloud *awsv1.Cloud + workqueue workqueue.RateLimitingInterface + nodesSynced cache.InformerSynced + + // Value controlling Controller monitoring period, i.e. how often does Controller + // check node list. 
This value should be lower than nodeMonitorGracePeriod
+	// set in controller-manager
+	nodeMonitorPeriod time.Duration
+
+	rateLimitEnabled bool
+}
+
+// workItem contains the node and an action for that node
+type workItem struct {
+	node           *v1.Node
+	action         func(node *v1.Node) error
+	requeuingCount int
+	enqueueTime    time.Time
+}
+
+func (w workItem) String() string {
+	return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime)
+}
+
+func NewIPv6RangeAllocator(kubeClient clientset.Interface, nodeInformer informers.NodeInformer, awsCloud *awsv1.Cloud, rateLimiter workqueue.RateLimiter, rateLimitEnabled bool, nodeMonitorPeriod time.Duration) (IPv6CIDRAllocator, error) {
+	ra6 := &IPv6RangeAllocator{
+		nodeInformer:      nodeInformer,
+		kubeClient:        kubeClient,
+		cloud:             awsCloud,
+		workqueue:         workqueue.NewNamedRateLimitingQueue(rateLimiter, "NodeIpam"),
+		nodesSynced:       nodeInformer.Informer().HasSynced,
+		nodeMonitorPeriod: nodeMonitorPeriod,
+		rateLimitEnabled:  rateLimitEnabled,
+	}
+	// Use shared informer to listen to add/update/delete of nodes. Note that any nodes
+	// that exist before nodeipam controller starts will show up in the update method
+	ra6.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			node := obj.(*v1.Node)
+			ra6.enqueueNode(node, ra6.prefixNodeResource)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			node := newObj.(*v1.Node)
+			// Check whether the node still needs a pod CIDR prefix by inspecting its PodCIDR fields. This check prevents us
+			// from putting an already-prefixed node into the work queue. We check again before prefixing the node because,
+			// between the time a node was put in the work queue and the time it gets prefixed, another event may have put
+			// the same node in the work queue again (since the node won't have its PodCIDR set yet); rechecking avoids an
+			// unnecessary EC2 call.
+			if !ra6.isPrefixNodeRequired(node) {
+				klog.Infof("Skip putting node %s in work queue since it was already prefixed earlier.", node.GetName())
+				return
+			}
+
+			ra6.enqueueNode(node, ra6.prefixNodeResource)
+		},
+	})
+
+	return ra6, nil
+}
+
+// Run will start the controller and write the prefix CIDR from the network interface to the node
+func (ra6 *IPv6RangeAllocator) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	defer ra6.workqueue.ShutDown()
+
+	// Wait for the caches to be synced before starting workers
+	klog.Info("Waiting for informer caches to sync")
+	if ok := cache.WaitForCacheSync(stopCh, ra6.nodesSynced); !ok {
+		klog.Errorf("failed to wait for caches to sync")
+		return
+	}
+
+	klog.Infof("Starting the nodeipam controller")
+	go wait.Until(ra6.work, ra6.nodeMonitorPeriod, stopCh)
+
+	<-stopCh
+}
+
+// work is a long-running function that continuously
+// calls process() for each message on the workqueue
+func (ra6 *IPv6RangeAllocator) work() {
+	for ra6.process() {
+	}
+}
+
+// process reads each message in the queue and adds the
+// pod CIDR prefix to the corresponding Kubernetes node object
+func (ra6 *IPv6RangeAllocator) process() bool {
+	obj, shutdown := ra6.workqueue.Get()
+	if shutdown {
+		return false
+	}
+
+	klog.Infof("Starting to process %s", obj)
+
+	err := func(obj interface{}) error {
+		defer ra6.workqueue.Done(obj)
+
+		workItem, ok := obj.(*workItem)
+		if !ok {
+			ra6.workqueue.Forget(obj)
+			err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
+			utilruntime.HandleError(err)
+			return nil
+		}
+
+		timeTaken := time.Since(workItem.enqueueTime).Seconds()
+		recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
+		klog.Infof("Dequeuing latency %f", timeTaken)
+
+		instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
+		if err != nil {
+			err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
+			utilruntime.HandleError(err)
+			return nil
+		}
+		klog.Infof("Instance ID of work item %s is %s", workItem, instanceID)
+
+		if awsv1.IsFargateNode(string(instanceID)) {
+			klog.Infof("Skip processing the node %s since it is a Fargate node", instanceID)
+			ra6.workqueue.Forget(obj)
+			return nil
+		}
+
+		err = workItem.action(workItem.node)
+
+		if err != nil {
+			if workItem.requeuingCount < maxRequeuingCount {
+				// Put the item back on the workqueue to handle any transient errors.
+ workItem.requeuingCount++ + ra6.workqueue.AddRateLimited(workItem) + + recordWorkItemErrorMetrics(totalErrorsWorkItemErrorMetric, string(instanceID)) + return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount) + } + + klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error()) + recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID)) + } else { + klog.Infof("Finished processing %s", workItem) + timeTaken = time.Since(workItem.enqueueTime).Seconds() + recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken) + klog.Infof("Processing latency %s", timeTaken) + } + + ra6.workqueue.Forget(obj) + return nil + }(obj) + + if err != nil { + klog.Errorf("Error occurred while processing %s", obj) + utilruntime.HandleError(err) + } + + return true +} + +func (ra6 *IPv6RangeAllocator) prefixNodeResource(node *v1.Node) error { + if node.Spec.ProviderID == "" { + klog.Infof("Node %q has empty provider ID", node.Name) + return nil + } + + // aws:///eu-central-1a/i-07577a7bcf3e576f2 + instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID() + eni, err := ra6.cloud.DescribeNetworkInterfaces( + &ec2.DescribeNetworkInterfacesInput{ + Filters: []*ec2.Filter{ + { + Name: ptrTo("attachment.instance-id"), + Values: []*string{ + ptrTo(string(instanceID)), + }, + }, + }, + }) + if err != nil { + return err + } + + if len(eni.Ipv6Prefixes) != 1 { + return fmt.Errorf("unexpected amount of ipv6 prefixes on interface %q: %v", *eni.NetworkInterfaceId, len(eni.Ipv6Prefixes)) + } + + ipv6Address := aws.StringValue(eni.Ipv6Prefixes[0].Ipv6Prefix) + if err := PatchNodePodCIDRs(ra6.kubeClient, node, []string{ipv6Address}); err != nil { + return err + } + klog.Infof("Successfully prefixed node %s with %v.", node.GetName(), ipv6Address) + return nil +} + +// enqueueNode takes in the object and an +// action for the object for a workitem and enqueue to the workqueue +func (ra6 *IPv6RangeAllocator) enqueueNode(node *v1.Node, action func(node *v1.Node) error) { + item := &workItem{ + node: node, + action: action, + requeuingCount: 0, + enqueueTime: time.Now(), + } + + if ra6.rateLimitEnabled { + ra6.workqueue.AddRateLimited(item) + klog.Infof("Added %s to the workqueue (rate-limited)", item) + } else { + ra6.workqueue.Add(item) + klog.Infof("Added %s to the workqueue (without any rate-limit)", item) + } +} + +func (ra6 *IPv6RangeAllocator) isPrefixNodeRequired(node *v1.Node) bool { + if node.Spec.PodCIDR == "" && node.Spec.PodCIDRs == nil { + return true + } + return false +} + +// ptrTo returns a pointer to a copy of any value. +func ptrTo[T any](v T) *T { + return &v +} diff --git a/pkg/controllers/nodeipam/metrics.go b/pkg/controllers/nodeipam/ipam/metrics.go similarity index 97% rename from pkg/controllers/nodeipam/metrics.go rename to pkg/controllers/nodeipam/ipam/metrics.go index 9d870983adc..bd3a9d8e918 100644 --- a/pkg/controllers/nodeipam/metrics.go +++ b/pkg/controllers/nodeipam/ipam/metrics.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package nodeipam +package ipam import ( "sync" @@ -45,7 +45,7 @@ var ( ) // registerMetrics registers nodeipam-controller metrics. 
-func registerMetrics() { +func RegisterMetrics() { register.Do(func() { legacyregistry.MustRegister(workItemDuration) legacyregistry.MustRegister(workItemError) diff --git a/pkg/controllers/nodeipam/ipam/util.go b/pkg/controllers/nodeipam/ipam/util.go new file mode 100644 index 00000000000..3cc4c0277f4 --- /dev/null +++ b/pkg/controllers/nodeipam/ipam/util.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipam + +import ( + "context" + "encoding/json" + "fmt" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +type NodePatch struct { + Spec *NodePatchSpec `json:"spec,omitempty"` + Metadata *NodePatchMetadata `json:"metadata,omitempty"` +} + +type NodePatchSpec struct { + PodCIDR string `json:"podCIDR,omitempty"` + PodCIDRs []string `json:"podCIDRs,omitempty"` +} + +type NodePatchMetadata struct { + Labels map[string]*string `json:"labels,omitempty"` +} + +// PatchNodePodCIDRs patches the node podCIDR to the specified value. +func PatchNodePodCIDRs(kubeClient clientset.Interface, node *v1.Node, cidr []string) error { + klog.Infof("assigning cidr %q to node %q", cidr, node.ObjectMeta.Name) + nodePatchSpec := &NodePatchSpec{ + PodCIDR: cidr[0], + PodCIDRs: cidr, + } + nodePatch := &NodePatch{ + Spec: nodePatchSpec, + } + nodePatchJson, err := json.Marshal(nodePatch) + if err != nil { + return fmt.Errorf("error building node patch: %v", err) + } + + klog.V(2).Infof("sending patch for node %q: %q", node.Name, string(nodePatchJson)) + + _, err = kubeClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, nodePatchJson, metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("error applying patch to node: %v", err) + } + + return nil +} diff --git a/pkg/controllers/nodeipam/nodeipam_controller.go b/pkg/controllers/nodeipam/nodeipam_controller.go index 33726ac209e..e1c10186bc8 100644 --- a/pkg/controllers/nodeipam/nodeipam_controller.go +++ b/pkg/controllers/nodeipam/nodeipam_controller.go @@ -18,65 +18,46 @@ package nodeipam import ( "context" - "encoding/json" "fmt" + "net" + "sync" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/ec2" - "golang.org/x/time/rate" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + + "golang.org/x/time/rate" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" 
"k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" + "k8s.io/cloud-provider-aws/pkg/controllers/nodeipam/config" + "k8s.io/cloud-provider-aws/pkg/controllers/nodeipam/ipam" + cidrset "k8s.io/cloud-provider-aws/pkg/controllers/nodeipam/ipam/cidrset" awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1" + controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers" "k8s.io/klog/v2" ) -// workItem contains the node and an action for that node -type workItem struct { - node *v1.Node - action func(node *v1.Node) error - requeuingCount int - enqueueTime time.Time -} - -type nodePatch struct { - Spec *nodePatchSpec `json:"spec,omitempty"` - Metadata *nodePatchMetadata `json:"metadata,omitempty"` -} - -type nodePatchMetadata struct { - Labels map[string]*string `json:"labels,omitempty"` -} - -func (w workItem) String() string { - return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime) -} - const ( - maxRequeuingCount = 9 - - // The label for depicting total number of errors a work item encounter and succeed - totalErrorsWorkItemErrorMetric = "total_errors" - - // The label for depicting total time when work item gets queued to processed - workItemProcessingTimeWorkItemMetric = "work_item_processing_time" - - // The label for depicting total time when work item gets queued to dequeued - workItemDequeuingTimeWorkItemMetric = "work_item_dequeuing_time" - - // The label for depicting total number of errors a work item encounter and fail - errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted" + // The amount of time the nodecontroller polls on the list nodes endpoint. + apiserverStartupGracePeriod = 10 * time.Minute ) +// nodePollInterval is used in listing node +// This is a variable instead of a const to enable testing. +var nodePollInterval = 10 * time.Second + type Controller struct { nodeInformer coreinformers.NodeInformer kubeClient clientset.Interface @@ -90,6 +71,22 @@ type Controller struct { nodeMonitorPeriod time.Duration rateLimitEnabled bool + + // nodeLister is able to list/get nodes and is populated by the shared informer passed to controller + nodeLister corelisters.NodeLister + // cluster cidrs as passed in during controller creation + clusterCIDRs []*net.IPNet + // for each entry in clusterCIDRs we maintain a list of what is used and what is not + cidrSets []*cidrset.CidrSet + // Channel that is used to pass updating Nodes and their reserved CIDRs to the background + // This increases a throughput of CIDR assignment by not blocking on long operations. 
+ nodeCIDRUpdateChannel chan ipam.NodeReservedCIDRs + recorder record.EventRecorder + // Keep a set of nodes that are currently being processed to avoid races in CIDR allocation + lock sync.Mutex + nodesInProcessing sets.String + cidrAllocator ipam.CIDRAllocator + ipv6CIDRAllocator ipam.IPv6CIDRAllocator } // NewNodeIpamController creates a NewNodeIpamController object @@ -98,24 +95,24 @@ func NewNodeIpamController( kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeMonitorPeriod time.Duration, - rateLimit float64, - burstLimit int) (*Controller, error) { - + nodeIpamConfig config.NodeIPAMControllerConfiguration, +) (*Controller, error) { + var err error awsCloud, ok := cloud.(*awsv1.Cloud) if !ok { - err := fmt.Errorf("nodeipam controller does not support %v provider", cloud.ProviderName()) + err = fmt.Errorf("nodeipam controller does not support %v provider", cloud.ProviderName()) return nil, err } var rateLimiter workqueue.RateLimiter var rateLimitEnabled bool - if rateLimit > 0.0 && burstLimit > 0 { - klog.Infof("Rate limit enabled on controller with rate %f and burst %d.", rateLimit, burstLimit) + if nodeIpamConfig.RateLimit > 0.0 && nodeIpamConfig.BurstLimit > 0 { + klog.Infof("Rate limit enabled on controller with rate %f and burst %d.", nodeIpamConfig.RateLimit, nodeIpamConfig.BurstLimit) // This is the workqueue.DefaultControllerRateLimiter() but in case where throttling is enabled on the controller, // the rate and burst values are set to the provided values. rateLimiter = workqueue.NewMaxOfRateLimiter( workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(rateLimit), burstLimit)}, + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(nodeIpamConfig.RateLimit), nodeIpamConfig.BurstLimit)}, ) rateLimitEnabled = true } else { @@ -124,7 +121,6 @@ func NewNodeIpamController( rateLimitEnabled = false } - registerMetrics() nc := &Controller{ nodeInformer: nodeInformer, kubeClient: kubeClient, @@ -135,220 +131,87 @@ func NewNodeIpamController( rateLimitEnabled: rateLimitEnabled, } - // Use shared informer to listen to add/update/delete of nodes. Note that any nodes - // that exist before nodeipam controller starts will show up in the update method - nc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - node := obj.(*v1.Node) - nc.enqueueNode(node, nc.prefixNodeResource) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - node := newObj.(*v1.Node) - // Check if nodeipam is required by inspecting the labels. This check here prevents us from putting a tagged node into the - // work queue. We check this again before nodeipam the node to make sure that between when a node was put in the work queue - // and when it gets prefixed, there might be another event which put the same item in the work queue - // (since the node won't have the labels yet) and hence prevents us from making an unnecessary EC2 call. 
- if !nc.isPrefixNodeRequired(node) { - klog.Infof("Skip putting node %s in work queue since it was already prefixed earlier.", node.GetName()) - return - } - - nc.enqueueNode(node, nc.prefixNodeResource) - }, - }) - - return nc, nil -} - -// Run will start the controller to add write the prefix from the network interface to the node -func (nc *Controller) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - defer nc.workqueue.ShutDown() - - // Wait for the caches to be synced before starting workers - klog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, nc.nodesSynced); !ok { - klog.Errorf("failed to wait for caches to sync") - return - } - - klog.Infof("Starting the nodeipam controller") - go wait.Until(nc.work, nc.nodeMonitorPeriod, stopCh) - - <-stopCh -} - -// work is a long-running function that continuously -// call process() for each message on the workqueue -func (nc *Controller) work() { - for nc.process() { - } -} - -// process reads each message in the queue and performs either -// add prefix to kubernetes node object -func (nc *Controller) process() bool { - obj, shutdown := nc.workqueue.Get() - if shutdown { - return false - } - - klog.Infof("Starting to process %s", obj) - - err := func(obj interface{}) error { - defer nc.workqueue.Done(obj) - - workItem, ok := obj.(*workItem) - if !ok { - nc.workqueue.Forget(obj) - err := fmt.Errorf("expected workItem in workqueue but got %s", obj) - utilruntime.HandleError(err) - return nil - } - - timeTaken := time.Since(workItem.enqueueTime).Seconds() - recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken) - klog.Infof("Dequeuing latency %s", timeTaken) - - instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID() + // for IPv6 only + if !nodeIpamConfig.DualStack { + ipam.RegisterMetrics() + nc.ipv6CIDRAllocator, err = ipam.NewIPv6RangeAllocator(kubeClient, nodeInformer, awsCloud, rateLimiter, rateLimitEnabled, nodeMonitorPeriod) if err != nil { - err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err) - utilruntime.HandleError(err) - return nil + return nil, err } - klog.Infof("Instance ID of work item %s is %s", workItem, instanceID) - - if awsv1.IsFargateNode(string(instanceID)) { - klog.Infof("Skip processing the node %s since it is a Fargate node", instanceID) - nc.workqueue.Forget(obj) - return nil + } else { + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"}) + eventBroadcaster.StartStructuredLogging(0) + klog.V(0).Infof("Sending events to api server.") + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + allocatorParams := ipam.CIDRAllocatorParams{ + ClusterCIDRs: nodeIpamConfig.ClusterCIDRs, + NodeCIDRMaskSizes: []int{int(nodeIpamConfig.NodeCIDRMaskSize)}, } - - err = workItem.action(workItem.node) - + nc = &Controller{ + kubeClient: kubeClient, + clusterCIDRs: allocatorParams.ClusterCIDRs, + cloud: awsCloud, + cidrSets: []*cidrset.CidrSet{}, + nodeLister: nodeInformer.Lister(), + nodesSynced: nodeInformer.Informer().HasSynced, + nodeCIDRUpdateChannel: make(chan ipam.NodeReservedCIDRs, ipam.CidrUpdateQueueSize), + recorder: recorder, + nodesInProcessing: sets.NewString(), + } + nodeList, err := listNodes(kubeClient) if err != nil { - if workItem.requeuingCount < maxRequeuingCount { - // Put the item back on the workqueue to 
handle any transient errors. - workItem.requeuingCount++ - nc.workqueue.AddRateLimited(workItem) - - recordWorkItemErrorMetrics(totalErrorsWorkItemErrorMetric, string(instanceID)) - return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount) - } - - klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error()) - recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID)) - } else { - klog.Infof("Finished processing %s", workItem) - timeTaken = time.Since(workItem.enqueueTime).Seconds() - recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken) - klog.Infof("Processing latency %s", timeTaken) + return nil, err + } + nc.cidrAllocator, err = ipam.NewCIDRRangeAllocator(kubeClient, nodeInformer, awsCloud, allocatorParams, nodeList) + if err != nil { + return nil, err } - nc.workqueue.Forget(obj) - return nil - }(obj) - - if err != nil { - klog.Errorf("Error occurred while processing %s", obj) - utilruntime.HandleError(err) } - - return true + return nc, nil } -func (nc *Controller) prefixNodeResource(node *v1.Node) error { - if node.Spec.ProviderID == "" { - klog.Infof("Node %q has empty provider ID", node.Name) - return nil - } - - // aws:///eu-central-1a/i-07577a7bcf3e576f2 - instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID() - eni, err := nc.cloud.DescribeNetworkInterfaces( - &ec2.DescribeNetworkInterfacesInput{ - Filters: []*ec2.Filter{ - { - Name: ptrTo("attachment.instance-id"), - Values: []*string{ - ptrTo(string(instanceID)), - }, - }, - }, - }) - if err != nil { - return err - } - - if len(eni.Ipv6Prefixes) != 1 { - return fmt.Errorf("unexpected amount of ipv6 prefixes on interface %q: %v", *eni.NetworkInterfaceId, len(eni.Ipv6Prefixes)) - } - - ipv6Address := aws.StringValue(eni.Ipv6Prefixes[0].Ipv6Prefix) - if err := patchNodePodCIDRs(nc.kubeClient, node, ipv6Address); err != nil { - return err - } - klog.Infof("Successfully prefixed node %s with %v.", node.GetName(), ipv6Address) - return nil -} +// Run starts an asynchronous loop that monitors the status of cluster nodes. +func (nc *Controller) Run(stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics, dualStack bool) { + defer utilruntime.HandleCrash() -type nodePatchSpec struct { - PodCIDR string `json:"podCIDR,omitempty"` - PodCIDRs []string `json:"podCIDRs,omitempty"` -} + klog.Infof("Starting ipam controller") + defer klog.Infof("Shutting down ipam controller") + controllerManagerMetrics.ControllerStarted("nodeipam") + defer controllerManagerMetrics.ControllerStopped("nodeipam") -// enqueueNode takes in the object and an -// action for the object for a workitem and enqueue to the workqueue -func (nc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) { - item := &workItem{ - node: node, - action: action, - requeuingCount: 0, - enqueueTime: time.Now(), + if !cache.WaitForNamedCacheSync("node", stopCh, nc.nodesSynced) { + return } - if nc.rateLimitEnabled { - nc.workqueue.AddRateLimited(item) - klog.Infof("Added %s to the workqueue (rate-limited)", item) + if !dualStack { + go nc.ipv6CIDRAllocator.Run(stopCh) } else { - nc.workqueue.Add(item) - klog.Infof("Added %s to the workqueue (without any rate-limit)", item) - } -} - -// patchNodePodCIDRs patches the node podCIDR to the specified value. 
-func patchNodePodCIDRs(kubeClient clientset.Interface, node *corev1.Node, cidr string) error { - klog.Infof("assigning cidr %q to node %q", cidr, node.ObjectMeta.Name) - nodePatchSpec := &nodePatchSpec{ - PodCIDR: cidr, - PodCIDRs: []string{cidr}, - } - nodePatch := &nodePatch{ - Spec: nodePatchSpec, + go nc.cidrAllocator.Run(stopCh) } - nodePatchJson, err := json.Marshal(nodePatch) - if err != nil { - return fmt.Errorf("error building node patch: %v", err) - } - - klog.V(2).Infof("sending patch for node %q: %q", node.Name, string(nodePatchJson)) - _, err = kubeClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, nodePatchJson, metav1.PatchOptions{}) - if err != nil { - return fmt.Errorf("error applying patch to node: %v", err) - } - - return nil + <-stopCh } -func (nc *Controller) isPrefixNodeRequired(node *v1.Node) bool { - if node.Spec.PodCIDR == "" && node.Spec.PodCIDRs == nil { - return true +func listNodes(kubeClient clientset.Interface) (*v1.NodeList, error) { + var nodeList *v1.NodeList + // We must poll because apiserver might not be up. This error causes + // controller manager to restart. + if pollErr := wait.Poll(nodePollInterval, apiserverStartupGracePeriod, func() (bool, error) { + var err error + nodeList, err = kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + FieldSelector: fields.Everything().String(), + LabelSelector: labels.Everything().String(), + }) + if err != nil { + klog.Errorf("Failed to list all nodes: %v", err) + return false, nil + } + return true, nil + }); pollErr != nil { + return nil, fmt.Errorf("failed to list all nodes in %v, cannot proceed without updating CIDR map", + apiserverStartupGracePeriod) } - return false -} - -// ptrTo returns a pointer to a copy of any value. 
-func ptrTo[T any](v T) *T {
-	return &v
+	return nodeList, nil
 }
diff --git a/pkg/controllers/nodeipam/nodeipam_controller_wrapper.go b/pkg/controllers/nodeipam/nodeipam_controller_wrapper.go
index 100c5e27d0d..91d33473067 100644
--- a/pkg/controllers/nodeipam/nodeipam_controller_wrapper.go
+++ b/pkg/controllers/nodeipam/nodeipam_controller_wrapper.go
@@ -18,6 +18,9 @@ package nodeipam
 
 import (
 	"context"
+	"fmt"
+	"net"
+	"strings"
 
 	cloudprovider "k8s.io/cloud-provider"
 	"k8s.io/cloud-provider/app"
@@ -25,7 +28,9 @@ import (
 	genericcontrollermanager "k8s.io/controller-manager/app"
 	"k8s.io/controller-manager/controller"
 	"k8s.io/klog/v2"
+	netutils "k8s.io/utils/net"
 
+	"k8s.io/cloud-provider-aws/pkg/controllers/nodeipam/config"
 	"k8s.io/cloud-provider-aws/pkg/controllers/options"
 )
 
@@ -40,36 +45,70 @@ const (
 // ControllerWrapper is the wrapper for the nodeipam controller
 type ControllerWrapper struct {
 	Options options.NodeIpamControllerOptions
+	Config  config.NodeIPAMControllerConfiguration
 }
 
 // StartNodeIpamControllerWrapper is used to take cloud config as input and start the nodeipam controller
 func (nc *ControllerWrapper) StartNodeIpamControllerWrapper(initContext app.ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) app.InitFunc {
 	return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
-		return nc.startNodeIpamController(ctx, initContext, completedConfig, cloud)
+		return nc.startNodeIpamController(ctx, initContext, completedConfig, controllerContext, cloud)
	}
 }
 
-func (nc *ControllerWrapper) startNodeIpamController(ctx context.Context, initContext app.ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
+func (nc *ControllerWrapper) startNodeIpamController(ctx context.Context, initContext app.ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, controllerContext genericcontrollermanager.ControllerContext, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
 	err := nc.Options.Validate()
 	if err != nil {
 		klog.Fatalf("NodeIpam controller inputs are not properly set: %v", err)
 	}
 
+	nc.Config.ClusterCIDRs, _, err = processCIDRs(completedConfig.ComponentConfig.KubeCloudShared.ClusterCIDR)
+	if err != nil {
+		return nil, false, err
+	}
+	nc.Options.ApplyTo(&nc.Config)
+
+	klog.Infof("Cluster CIDR: %s", nc.Config.ClusterCIDRs[0].String())
+	klog.Infof("Running in dualstack mode: %t", nc.Config.DualStack)
+	klog.Infof("Node CIDR mask size: %v", nc.Config.NodeCIDRMaskSize)
+
+	// The nodeipam controller supports at most one cluster CIDR, even in dual-stack mode.
+	if len(nc.Config.ClusterCIDRs) > 1 {
+		return nil, false, fmt.Errorf("the nodeipam controller supports at most one cluster CIDR, got %v", len(nc.Config.ClusterCIDRs))
+	}
+
 	// Start the Controller
-	nodeipamcontroller, err := NewNodeIpamController(
+	nodeIpamController, err := NewNodeIpamController(
 		completedConfig.SharedInformers.Core().V1().Nodes(),
 		completedConfig.ClientBuilder.ClientOrDie(initContext.ClientName),
 		cloud,
 		completedConfig.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
-		nc.Options.RateLimit,
-		nc.Options.BurstLimit)
+		nc.Config)
 
 	if err != nil {
 		klog.Warningf("failed to start nodeipam controller: %s", err)
 		return nil, false, nil
 	}
 
-	go nodeipamcontroller.Run(ctx.Done())
+	go nodeIpamController.Run(controllerContext.Stop, controllerContext.ControllerManagerMetrics, nc.Config.DualStack)
 
 	return nil, true, nil
 }
+
+// processCIDRs parses a comma-separated list of CIDRs and returns
+// the parsed CIDRs,
+// a flag indicating whether the list is dual-stack, and
+// an error if any of the CIDRs fails to parse.
+func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) {
+	cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",")
+
+	cidrs, err := netutils.ParseCIDRs(cidrsSplit)
+	if err != nil {
+		return nil, false, err
+	}
+
+	// ParseCIDRs above has already rejected invalid CIDRs, so the error
+	// returned by IsDualStackCIDRs can safely be ignored here.
+	dualstack, _ := netutils.IsDualStackCIDRs(cidrs)
+
+	return cidrs, dualstack, nil
+}
diff --git a/pkg/controllers/options/nodeipam_controller.go b/pkg/controllers/options/nodeipam_controller.go
index 8ac5f4fe14e..17cbed8ee3c 100644
--- a/pkg/controllers/options/nodeipam_controller.go
+++ b/pkg/controllers/options/nodeipam_controller.go
@@ -20,6 +20,13 @@ import (
 	"fmt"
 
 	"github.com/spf13/pflag"
+	"k8s.io/cloud-provider-aws/pkg/controllers/nodeipam/config"
+)
+
+const (
+
+	// DefaultNodeMaskCIDR is the default mask size for the node CIDR
+	DefaultNodeMaskCIDR = int32(24)
 )
 
 // NodeIpamControllerOptions contains the inputs that can
@@ -27,6 +34,10 @@ import (
 type NodeIpamControllerOptions struct {
 	RateLimit  float64
 	BurstLimit int
+	DualStack  bool
+	// NodeCIDRMaskSize is the mask size for node cidr in single-stack cluster.
+	// This can be used only with single stack clusters and is incompatible with dual stack clusters.
+	NodeCIDRMaskSize int32
 }
 
 // AddFlags add the additional flags for the controller
@@ -35,6 +46,8 @@ func (o *NodeIpamControllerOptions) AddFlags(fs *pflag.FlagSet) {
 		"Steady-state rate limit (per sec) at which the controller processes items in its queue. A value of zero (default) disables rate limiting.")
 	fs.IntVar(&o.BurstLimit, "nodeipam-controller-burst-limit", o.BurstLimit,
 		"Burst limit at which the controller processes items in its queue. A value of zero (default) disables rate limiting.")
+	fs.BoolVar(&o.DualStack, "dualstack", o.DualStack, "IP mode in which the controller runs, either dual-stack or IPv6-only. A value of false (default) enables IPv6-only mode.")
+	fs.Int32Var(&o.NodeCIDRMaskSize, "node-cidr-mask-size", o.NodeCIDRMaskSize, "Mask size for the node CIDR in the cluster. Defaults to 24.")
 }
 
 // Validate checks for errors from user input
@@ -50,3 +63,19 @@ func (o *NodeIpamControllerOptions) Validate() error {
 
 	return nil
 }
+
+// ApplyTo copies the options onto the NodeIPAMController configuration, applying defaults.
+func (o *NodeIpamControllerOptions) ApplyTo(cfg *config.NodeIPAMControllerConfiguration) error {
+	if o == nil {
+		return nil
+	}
+
+	cfg.DualStack = o.DualStack
+	if o.NodeCIDRMaskSize == 0 {
+		cfg.NodeCIDRMaskSize = DefaultNodeMaskCIDR
+	} else {
+		cfg.NodeCIDRMaskSize = o.NodeCIDRMaskSize
+
+	}
+	return nil
+}
diff --git a/pkg/controllers/options/tagging_controller.go b/pkg/controllers/options/tagging_controller.go
index c606a8589a1..7ce892ddc6c 100644
--- a/pkg/controllers/options/tagging_controller.go
+++ b/pkg/controllers/options/tagging_controller.go
@@ -15,6 +15,7 @@ package options
 
 import (
 	"fmt"
+
 	"github.com/spf13/pflag"
 )
 
diff --git a/pkg/util/controller_utils.go b/pkg/util/controller_utils.go
new file mode 100644
index 00000000000..4890c76fe89
--- /dev/null
+++ b/pkg/util/controller_utils.go
@@ -0,0 +1,103 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package node
+
+import (
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/klog/v2"
+)
+
+// RecordNodeStatusChange records an event related to a node status change. (Common to lifecycle and ipam)
+func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
+	ref := &v1.ObjectReference{
+		APIVersion: "v1",
+		Kind:       "Node",
+		Name:       node.Name,
+		UID:        node.UID,
+		Namespace:  "",
+	}
+	klog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
+	// TODO: This requires a transaction, either both node status is updated
+	// and event is recorded or neither should happen, see issue #6055.
+	recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
+}
+
+// CreateAddNodeHandler creates an add node handler.
+func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
+	return func(originalObj interface{}) {
+		node := originalObj.(*v1.Node).DeepCopy()
+		if err := f(node); err != nil {
+			utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err))
+		}
+	}
+}
+
+// CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam)
+func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
+	return func(origOldObj, origNewObj interface{}) {
+		node := origNewObj.(*v1.Node).DeepCopy()
+		prevNode := origOldObj.(*v1.Node).DeepCopy()
+
+		if err := f(prevNode, node); err != nil {
+			utilruntime.HandleError(fmt.Errorf("Error while processing Node Update: %v", err))
+		}
+	}
+}
+
+// CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
+func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
+	return func(originalObj interface{}) {
+		originalNode, isNode := originalObj.(*v1.Node)
+		// We can get DeletedFinalStateUnknown instead of *v1.Node here and
+		// we need to handle that correctly. #34692
+		if !isNode {
+			deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
+			if !ok {
+				klog.Errorf("Received unexpected object: %v", originalObj)
+				return
+			}
+			originalNode, ok = deletedState.Obj.(*v1.Node)
+			if !ok {
+				klog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
+				return
+			}
+		}
+		node := originalNode.DeepCopy()
+		if err := f(node); err != nil {
+			utilruntime.HandleError(fmt.Errorf("Error while processing Node Delete: %v", err))
+		}
+	}
+}
+
+// GetNodeCondition extracts the provided condition from the given status and returns that.
+// Returns -1 and nil if the condition is not present; otherwise it returns the index and the condition.
+func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) { + if status == nil { + return -1, nil + } + for i := range status.Conditions { + if status.Conditions[i].Type == conditionType { + return i, &status.Conditions[i] + } + } + return -1, nil +}
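
For reference, a minimal standalone sketch of how the comma-separated cluster CIDR value (the --cluster-cidr flag) is parsed and how the node CIDR mask size falls back to its default, mirroring the processCIDRs and ApplyTo logic added above. The main package, the sample CIDR string, and the printed output are illustrative assumptions, not code from this change.

package main

import (
	"fmt"
	"strings"

	netutils "k8s.io/utils/net"
)

func main() {
	// A dual-stack value would look like "10.0.0.0/16,fd00::/64"; the nodeipam
	// controller itself rejects more than one cluster CIDR, so a realistic
	// value is a single IPv6 range.
	cidrsList := "fd00:1234::/64"

	// Split and parse the CIDR list the same way processCIDRs does.
	cidrs, err := netutils.ParseCIDRs(strings.Split(strings.TrimSpace(cidrsList), ","))
	if err != nil {
		panic(err)
	}

	// IsDualStackCIDRs reports whether both an IPv4 and an IPv6 CIDR are present.
	dualstack, _ := netutils.IsDualStackCIDRs(cidrs)
	fmt.Printf("parsed %d CIDR(s), dualstack=%t\n", len(cidrs), dualstack)

	// Mask-size defaulting mirrors NodeIpamControllerOptions.ApplyTo: an unset
	// (zero) --node-cidr-mask-size falls back to DefaultNodeMaskCIDR (24).
	var nodeCIDRMaskSize int32
	if nodeCIDRMaskSize == 0 {
		nodeCIDRMaskSize = 24
	}
	fmt.Printf("node CIDR mask size: /%d\n", nodeCIDRMaskSize)
}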
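
Similarly, a minimal sketch of how the new GetNodeCondition helper from pkg/util/controller_utils.go might be called to read a node's Ready condition. The nodeutil import alias and the hand-built sample Node are assumptions for illustration.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	nodeutil "k8s.io/cloud-provider-aws/pkg/util"
)

func main() {
	// A hand-built node with a single Ready condition, for illustration only.
	node := &v1.Node{
		Status: v1.NodeStatus{
			Conditions: []v1.NodeCondition{
				{Type: v1.NodeReady, Status: v1.ConditionTrue},
			},
		},
	}

	// GetNodeCondition returns -1 and nil when the condition type is absent.
	idx, cond := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
	if cond != nil {
		fmt.Printf("condition %s found at index %d with status %s\n", cond.Type, idx, cond.Status)
	}
}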