diff --git a/pilot/pkg/serviceregistry/kube/controller/pod.go b/pilot/pkg/serviceregistry/kube/controller/pod.go
index 2ef1aa3719f2..9dfb75244e50 100644
--- a/pilot/pkg/serviceregistry/kube/controller/pod.go
+++ b/pilot/pkg/serviceregistry/kube/controller/pod.go
@@ -36,9 +36,9 @@ type PodCache struct {
 	// this allows us to retrieve the latest status by pod IP.
 	// This should only contain RUNNING or PENDING pods with an allocated IP.
 	podsByIP map[string]sets.Set[types.NamespacedName]
-	// IPByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
+	// ipByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
 	// pod cache if a pod changes IP.
-	IPByPods map[types.NamespacedName]string
+	ipByPods map[types.NamespacedName]string
 
 	// needResync is map of IP to endpoint namespace/name. This is used to requeue endpoint
 	// events when pod event comes. This typically happens when pod is not available
@@ -54,7 +54,7 @@ func newPodCache(c *Controller, pods kclient.Client[*v1.Pod], queueEndpointEvent
 		pods:               pods,
 		c:                  c,
 		podsByIP:           make(map[string]sets.Set[types.NamespacedName]),
-		IPByPods:           make(map[types.NamespacedName]string),
+		ipByPods:           make(map[types.NamespacedName]string),
 		needResync:         make(map[string]sets.Set[types.NamespacedName]),
 		queueEndpointEvent: queueEndpointEvent,
 	}
@@ -151,7 +151,14 @@ func (pc *PodCache) onEvent(_, pod *v1.Pod, ev model.Event) error {
 	// PodIP will be empty when pod is just created, but before the IP is assigned
 	// via UpdateStatus.
 	if len(ip) == 0 {
-		return nil
+		// However, in the case of an Eviction, the event that marks the pod as Failed may *also* have removed the IP.
+		// If the pod *used to* have an IP, then we need to actually delete it.
+		ip = pc.getIPByPod(config.NamespacedName(pod))
+		if len(ip) == 0 {
+			log.Debugf("Pod %s has no IP", config.NamespacedName(pod).String())
+			return nil
+		}
+		log.Debugf("Pod %s has no IP, but was in the cache (%s), continue so we can delete it", config.NamespacedName(pod).String(), ip)
 	}
 
 	key := config.NamespacedName(pod)
@@ -181,18 +188,18 @@ func (pc *PodCache) onEvent(_, pod *v1.Pod, ev model.Event) error {
 			return nil
 		}
 	}
-	pc.notifyWorkloadHandlers(pod, ev)
+	pc.notifyWorkloadHandlers(pod, ev, ip)
 	return nil
 }
 
 // notifyWorkloadHandlers fire workloadInstance handlers for pod
-func (pc *PodCache) notifyWorkloadHandlers(pod *v1.Pod, ev model.Event) {
+func (pc *PodCache) notifyWorkloadHandlers(pod *v1.Pod, ev model.Event, ip string) {
 	// if no workload handler registered, skip building WorkloadInstance
 	if len(pc.c.handlers.GetWorkloadHandlers()) == 0 {
 		return
 	}
 	// fire instance handles for workload
-	ep := NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(pod.Status.PodIP, 0, "", model.AlwaysDiscoverable, model.Healthy)
+	ep := NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(ip, 0, "", model.AlwaysDiscoverable, model.Healthy)
 	workloadInstance := &model.WorkloadInstance{
 		Name:      pod.Name,
 		Namespace: pod.Namespace,
@@ -225,7 +232,7 @@ func (pc *PodCache) deleteIP(ip string, podKey types.NamespacedName) bool {
 	defer pc.Unlock()
 	if pc.podsByIP[ip].Contains(podKey) {
 		sets.DeleteCleanupLast(pc.podsByIP, ip, podKey)
-		delete(pc.IPByPods, podKey)
+		delete(pc.ipByPods, podKey)
 		return true
 	}
 	return false
@@ -238,12 +245,12 @@ func (pc *PodCache) update(ip string, key types.NamespacedName) {
 		pc.Unlock()
 		return
 	}
-	if current, f := pc.IPByPods[key]; f {
+	if current, f := pc.ipByPods[key]; f {
 		// The pod already exists, but with another IP Address. We need to clean up that
 		sets.DeleteCleanupLast(pc.podsByIP, current, key)
 	}
 	sets.InsertOrNew(pc.podsByIP, ip, key)
-	pc.IPByPods[key] = ip
+	pc.ipByPods[key] = ip
 
 	if endpointsToUpdate, f := pc.needResync[ip]; f {
 		delete(pc.needResync, ip)
@@ -286,6 +293,13 @@ func (pc *PodCache) getPodKeys(addr string) []types.NamespacedName {
 	return pc.podsByIP[addr].UnsortedList()
 }
 
+// getIPByPod returns the pod IP or empty string if pod not found.
+func (pc *PodCache) getIPByPod(key types.NamespacedName) string {
+	pc.RLock()
+	defer pc.RUnlock()
+	return pc.ipByPods[key]
+}
+
 // getPodByIp returns the pod or nil if pod not found or an error occurred
 func (pc *PodCache) getPodsByIP(addr string) []*v1.Pod {
 	keys := pc.getPodKeys(addr)
diff --git a/pilot/pkg/serviceregistry/serviceregistry_test.go b/pilot/pkg/serviceregistry/serviceregistry_test.go
index 580cb11f1739..584727dc68f9 100644
--- a/pilot/pkg/serviceregistry/serviceregistry_test.go
+++ b/pilot/pkg/serviceregistry/serviceregistry_test.go
@@ -784,6 +784,86 @@ func TestWorkloadInstances(t *testing.T) {
 		expectServiceEndpoints(t, fx, expectedSvc, 80, instances)
 	})
 
+	t.Run("ServiceEntry selects Pod that is Failed without IP", func(t *testing.T) {
+		store, kube, fx := setupTest(t)
+		makeIstioObject(t, store, serviceEntry)
+		makePod(t, kube, pod)
+		// Copy the pod since other tests expect it to have an IP.
+		p2 := pod.DeepCopy()
+		instances := []EndpointResponse{{
+			Address: p2.Status.PodIP,
+			Port:    80,
+		}}
+		expectServiceEndpoints(t, fx, expectedSvc, 80, instances)
+
+		// Failed pods should have their endpoints removed from the registry, despite not having an IP.
+		p2.Status.PodIP = ""
+		p2.Status.PodIPs = nil
+		p2.Status.Phase = v1.PodFailed
+		_, err := kube.CoreV1().Pods(p2.Namespace).UpdateStatus(context.TODO(), p2, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		expectServiceEndpoints(t, fx, expectedSvc, 80, []EndpointResponse{})
+	})
+
+	t.Run("ServiceEntry selects Pod that is Failed with an IP", func(t *testing.T) {
+		store, kube, fx := setupTest(t)
+		makeIstioObject(t, store, serviceEntry)
+		makePod(t, kube, pod)
+		p2 := pod.DeepCopy()
+		instances := []EndpointResponse{{
+			Address: p2.Status.PodIP,
+			Port:    80,
+		}}
+		expectServiceEndpoints(t, fx, expectedSvc, 80, instances)
+
+		// Failed pods should have their endpoints removed from the registry
+		p2.Status.Phase = v1.PodFailed
+		_, err := kube.CoreV1().Pods(p2.Namespace).UpdateStatus(context.TODO(), p2, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		expectServiceEndpoints(t, fx, expectedSvc, 80, []EndpointResponse{})
+
+		// Removing the IP should be a no-op
+		p2.Status.PodIP = ""
+		p2.Status.PodIPs = nil
+		_, err = kube.CoreV1().Pods(p2.Namespace).UpdateStatus(context.TODO(), p2, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		expectServiceEndpoints(t, fx, expectedSvc, 80, []EndpointResponse{})
+	})
+
+	t.Run("ServiceEntry selects Pod with IP removed", func(t *testing.T) {
+		store, kube, fx := setupTest(t)
+		makeIstioObject(t, store, serviceEntry)
+		makePod(t, kube, pod)
+		p2 := pod.DeepCopy()
+		instances := []EndpointResponse{{
+			Address: p2.Status.PodIP,
+			Port:    80,
+		}}
+		expectServiceEndpoints(t, fx, expectedSvc, 80, instances)
+
+		// Pods without an IP can't be ready.
+		p2.Status.PodIP = ""
+		p2.Status.PodIPs = nil
+		_, err := kube.CoreV1().Pods(p2.Namespace).UpdateStatus(context.TODO(), p2, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		expectServiceEndpoints(t, fx, expectedSvc, 80, []EndpointResponse{})
+
+		// Failing the pod should be a no-op
+		p2.Status.Phase = v1.PodFailed
+		_, err = kube.CoreV1().Pods(p2.Namespace).UpdateStatus(context.TODO(), p2, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		expectServiceEndpoints(t, fx, expectedSvc, 80, []EndpointResponse{})
+	})
 	t.Run("ServiceEntry selects Pod with targetPort number", func(t *testing.T) {
 		store, kube, fx := setupTest(t)
 		makeIstioObject(t, store, config.Config{
diff --git a/releasenotes/notes/eviction-bug.yaml b/releasenotes/notes/eviction-bug.yaml
new file mode 100644
index 000000000000..0fb791ec8d75
--- /dev/null
+++ b/releasenotes/notes/eviction-bug.yaml
@@ -0,0 +1,8 @@
+apiVersion: release-notes/v2
+kind: bug-fix
+area: traffic-management
+issue:
+- 54997
+releaseNotes:
+- |
+  **Fixed** an issue where ServiceEntry endpoints are leaked when a pod is evicted.
\ No newline at end of file