diff --git a/controller/api/destination/endpoint_profile_translator.go b/controller/api/destination/endpoint_profile_translator.go index 460a8a059b961..48da6a84fb2eb 100644 --- a/controller/api/destination/endpoint_profile_translator.go +++ b/controller/api/destination/endpoint_profile_translator.go @@ -128,10 +128,11 @@ func (ept *endpointProfileTranslator) update(address *watcher.Address) { return } + _, opaqueProtocol := opaquePorts[address.Port] profile := &pb.DestinationProfile{ RetryBudget: defaultRetryBudget(), Endpoint: endpoint, - OpaqueProtocol: address.OpaqueProtocol, + OpaqueProtocol: opaqueProtocol || address.OpaqueProtocol, } if proto.Equal(profile, ept.current) { ept.log.Debugf("Ignoring redundant profile update: %+v", profile) diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 0f7286e3c2397..5a250fff3d96b 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -66,6 +66,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.12 podIPs: - ip: 172.17.0.12 @@ -146,6 +149,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.13 podIPs: - ip: 172.17.0.13` @@ -193,6 +199,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.14 podIPs: - ip: 172.17.0.14 @@ -258,6 +267,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.15 podIPs: - ip: 172.17.0.15 @@ -311,6 +323,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.13.15 podIPs: - ip: 172.17.13.15`, @@ -358,6 +373,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.0.16 podIPs: - ip: 172.17.0.16 @@ -408,6 +426,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" hostIP: 192.168.1.20 podIP: 172.17.0.17 podIPs: @@ -466,6 +487,9 @@ metadata: namespace: ns status: phase: Running + conditions: + - type: Ready + status: "True" podIP: 172.17.55.1 podIPs: - ip: 172.17.55.1 @@ -528,7 +552,7 @@ spec: port: 4143 status: conditions: - ready: true`, + - ready: true`, ` apiVersion: workload.linkerd.io/v1beta1 kind: ExternalWorkload diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index f7aa6fbb1eccb..4b9be3ee6a60d 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -931,7 +931,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A pp.log.Errorf("Unable to create new address:%v", err) continue } - err = SetToServerProtocol(pp.k8sAPI, &address, resolvedPort) + err = SetToServerProtocol(pp.k8sAPI, &address) if err != nil { pp.log.Errorf("failed to set address OpaqueProtocol: %s", err) continue @@ -955,7 +955,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A continue } - err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address, resolvedPort) + err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address) if err != nil { pp.log.Errorf("failed to set address OpaqueProtocol: %s", err) continue @@ -1067,7 +1067,7 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) Addre pp.log.Errorf("Unable to create new address:%v", err) continue } - err = SetToServerProtocol(pp.k8sAPI, &address, resolvedPort) + err = SetToServerProtocol(pp.k8sAPI, &address) if err != nil { pp.log.Errorf("failed to set address OpaqueProtocol: %s", err) continue @@ -1487,7 +1487,7 @@ func isValidSlice(es *discovery.EndpointSlice) bool { // SetToServerProtocol sets the address's OpaqueProtocol field based off any // Servers that select it and override the expected protocol. -func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error { +func SetToServerProtocol(k8sAPI *k8s.API, address *Address) error { if address.Pod == nil { return fmt.Errorf("endpoint not backed by Pod: %s:%d", address.IP, address.Port) } @@ -1504,13 +1504,13 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error { var portMatch bool switch server.Spec.Port.Type { case intstr.Int: - if server.Spec.Port.IntVal == int32(port) { + if server.Spec.Port.IntVal == int32(address.Port) { portMatch = true } case intstr.String: for _, c := range address.Pod.Spec.Containers { for _, p := range c.Ports { - if (p.ContainerPort == int32(port) || p.HostPort == int32(port)) && + if (p.ContainerPort == int32(address.Port) || p.HostPort == int32(address.Port)) && p.Name == server.Spec.Port.StrVal { portMatch = true } @@ -1530,7 +1530,7 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error { // setToServerProtocolExternalWorkload sets the address's OpaqueProtocol field based off any // Servers that select it and override the expected protocol for ExternalWorkloads. -func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port Port) error { +func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address) error { if address.ExternalWorkload == nil { return fmt.Errorf("endpoint not backed by ExternalWorkload: %s:%d", address.IP, address.Port) } @@ -1547,12 +1547,12 @@ func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port var portMatch bool switch server.Spec.Port.Type { case intstr.Int: - if server.Spec.Port.IntVal == int32(port) { + if server.Spec.Port.IntVal == int32(address.Port) { portMatch = true } case intstr.String: for _, p := range address.ExternalWorkload.Spec.Ports { - if p.Port == int32(port) && p.Name == server.Spec.Port.StrVal { + if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal { portMatch = true } diff --git a/controller/api/destination/watcher/workload_watcher.go b/controller/api/destination/watcher/workload_watcher.go index 2e1fd65d6d2d4..9e562b0d921c3 100644 --- a/controller/api/destination/watcher/workload_watcher.go +++ b/controller/api/destination/watcher/workload_watcher.go @@ -9,7 +9,6 @@ import ( "sync" "time" - externalworkload "github.com/linkerd/linkerd2/controller/api/destination/external-workload" ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1" "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2" "github.com/linkerd/linkerd2/controller/k8s" @@ -40,17 +39,15 @@ type ( mu sync.RWMutex } - // workloadPublisher represents an ip:port along with the backing pod - // or externalworkload (if any). It keeps a list of listeners to be notified - // whenever the workload or the associated opaque protocol config changes. + // workloadPublisher represents an address including ip:port, the backing + // pod or externalworkload (if any), and if the protocol is opaque. It keeps + // a list of listeners to be notified whenever the workload or the + // associated opaque protocol config changes. workloadPublisher struct { defaultOpaquePorts map[uint32]struct{} k8sAPI *k8s.API metadataAPI *k8s.MetadataAPI - ip string - port Port - pod *corev1.Pod - externalWorkload *ext.ExternalWorkload + addr Address listeners []WorkloadUpdateListener metrics metrics log *logging.Entry @@ -120,24 +117,16 @@ func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, po } else { ww.log.Debugf("Establishing watch on workload %s:%d", ip, port) } - pp, err := ww.getOrNewWorkloadPublisher(service, hostname, ip, port) + wp, err := ww.getOrNewWorkloadPublisher(service, hostname, ip, port) if err != nil { return "", err } - pp.subscribe(listener) - - address, err := pp.createAddress() - if err != nil { + if err = wp.subscribe(listener); err != nil { return "", err } - if err = listener.Update(&address); err != nil { - return "", fmt.Errorf("failed to send initial update: %w", err) - } - pp.metrics.incUpdates() - - return pp.ip, nil + return wp.addr.IP, nil } // Subscribe stops notifying the listener on chages on any pod backing the @@ -155,7 +144,7 @@ func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUp wp.unsubscribe(listener) if len(wp.listeners) == 0 { - delete(ww.publishers, IPPort{wp.ip, wp.port}) + delete(ww.publishers, IPPort{wp.addr.IP, wp.addr.Port}) } } @@ -341,7 +330,7 @@ func (ww *WorkloadWatcher) addOrDeleteServer(obj interface{}) { ww.updateServers(server) } -// updateServer triggers an Update() call to the listeners of the workloadPublishers +// updateServers triggers an Update() call to the listeners of the workloadPublishers // whose pod matches the any of the Servers' podSelector or whose // externalworkload matches any of the Servers' externalworkload selection. This // function is an event handler so it cannot block. @@ -351,23 +340,38 @@ func (ww *WorkloadWatcher) updateServers(servers ...*v1beta2.Server) { for _, wp := range ww.publishers { var opaquePorts map[uint32]struct{} - if wp.pod != nil { - if !ww.isPodSelectedByAny(wp.pod, servers...) { + if wp.addr.Pod != nil { + if !ww.isPodSelectedByAny(wp.addr.Pod, servers...) { continue } - opaquePorts = GetAnnotatedOpaquePorts(wp.pod, ww.defaultOpaquePorts) - } else if wp.externalWorkload != nil { - if !ww.isExternalWorkloadSelectedByAny(wp.externalWorkload, servers...) { + opaquePorts = GetAnnotatedOpaquePorts(wp.addr.Pod, ww.defaultOpaquePorts) + } else if wp.addr.ExternalWorkload != nil { + if !ww.isExternalWorkloadSelectedByAny(wp.addr.ExternalWorkload, servers...) { continue } - opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.externalWorkload, ww.defaultOpaquePorts) + opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.addr.ExternalWorkload, ww.defaultOpaquePorts) } else { continue } - _, isOpaque := opaquePorts[wp.port] + _, annotatedOpaque := opaquePorts[wp.addr.Port] // if port is annotated to be always opaque we can disregard Server updates - if isOpaque { + if annotatedOpaque { + continue + } + + opaque := wp.addr.OpaqueProtocol + name := net.JoinHostPort(wp.addr.IP, fmt.Sprintf("%d", wp.addr.Port)) + if wp.addr.Pod != nil { + name = wp.addr.Pod.GetName() + } else if wp.addr.ExternalWorkload != nil { + name = wp.addr.ExternalWorkload.GetName() + } + if err := SetToServerProtocol(wp.k8sAPI, &wp.addr); err != nil { + wp.log.Errorf("Error computing opaque protocol for %s: %q", name, err) + } + if wp.addr.OpaqueProtocol == opaque { + // OpaqueProtocol has not changed. No need to update the listeners. continue } @@ -375,25 +379,13 @@ func (ww *WorkloadWatcher) updateServers(servers ...*v1beta2.Server) { wp.mu.RLock() defer wp.mu.RUnlock() - updated := false for _, listener := range wp.listeners { - // the Server in question doesn't carry information about other - // Servers that might target this workloadPublisher; createAddress() - // queries all the relevant Servers to determine the full state - addr, err := wp.createAddress() - if err != nil { - ww.log.Errorf("Error creating address for workload: %s", err) - continue - } - if err = listener.Update(&addr); err != nil { + if err := listener.Update(&wp.addr); err != nil { ww.log.Warnf("Error sending update to listener: %s", err) continue } - updated = true - } - if updated { - wp.metrics.incUpdates() } + wp.metrics.incUpdates() }(wp) } } @@ -467,10 +459,10 @@ func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostnam defaultOpaquePorts: ww.defaultOpaquePorts, k8sAPI: ww.k8sAPI, metadataAPI: ww.metadataAPI, - ip: ip, - port: port, - pod: pod, - externalWorkload: externalWorkload, + addr: Address{ + IP: ip, + Port: port, + }, metrics: ipPortVecs.newMetrics(prometheus.Labels{ "ip": ip, "port": strconv.FormatUint(uint64(port), 10), @@ -481,6 +473,12 @@ func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostnam "port": port, }), } + if pod != nil { + wp.updatePod(pod) + } + if externalWorkload != nil { + wp.updateExternalWorkload(externalWorkload) + } ww.publishers[ipPort] = wp } return wp, nil @@ -626,12 +624,18 @@ func (ww *WorkloadWatcher) getEndpointByHostname(hostname string, svcID *Service return nil, status.Errorf(codes.NotFound, "no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname) } -func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) { +func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) error { wp.mu.Lock() defer wp.mu.Unlock() wp.listeners = append(wp.listeners, listener) wp.metrics.setSubscribers(len(wp.listeners)) + + if err := listener.Update(&wp.addr); err != nil { + return fmt.Errorf("failed to send initial update: %w", err) + } + wp.metrics.incUpdates() + return nil } func (wp *workloadPublisher) unsubscribe(listener WorkloadUpdateListener) { @@ -660,7 +664,7 @@ func (wp *workloadPublisher) updatePod(pod *corev1.Pod) { defer wp.mu.Unlock() // pod wasn't ready or there was no backing pod - check if passed pod is ready - if wp.pod == nil { + if wp.addr.Pod == nil { if pod == nil { wp.log.Trace("Pod deletion event already consumed - ignore") return @@ -672,46 +676,50 @@ func (wp *workloadPublisher) updatePod(pod *corev1.Pod) { } wp.log.Debugf("Pod %s.%s became ready", pod.Name, pod.Namespace) - wp.pod = pod - updated := false - for _, l := range wp.listeners { - addr, err := wp.createAddress() + wp.addr.Pod = pod + + // Fill in ownership. + if wp.addr.Pod != nil { + ownerKind, ownerName, err := wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.addr.Pod, true) if err != nil { - wp.log.Errorf("Error creating address for pod: %s", err) - continue + wp.log.Errorf("Error getting pod owner for pod %s: %q", wp.addr.Pod.GetName(), err) + } else { + wp.addr.OwnerKind = ownerKind + wp.addr.OwnerName = ownerName } - if err = l.Update(&addr); err != nil { + } + + // Compute opaque protocol. + if err := SetToServerProtocol(wp.k8sAPI, &wp.addr); err != nil { + wp.log.Errorf("Error computing opaque protocol for pod %s: %q", wp.addr.Pod.GetName(), err) + } + + for _, l := range wp.listeners { + if err := l.Update(&wp.addr); err != nil { wp.log.Warnf("Error sending update to listener: %s", err) continue } - updated = true - } - if updated { - wp.metrics.incUpdates() } + wp.metrics.incUpdates() + return } // backing pod becoming unready or getting deleted if pod == nil || !isRunningAndReady(pod) { - wp.log.Debugf("Pod %s.%s deleted or it became unready - remove", wp.pod.Name, wp.pod.Namespace) - wp.pod = nil - updated := false + wp.log.Debugf("Pod %s.%s deleted or it became unready - remove", wp.addr.Pod.Name, wp.addr.Pod.Namespace) + wp.addr.Pod = nil + wp.addr.OwnerKind = "" + wp.addr.OwnerName = "" + wp.addr.OpaqueProtocol = false for _, l := range wp.listeners { - addr, err := wp.createAddress() - if err != nil { - wp.log.Errorf("Error creating address for pod: %s", err) - continue - } - if err = l.Update(&addr); err != nil { + if err := l.Update(&wp.addr); err != nil { wp.log.Warnf("Error sending update to listener: %s", err) continue } - updated = true - } - if updated { - wp.metrics.incUpdates() } + wp.metrics.incUpdates() + return } @@ -726,118 +734,26 @@ func (wp *workloadPublisher) updateExternalWorkload(externalWorkload *ext.Extern wp.mu.Lock() defer wp.mu.Unlock() - // externalWorkload wasn't ready or there was no backing externalWorkload. - // check if passed externalWorkload is ready - if wp.externalWorkload == nil { - if externalWorkload == nil { - wp.log.Trace("ExternalWorkload deletion event already consumed - ignore") - return - } - - if !externalworkload.IsEwReady(externalWorkload) { - wp.log.Tracef("ExternalWorkload %s.%s not ready - ignore", externalWorkload.Name, externalWorkload.Namespace) - return - } + wp.addr.ExternalWorkload = externalWorkload - wp.log.Debugf("ExternalWorkload %s.%s became ready", externalWorkload.Name, externalWorkload.Namespace) - wp.externalWorkload = externalWorkload - updated := false - for _, l := range wp.listeners { - addr, err := wp.createAddress() - if err != nil { - wp.log.Errorf("Error creating address for externalWorkload: %s", err) - continue - } - if err = l.Update(&addr); err != nil { - wp.log.Warnf("Error sending update to listener: %s", err) - continue - } - updated = true - } - if updated { - wp.metrics.incUpdates() - } - return - } - - // backing pod becoming unready or getting deleted - if externalWorkload == nil || !externalworkload.IsEwReady(externalWorkload) { - wp.log.Debugf("ExternalWorkload %s.%s deleted or it became unready - remove", wp.externalWorkload.Name, wp.externalWorkload.Namespace) - wp.externalWorkload = nil - updated := false - for _, l := range wp.listeners { - addr, err := wp.createAddress() - if err != nil { - wp.log.Errorf("Error creating address for pod: %s", err) - continue - } - if err = l.Update(&addr); err != nil { - wp.log.Warnf("Error sending update to listener: %s", err) - continue - } - updated = true - } - if updated { - wp.metrics.incUpdates() - } - return - } - - wp.log.Tracef("Ignored event on externalWorkload %s.%s", externalWorkload.Name, externalWorkload.Namespace) -} - -// createAddress returns an Address instance for the given ip, port and workload. It -// completes the ownership and opaque protocol information -func (wp *workloadPublisher) createAddress() (Address, error) { - var ownerKind, ownerName string - var err error - if wp.pod != nil { - ownerKind, ownerName, err = wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.pod, true) - if err != nil { - return Address{}, err - } - } else if wp.externalWorkload != nil { - if len(wp.externalWorkload.GetOwnerReferences()) == 1 { - ownerKind = wp.externalWorkload.GetOwnerReferences()[0].Kind - ownerName = wp.externalWorkload.GetOwnerReferences()[0].Name - } + // Fill in ownership. + if wp.addr.ExternalWorkload != nil && len(wp.addr.ExternalWorkload.GetOwnerReferences()) == 1 { + wp.addr.OwnerKind = wp.addr.ExternalWorkload.GetOwnerReferences()[0].Kind + wp.addr.OwnerName = wp.addr.ExternalWorkload.GetOwnerReferences()[0].Name } - address := Address{ - IP: wp.ip, - Port: wp.port, - Pod: wp.pod, - ExternalWorkload: wp.externalWorkload, - OwnerName: ownerName, - OwnerKind: ownerKind, + // Compute opaque protocol. + if err := SetToServerProtocolExternalWorkload(wp.k8sAPI, &wp.addr); err != nil { + wp.log.Errorf("Error computing opaque protocol for externalworkload %s: %q", wp.addr.ExternalWorkload.GetName(), err) } - // Override opaqueProtocol if the endpoint's port is annotated as opaque - if wp.pod != nil { - opaquePorts := GetAnnotatedOpaquePorts(wp.pod, wp.defaultOpaquePorts) - if _, ok := opaquePorts[wp.port]; ok { - address.OpaqueProtocol = true - } else { - if err := SetToServerProtocol(wp.k8sAPI, &address, wp.port); err != nil { - return Address{}, fmt.Errorf("failed to set address OpaqueProtocol: %w", err) - } - } - } else if wp.externalWorkload != nil { - opaquePorts := GetAnnotatedOpaquePortsForExternalWorkload(wp.externalWorkload, wp.defaultOpaquePorts) - if _, ok := opaquePorts[wp.port]; ok { - address.OpaqueProtocol = true - } else { - if err := SetToServerProtocolExternalWorkload(wp.k8sAPI, &address, wp.port); err != nil { - return Address{}, fmt.Errorf("failed to set address OpaqueProtocol: %w", err) - } - } - } else { - if _, ok := wp.defaultOpaquePorts[wp.port]; ok { - address.OpaqueProtocol = true + for _, l := range wp.listeners { + if err := l.Update(&wp.addr); err != nil { + wp.log.Warnf("Error sending update to listener: %s", err) + continue } } - - return address, nil + wp.metrics.incUpdates() } // GetAnnotatedOpaquePorts returns the opaque ports for the pod given its