From 5915ef5a184f1b632f87b11f45e2170f7469b2db Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Tue, 19 Mar 2024 10:24:02 -0700 Subject: [PATCH] Don't send endpoint profile updates from Server updates when opaqueness doesn't change (#12013) When the destination controller receives an update for a Server resource, we recompute opaqueness ports for all pods. This results in a large number of updates to all endpoint profile watches, even if the opaqueness doesn't change. In cases where there are many Server resources, this can result in a large number of updates being sent to the endpoint profile translator and overflowing the endpoint profile translator update queue. This is especially likely to happen during an informer resync, since this will result in an informer callback for every Server in the cluster. We refactor the workload watcher to not send these updates if the opaqueness has not changed. This seemingly simple change in behavior requires a large code change because: * the current opaqueness state is not stored on workload publishers and must be added so that we can determine if the opaqueness has changed * storing the opaqueness in addition to the other state we're storing (pod, ip, port, etc.) 
means that we are now storing all of the data represented by the Address struct * workload watcher uses a `createAddress` func to dynamically create an Address from the state it stores * now that we are storing the Address as state, creating Addresses dynamically is no longer necessary and we can operate on the Address state directly * this makes the workload watcher more similar to other watchers and follow a common pattern * it also fixes some minor correctness issues: * pods that did not have the ready status condition were being considered when they should not have been * updates to ExternalWorkload labels were not being considered Signed-off-by: Alex Leong --- .../endpoint_profile_translator.go | 3 +- controller/api/destination/test_util.go | 26 +- .../destination/watcher/endpoints_watcher.go | 18 +- .../destination/watcher/workload_watcher.go | 272 ++++++------------ 4 files changed, 130 insertions(+), 189 deletions(-) diff --git a/controller/api/destination/endpoint_profile_translator.go b/controller/api/destination/endpoint_profile_translator.go index 460a8a059b961..48da6a84fb2eb 100644 --- a/controller/api/destination/endpoint_profile_translator.go +++ b/controller/api/destination/endpoint_profile_translator.go @@ -128,10 +128,11 @@ func (ept *endpointProfileTranslator) update(address *watcher.Address) { return } + _, opaqueProtocol := opaquePorts[address.Port] profile := &pb.DestinationProfile{ RetryBudget: defaultRetryBudget(), Endpoint: endpoint, - OpaqueProtocol: address.OpaqueProtocol, + OpaqueProtocol: opaqueProtocol || address.OpaqueProtocol, } if proto.Equal(profile, ept.current) { ept.log.Debugf("Ignoring redundant profile update: %+v", profile) diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 0f7286e3c2397..5a250fff3d96b 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -66,6 +66,9 @@ metadata: namespace: ns status: phase: Running + conditions: 
+ - type: Ready + status: "True" podIP: 172.17.0.12 podIPs: - ip: 172.17.0.12 @@ -146,6 +149,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.13 podIPs: - ip: 172.17.0.13` @@ -193,6 +199,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.14 podIPs: - ip: 172.17.0.14 @@ -258,6 +267,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.15 podIPs: - ip: 172.17.0.15 @@ -311,6 +323,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.13.15 podIPs: - ip: 172.17.13.15`, @@ -358,6 +373,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.16 podIPs: - ip: 172.17.0.16 @@ -408,6 +426,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" hostIP: 192.168.1.20 podIP: 172.17.0.17 podIPs: @@ -466,6 +487,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.55.1 podIPs: - ip: 172.17.55.1 @@ -528,7 +552,7 @@ spec: port: 4143 status: conditions: - ready: true`, + - ready: true`, ` apiVersion: workload.linkerd.io/v1beta1 kind: ExternalWorkload diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index f7aa6fbb1eccb..4b9be3ee6a60d 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -931,7 +931,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A pp.log.Errorf("Unable to create new address:%v", err) continue } - err = SetToServerProtocol(pp.k8sAPI, &address, resolvedPort) + err = SetToServerProtocol(pp.k8sAPI, &address) if err != nil { pp.log.Errorf("failed to set address OpaqueProtocol: %s", 
err) continue @@ -955,7 +955,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A continue } - err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address, resolvedPort) + err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address) if err != nil { pp.log.Errorf("failed to set address OpaqueProtocol: %s", err) continue @@ -1067,7 +1067,7 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) Addre pp.log.Errorf("Unable to create new address:%v", err) continue } - err = SetToServerProtocol(pp.k8sAPI, &address, resolvedPort) + err = SetToServerProtocol(pp.k8sAPI, &address) if err != nil { pp.log.Errorf("failed to set address OpaqueProtocol: %s", err) continue @@ -1487,7 +1487,7 @@ func isValidSlice(es *discovery.EndpointSlice) bool { // SetToServerProtocol sets the address's OpaqueProtocol field based off any // Servers that select it and override the expected protocol. -func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error { +func SetToServerProtocol(k8sAPI *k8s.API, address *Address) error { if address.Pod == nil { return fmt.Errorf("endpoint not backed by Pod: %s:%d", address.IP, address.Port) } @@ -1504,13 +1504,13 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error { var portMatch bool switch server.Spec.Port.Type { case intstr.Int: - if server.Spec.Port.IntVal == int32(port) { + if server.Spec.Port.IntVal == int32(address.Port) { portMatch = true } case intstr.String: for _, c := range address.Pod.Spec.Containers { for _, p := range c.Ports { - if (p.ContainerPort == int32(port) || p.HostPort == int32(port)) && + if (p.ContainerPort == int32(address.Port) || p.HostPort == int32(address.Port)) && p.Name == server.Spec.Port.StrVal { portMatch = true } @@ -1530,7 +1530,7 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error { // setToServerProtocolExternalWorkload sets the address's OpaqueProtocol field based off any // Servers 
that select it and override the expected protocol for ExternalWorkloads. -func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port Port) error { +func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address) error { if address.ExternalWorkload == nil { return fmt.Errorf("endpoint not backed by ExternalWorkload: %s:%d", address.IP, address.Port) } @@ -1547,12 +1547,12 @@ func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port var portMatch bool switch server.Spec.Port.Type { case intstr.Int: - if server.Spec.Port.IntVal == int32(port) { + if server.Spec.Port.IntVal == int32(address.Port) { portMatch = true } case intstr.String: for _, p := range address.ExternalWorkload.Spec.Ports { - if p.Port == int32(port) && p.Name == server.Spec.Port.StrVal { + if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal { portMatch = true } diff --git a/controller/api/destination/watcher/workload_watcher.go b/controller/api/destination/watcher/workload_watcher.go index 2e1fd65d6d2d4..9e562b0d921c3 100644 --- a/controller/api/destination/watcher/workload_watcher.go +++ b/controller/api/destination/watcher/workload_watcher.go @@ -9,7 +9,6 @@ import ( "sync" "time" - externalworkload "github.com/linkerd/linkerd2/controller/api/destination/external-workload" ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1" "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2" "github.com/linkerd/linkerd2/controller/k8s" @@ -40,17 +39,15 @@ type ( mu sync.RWMutex } - // workloadPublisher represents an ip:port along with the backing pod - // or externalworkload (if any). It keeps a list of listeners to be notified - // whenever the workload or the associated opaque protocol config changes. + // workloadPublisher represents an address including ip:port, the backing + // pod or externalworkload (if any), and if the protocol is opaque. 
It keeps + // a list of listeners to be notified whenever the workload or the + // associated opaque protocol config changes. workloadPublisher struct { defaultOpaquePorts map[uint32]struct{} k8sAPI *k8s.API metadataAPI *k8s.MetadataAPI - ip string - port Port - pod *corev1.Pod - externalWorkload *ext.ExternalWorkload + addr Address listeners []WorkloadUpdateListener metrics metrics log *logging.Entry @@ -120,24 +117,16 @@ func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, po } else { ww.log.Debugf("Establishing watch on workload %s:%d", ip, port) } - pp, err := ww.getOrNewWorkloadPublisher(service, hostname, ip, port) + wp, err := ww.getOrNewWorkloadPublisher(service, hostname, ip, port) if err != nil { return "", err } - pp.subscribe(listener) - - address, err := pp.createAddress() - if err != nil { + if err = wp.subscribe(listener); err != nil { return "", err } - if err = listener.Update(&address); err != nil { - return "", fmt.Errorf("failed to send initial update: %w", err) - } - pp.metrics.incUpdates() - - return pp.ip, nil + return wp.addr.IP, nil } // Subscribe stops notifying the listener on chages on any pod backing the @@ -155,7 +144,7 @@ func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUp wp.unsubscribe(listener) if len(wp.listeners) == 0 { - delete(ww.publishers, IPPort{wp.ip, wp.port}) + delete(ww.publishers, IPPort{wp.addr.IP, wp.addr.Port}) } } @@ -341,7 +330,7 @@ func (ww *WorkloadWatcher) addOrDeleteServer(obj interface{}) { ww.updateServers(server) } -// updateServer triggers an Update() call to the listeners of the workloadPublishers +// updateServers triggers an Update() call to the listeners of the workloadPublishers // whose pod matches the any of the Servers' podSelector or whose // externalworkload matches any of the Servers' externalworkload selection. This // function is an event handler so it cannot block. 
@@ -351,23 +340,38 @@ func (ww *WorkloadWatcher) updateServers(servers ...*v1beta2.Server) { for _, wp := range ww.publishers { var opaquePorts map[uint32]struct{} - if wp.pod != nil { - if !ww.isPodSelectedByAny(wp.pod, servers...) { + if wp.addr.Pod != nil { + if !ww.isPodSelectedByAny(wp.addr.Pod, servers...) { continue } - opaquePorts = GetAnnotatedOpaquePorts(wp.pod, ww.defaultOpaquePorts) - } else if wp.externalWorkload != nil { - if !ww.isExternalWorkloadSelectedByAny(wp.externalWorkload, servers...) { + opaquePorts = GetAnnotatedOpaquePorts(wp.addr.Pod, ww.defaultOpaquePorts) + } else if wp.addr.ExternalWorkload != nil { + if !ww.isExternalWorkloadSelectedByAny(wp.addr.ExternalWorkload, servers...) { continue } - opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.externalWorkload, ww.defaultOpaquePorts) + opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.addr.ExternalWorkload, ww.defaultOpaquePorts) } else { continue } - _, isOpaque := opaquePorts[wp.port] + _, annotatedOpaque := opaquePorts[wp.addr.Port] // if port is annotated to be always opaque we can disregard Server updates - if isOpaque { + if annotatedOpaque { + continue + } + + opaque := wp.addr.OpaqueProtocol + name := net.JoinHostPort(wp.addr.IP, fmt.Sprintf("%d", wp.addr.Port)) + if wp.addr.Pod != nil { + name = wp.addr.Pod.GetName() + } else if wp.addr.ExternalWorkload != nil { + name = wp.addr.ExternalWorkload.GetName() + } + if err := SetToServerProtocol(wp.k8sAPI, &wp.addr); err != nil { + wp.log.Errorf("Error computing opaque protocol for %s: %q", name, err) + } + if wp.addr.OpaqueProtocol == opaque { + // OpaqueProtocol has not changed. No need to update the listeners. 
continue } @@ -375,25 +379,13 @@ func (ww *WorkloadWatcher) updateServers(servers ...*v1beta2.Server) { wp.mu.RLock() defer wp.mu.RUnlock() - updated := false for _, listener := range wp.listeners { - // the Server in question doesn't carry information about other - // Servers that might target this workloadPublisher; createAddress() - // queries all the relevant Servers to determine the full state - addr, err := wp.createAddress() - if err != nil { - ww.log.Errorf("Error creating address for workload: %s", err) - continue - } - if err = listener.Update(&addr); err != nil { + if err := listener.Update(&wp.addr); err != nil { ww.log.Warnf("Error sending update to listener: %s", err) continue } - updated = true - } - if updated { - wp.metrics.incUpdates() } + wp.metrics.incUpdates() }(wp) } } @@ -467,10 +459,10 @@ func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostnam defaultOpaquePorts: ww.defaultOpaquePorts, k8sAPI: ww.k8sAPI, metadataAPI: ww.metadataAPI, - ip: ip, - port: port, - pod: pod, - externalWorkload: externalWorkload, + addr: Address{ + IP: ip, + Port: port, + }, metrics: ipPortVecs.newMetrics(prometheus.Labels{ "ip": ip, "port": strconv.FormatUint(uint64(port), 10), @@ -481,6 +473,12 @@ func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostnam "port": port, }), } + if pod != nil { + wp.updatePod(pod) + } + if externalWorkload != nil { + wp.updateExternalWorkload(externalWorkload) + } ww.publishers[ipPort] = wp } return wp, nil @@ -626,12 +624,18 @@ func (ww *WorkloadWatcher) getEndpointByHostname(hostname string, svcID *Service return nil, status.Errorf(codes.NotFound, "no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname) } -func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) { +func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) error { wp.mu.Lock() defer wp.mu.Unlock() wp.listeners = append(wp.listeners, listener) 
wp.metrics.setSubscribers(len(wp.listeners)) + + if err := listener.Update(&wp.addr); err != nil { + return fmt.Errorf("failed to send initial update: %w", err) + } + wp.metrics.incUpdates() + return nil } func (wp *workloadPublisher) unsubscribe(listener WorkloadUpdateListener) { @@ -660,7 +664,7 @@ func (wp *workloadPublisher) updatePod(pod *corev1.Pod) { defer wp.mu.Unlock() // pod wasn't ready or there was no backing pod - check if passed pod is ready - if wp.pod == nil { + if wp.addr.Pod == nil { if pod == nil { wp.log.Trace("Pod deletion event already consumed - ignore") return @@ -672,46 +676,50 @@ func (wp *workloadPublisher) updatePod(pod *corev1.Pod) { } wp.log.Debugf("Pod %s.%s became ready", pod.Name, pod.Namespace) - wp.pod = pod - updated := false - for _, l := range wp.listeners { - addr, err := wp.createAddress() + wp.addr.Pod = pod + + // Fill in ownership. + if wp.addr.Pod != nil { + ownerKind, ownerName, err := wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.addr.Pod, true) if err != nil { - wp.log.Errorf("Error creating address for pod: %s", err) - continue + wp.log.Errorf("Error getting pod owner for pod %s: %q", wp.addr.Pod.GetName(), err) + } else { + wp.addr.OwnerKind = ownerKind + wp.addr.OwnerName = ownerName } - if err = l.Update(&addr); err != nil { + } + + // Compute opaque protocol. 
+ if err := SetToServerProtocol(wp.k8sAPI, &wp.addr); err != nil { + wp.log.Errorf("Error computing opaque protocol for pod %s: %q", wp.addr.Pod.GetName(), err) + } + + for _, l := range wp.listeners { + if err := l.Update(&wp.addr); err != nil { wp.log.Warnf("Error sending update to listener: %s", err) continue } - updated = true - } - if updated { - wp.metrics.incUpdates() } + wp.metrics.incUpdates() + return } // backing pod becoming unready or getting deleted if pod == nil || !isRunningAndReady(pod) { - wp.log.Debugf("Pod %s.%s deleted or it became unready - remove", wp.pod.Name, wp.pod.Namespace) - wp.pod = nil - updated := false + wp.log.Debugf("Pod %s.%s deleted or it became unready - remove", wp.addr.Pod.Name, wp.addr.Pod.Namespace) + wp.addr.Pod = nil + wp.addr.OwnerKind = "" + wp.addr.OwnerName = "" + wp.addr.OpaqueProtocol = false for _, l := range wp.listeners { - addr, err := wp.createAddress() - if err != nil { - wp.log.Errorf("Error creating address for pod: %s", err) - continue - } - if err = l.Update(&addr); err != nil { + if err := l.Update(&wp.addr); err != nil { wp.log.Warnf("Error sending update to listener: %s", err) continue } - updated = true - } - if updated { - wp.metrics.incUpdates() } + wp.metrics.incUpdates() + return } @@ -726,118 +734,26 @@ func (wp *workloadPublisher) updateExternalWorkload(externalWorkload *ext.Extern wp.mu.Lock() defer wp.mu.Unlock() - // externalWorkload wasn't ready or there was no backing externalWorkload. 
- // check if passed externalWorkload is ready - if wp.externalWorkload == nil { - if externalWorkload == nil { - wp.log.Trace("ExternalWorkload deletion event already consumed - ignore") - return - } - - if !externalworkload.IsEwReady(externalWorkload) { - wp.log.Tracef("ExternalWorkload %s.%s not ready - ignore", externalWorkload.Name, externalWorkload.Namespace) - return - } + wp.addr.ExternalWorkload = externalWorkload - wp.log.Debugf("ExternalWorkload %s.%s became ready", externalWorkload.Name, externalWorkload.Namespace) - wp.externalWorkload = externalWorkload - updated := false - for _, l := range wp.listeners { - addr, err := wp.createAddress() - if err != nil { - wp.log.Errorf("Error creating address for externalWorkload: %s", err) - continue - } - if err = l.Update(&addr); err != nil { - wp.log.Warnf("Error sending update to listener: %s", err) - continue - } - updated = true - } - if updated { - wp.metrics.incUpdates() - } - return - } - - // backing pod becoming unready or getting deleted - if externalWorkload == nil || !externalworkload.IsEwReady(externalWorkload) { - wp.log.Debugf("ExternalWorkload %s.%s deleted or it became unready - remove", wp.externalWorkload.Name, wp.externalWorkload.Namespace) - wp.externalWorkload = nil - updated := false - for _, l := range wp.listeners { - addr, err := wp.createAddress() - if err != nil { - wp.log.Errorf("Error creating address for pod: %s", err) - continue - } - if err = l.Update(&addr); err != nil { - wp.log.Warnf("Error sending update to listener: %s", err) - continue - } - updated = true - } - if updated { - wp.metrics.incUpdates() - } - return - } - - wp.log.Tracef("Ignored event on externalWorkload %s.%s", externalWorkload.Name, externalWorkload.Namespace) -} - -// createAddress returns an Address instance for the given ip, port and workload. 
It -// completes the ownership and opaque protocol information -func (wp *workloadPublisher) createAddress() (Address, error) { - var ownerKind, ownerName string - var err error - if wp.pod != nil { - ownerKind, ownerName, err = wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.pod, true) - if err != nil { - return Address{}, err - } - } else if wp.externalWorkload != nil { - if len(wp.externalWorkload.GetOwnerReferences()) == 1 { - ownerKind = wp.externalWorkload.GetOwnerReferences()[0].Kind - ownerName = wp.externalWorkload.GetOwnerReferences()[0].Name - } + // Fill in ownership. + if wp.addr.ExternalWorkload != nil && len(wp.addr.ExternalWorkload.GetOwnerReferences()) == 1 { + wp.addr.OwnerKind = wp.addr.ExternalWorkload.GetOwnerReferences()[0].Kind + wp.addr.OwnerName = wp.addr.ExternalWorkload.GetOwnerReferences()[0].Name } - address := Address{ - IP: wp.ip, - Port: wp.port, - Pod: wp.pod, - ExternalWorkload: wp.externalWorkload, - OwnerName: ownerName, - OwnerKind: ownerKind, + // Compute opaque protocol. 
+ if err := SetToServerProtocolExternalWorkload(wp.k8sAPI, &wp.addr); err != nil { + wp.log.Errorf("Error computing opaque protocol for externalworkload %s: %q", wp.addr.ExternalWorkload.GetName(), err) } - // Override opaqueProtocol if the endpoint's port is annotated as opaque - if wp.pod != nil { - opaquePorts := GetAnnotatedOpaquePorts(wp.pod, wp.defaultOpaquePorts) - if _, ok := opaquePorts[wp.port]; ok { - address.OpaqueProtocol = true - } else { - if err := SetToServerProtocol(wp.k8sAPI, &address, wp.port); err != nil { - return Address{}, fmt.Errorf("failed to set address OpaqueProtocol: %w", err) - } - } - } else if wp.externalWorkload != nil { - opaquePorts := GetAnnotatedOpaquePortsForExternalWorkload(wp.externalWorkload, wp.defaultOpaquePorts) - if _, ok := opaquePorts[wp.port]; ok { - address.OpaqueProtocol = true - } else { - if err := SetToServerProtocolExternalWorkload(wp.k8sAPI, &address, wp.port); err != nil { - return Address{}, fmt.Errorf("failed to set address OpaqueProtocol: %w", err) - } - } - } else { - if _, ok := wp.defaultOpaquePorts[wp.port]; ok { - address.OpaqueProtocol = true + for _, l := range wp.listeners { + if err := l.Update(&wp.addr); err != nil { + wp.log.Warnf("Error sending update to listener: %s", err) + continue } } - - return address, nil + wp.metrics.incUpdates() } // GetAnnotatedOpaquePorts returns the opaque ports for the pod given its