From 035f22cde3773539114357147914de7d5ea6d3a5 Mon Sep 17 00:00:00 2001 From: Gilberto Bertin Date: Thu, 27 Jun 2024 17:20:19 +0200 Subject: [PATCH 1/4] service: differentiate UDP and TCP protocols this commit updates the service logic, from the agent all the way down to the datapath, to take into account the L4 protocols specified in a k8s service. Historically Cilium has always been ignoring, at least from a end user perspective, the protocol specified in a k8s service, resulting in the inability to expose a port only for a given protocol, or in the opposite case, resulting in a single protocol-agnostic service exposed whenever 2 TCP and UDP services on the same port were configured. Co-authored-by: Nate Sweet Co-authored-by: Antonio Ojea Signed-off-by: Gilberto Bertin --- Documentation/cmdref/cilium_service_update.md | 1 + .../network/kubernetes/kubeproxy-free.rst | 16 ++-- api/v1/models/backend_address.go | 3 + api/v1/openapi.yaml | 3 + api/v1/server/embedded_spec.go | 8 ++ bpf/bpf_sock.c | 82 ++++++++++++++----- bpf/lib/common.h | 4 +- bpf/lib/lb.h | 6 +- cilium/cmd/bpf_lb_list.go | 7 +- cilium/cmd/service_update.go | 51 +++++++++--- pkg/datapath/types/lbmap.go | 7 +- pkg/k8s/service.go | 9 +- pkg/k8s/watchers/watcher.go | 8 +- pkg/loadbalancer/loadbalancer.go | 65 ++++++++++----- pkg/maps/lbmap/ipv4.go | 12 ++- pkg/maps/lbmap/ipv6.go | 15 ++-- pkg/maps/lbmap/lbmap.go | 44 +++++----- pkg/maps/lbmap/types.go | 15 +++- pkg/service/id_local.go | 1 + pkg/service/service.go | 16 +++- pkg/testutils/mockmaps/lbmap.go | 9 +- pkg/u8proto/u8proto.go | 9 ++ 22 files changed, 271 insertions(+), 120 deletions(-) diff --git a/Documentation/cmdref/cilium_service_update.md b/Documentation/cmdref/cilium_service_update.md index 4a114be137102..fddc5f4fec33e 100644 --- a/Documentation/cmdref/cilium_service_update.md +++ b/Documentation/cmdref/cilium_service_update.md @@ -24,6 +24,7 @@ cilium service update [flags] --k8s-load-balancer Set service as a k8s LoadBalancer --k8s-node-port Set service as a k8s NodePort --local-redirect Set service as Local Redirect + --protocol string Protocol for service (e.g. TCP, UDP) (default "tcp") --states strings Backend state(s) as {active(default),terminating,quarantined,maintenance} ``` diff --git a/Documentation/network/kubernetes/kubeproxy-free.rst b/Documentation/network/kubernetes/kubeproxy-free.rst index 210c8ae78830a..30d305b09e5dd 100644 --- a/Documentation/network/kubernetes/kubeproxy-free.rst +++ b/Documentation/network/kubernetes/kubeproxy-free.rst @@ -242,14 +242,14 @@ In this example, services with port ``31940`` were created (one for each of devi $ kubectl -n kube-system exec ds/cilium -- cilium service list ID Frontend Service Type Backend [...] - 4 10.104.239.135:80 ClusterIP 1 => 10.217.0.107:80 - 2 => 10.217.0.149:80 - 5 0.0.0.0:31940 NodePort 1 => 10.217.0.107:80 - 2 => 10.217.0.149:80 - 6 192.168.178.29:31940 NodePort 1 => 10.217.0.107:80 - 2 => 10.217.0.149:80 - 7 172.16.0.29:31940 NodePort 1 => 10.217.0.107:80 - 2 => 10.217.0.149:80 + 4 10.104.239.135:80/TCP ClusterIP 1 => 10.217.0.107:80/TCP + 2 => 10.217.0.149:80/TCP + 5 0.0.0.0:31940/TCP NodePort 1 => 10.217.0.107:80/TCP + 2 => 10.217.0.149:80/TCP + 6 192.168.178.29:31940/TCP NodePort 1 => 10.217.0.107:80/TCP + 2 => 10.217.0.149:80/TCP + 7 172.16.0.29:31940/TCP NodePort 1 => 10.217.0.107:80/TCP + 2 => 10.217.0.149:80/TCP Create a variable with the node port for testing: diff --git a/api/v1/models/backend_address.go b/api/v1/models/backend_address.go index 3f17b81f49175..f02578a08fe64 100644 --- a/api/v1/models/backend_address.go +++ b/api/v1/models/backend_address.go @@ -37,6 +37,9 @@ type BackendAddress struct { // on related annotation of global service. Applicable for active state only. Preferred bool `json:"preferred,omitempty"` + // Layer 4 protocol (TCP, UDP, etc) + Protocol string `json:"protocol,omitempty"` + // State of the backend for load-balancing service traffic // Enum: [active terminating quarantined maintenance] State string `json:"state,omitempty"` diff --git a/api/v1/openapi.yaml b/api/v1/openapi.yaml index db842956c06b4..d94e928fc65fd 100644 --- a/api/v1/openapi.yaml +++ b/api/v1/openapi.yaml @@ -2863,6 +2863,9 @@ definitions: ip: description: Layer 3 address type: string + protocol: + description: Layer 4 protocol (TCP, UDP, etc) + type: string port: description: Layer 4 port number type: integer diff --git a/api/v1/server/embedded_spec.go b/api/v1/server/embedded_spec.go index 19bf18240bdac..2c77f267c64ab 100644 --- a/api/v1/server/embedded_spec.go +++ b/api/v1/server/embedded_spec.go @@ -1874,6 +1874,10 @@ func init() { "description": "Indicator if this backend is preferred in the context of clustermesh service affinity. The value is set based\non related annotation of global service. Applicable for active state only.", "type": "boolean" }, + "protocol": { + "description": "Layer 4 protocol (TCP, UDP, etc)", + "type": "string" + }, "state": { "description": "State of the backend for load-balancing service traffic", "type": "string", @@ -7254,6 +7258,10 @@ func init() { "description": "Indicator if this backend is preferred in the context of clustermesh service affinity. The value is set based\non related annotation of global service. Applicable for active state only.", "type": "boolean" }, + "protocol": { + "description": "Layer 4 protocol (TCP, UDP, etc)", + "type": "string" + }, "state": { "description": "State of the backend for load-balancing service traffic", "type": "string", diff --git a/bpf/bpf_sock.c b/bpf/bpf_sock.c index 0cf21bd9e8078..9adf768e4cb1e 100644 --- a/bpf/bpf_sock.c +++ b/bpf/bpf_sock.c @@ -39,6 +39,9 @@ static __always_inline __maybe_unused bool is_v6_loopback(const union v6addr *da return ipv6_addr_equals(&loopback, daddr); } +/* Hack due to missing narrow ctx access. */ +#define ctx_protocol(__ctx) ((__u8)(volatile __u32)(__ctx)->protocol) + /* Hack due to missing narrow ctx access. */ static __always_inline __maybe_unused __be16 ctx_dst_port(const struct bpf_sock_addr *ctx) @@ -104,12 +107,12 @@ bool sock_is_health_check(struct bpf_sock_addr *ctx __maybe_unused) static __always_inline __maybe_unused __u64 sock_select_slot(struct bpf_sock_addr *ctx) { - return ctx->protocol == IPPROTO_TCP ? + return ctx_protocol(ctx) == IPPROTO_TCP ? get_prandom_u32() : sock_local_cookie(ctx); } static __always_inline __maybe_unused -bool sock_proto_enabled(__u32 proto) +bool sock_proto_enabled(__u8 proto) { switch (proto) { case IPPROTO_TCP: @@ -294,10 +297,12 @@ static __always_inline int __sock4_xlate_fwd(struct bpf_sock_addr *ctx, struct lb4_backend *backend; struct lb4_service *svc; __u16 dst_port = ctx_dst_port(ctx); + __u8 protocol = ctx_protocol(ctx); __u32 dst_ip = ctx->user_ip4; struct lb4_key key = { .address = dst_ip, .dport = dst_port, + .proto = protocol, }, orig_key = key; struct lb4_service *backend_slot; bool backend_from_affinity = false; @@ -309,7 +314,7 @@ static __always_inline int __sock4_xlate_fwd(struct bpf_sock_addr *ctx, if (is_defined(ENABLE_SOCKET_LB_HOST_ONLY) && !in_hostns) return -ENXIO; - if (!udp_only && !sock_proto_enabled(ctx->protocol)) + if (!udp_only && !sock_proto_enabled(protocol)) return -ENOTSUP; /* In case a direct match fails, we try to look-up surrogate @@ -317,8 +322,13 @@ static __always_inline int __sock4_xlate_fwd(struct bpf_sock_addr *ctx, * HostPort services. */ svc = lb4_lookup_service(&key, true, false); - if (!svc) + if (!svc) { + /* Restore the original key's protocol as lb4_lookup_service + * has overwritten it. + */ + key.proto = protocol; svc = sock4_wildcard_lookup_full(&key, in_hostns); + } if (!svc) return -ENXIO; if (svc->count == 0 && !lb4_svc_is_l7loadbalancer(svc)) @@ -477,23 +487,30 @@ int cil_sock4_connect(struct bpf_sock_addr *ctx) static __always_inline int __sock4_post_bind(struct bpf_sock *ctx, struct bpf_sock *ctx_full) { + __u8 protocol = ctx_protocol(ctx); struct lb4_service *svc; struct lb4_key key = { .address = ctx->src_ip4, .dport = ctx_src_port(ctx), + .proto = protocol, }; - if (!sock_proto_enabled(ctx->protocol) || + if (!sock_proto_enabled(protocol) || !ctx_in_hostns(ctx_full, NULL)) return 0; svc = lb4_lookup_service(&key, true, false); - if (!svc) + if (!svc) { /* Perform a wildcard lookup for the case where the caller * tries to bind to loopback or an address with host identity * (without remote hosts). + * + * Restore the original key's protocol as lb4_lookup_service + * has overwritten it. */ + key.proto = protocol; svc = sock4_wildcard_lookup(&key, false, false, true); + } /* If the sockaddr of this socket overlaps with a NodePort, * LoadBalancer or ExternalIP service. We must reject this @@ -540,7 +557,7 @@ static __always_inline int __sock4_pre_bind(struct bpf_sock_addr *ctx, .peer = { .address = ctx->user_ip4, .port = ctx_dst_port(ctx), - .proto = (__u8)ctx->protocol, + .proto = ctx_protocol(ctx), }, }; int ret; @@ -556,7 +573,7 @@ int cil_sock4_pre_bind(struct bpf_sock_addr *ctx) { int ret = SYS_PROCEED; - if (!sock_proto_enabled(ctx->protocol) || + if (!sock_proto_enabled(ctx_protocol(ctx)) || !ctx_in_hostns(ctx, NULL)) return ret; if (sock_is_health_check(ctx) && @@ -573,6 +590,7 @@ static __always_inline int __sock4_xlate_rev(struct bpf_sock_addr *ctx, { struct ipv4_revnat_entry *val; __u16 dst_port = ctx_dst_port(ctx); + __u8 protocol = ctx_protocol(ctx); __u32 dst_ip = ctx->user_ip4; struct ipv4_revnat_tuple key = { .cookie = sock_local_cookie(ctx_full), @@ -588,12 +606,18 @@ static __always_inline int __sock4_xlate_rev(struct bpf_sock_addr *ctx, struct lb4_key svc_key = { .address = val->address, .dport = val->port, + .proto = protocol, }; svc = lb4_lookup_service(&svc_key, true, false); - if (!svc) + if (!svc) { + /* Restore the original key's protocol as lb4_lookup_service + * has overwritten it. + */ + svc_key.proto = protocol; svc = sock4_wildcard_lookup_full(&svc_key, ctx_in_hostns(ctx_full, NULL)); + } if (!svc || svc->rev_nat_index != val->rev_nat_index || (svc->count == 0 && !lb4_svc_is_l7loadbalancer(svc))) { map_delete_elem(&LB4_REVERSE_NAT_SK_MAP, &key); @@ -810,7 +834,7 @@ int sock6_xlate_v4_in_v6(struct bpf_sock_addr *ctx __maybe_unused, return -ENXIO; memset(&fake_ctx, 0, sizeof(fake_ctx)); - fake_ctx.protocol = ctx->protocol; + fake_ctx.protocol = ctx_protocol(ctx); fake_ctx.user_ip4 = addr6.p4; fake_ctx.user_port = ctx_dst_port(ctx); @@ -840,7 +864,7 @@ sock6_post_bind_v4_in_v6(struct bpf_sock *ctx __maybe_unused) return 0; memset(&fake_ctx, 0, sizeof(fake_ctx)); - fake_ctx.protocol = ctx->protocol; + fake_ctx.protocol = ctx_protocol(ctx); fake_ctx.src_ip4 = addr6.p4; fake_ctx.src_port = ctx->src_port; @@ -851,12 +875,14 @@ sock6_post_bind_v4_in_v6(struct bpf_sock *ctx __maybe_unused) static __always_inline int __sock6_post_bind(struct bpf_sock *ctx) { + __u8 protocol = ctx_protocol(ctx); struct lb6_service *svc; struct lb6_key key = { .dport = ctx_src_port(ctx), + .proto = protocol, }; - if (!sock_proto_enabled(ctx->protocol) || + if (!sock_proto_enabled(protocol) || !ctx_in_hostns(ctx, NULL)) return 0; @@ -864,6 +890,10 @@ static __always_inline int __sock6_post_bind(struct bpf_sock *ctx) svc = lb6_lookup_service(&key, true, false); if (!svc) { + /* Restore the original key's protocol as lb6_lookup_service + * has overwritten it. + */ + key.proto = protocol; svc = sock6_wildcard_lookup(&key, false, false, true); if (!svc) return sock6_post_bind_v4_in_v6(ctx); @@ -904,7 +934,7 @@ sock6_pre_bind_v4_in_v6(struct bpf_sock_addr *ctx __maybe_unused) ctx_get_v6_address(ctx, &addr6); memset(&fake_ctx, 0, sizeof(fake_ctx)); - fake_ctx.protocol = ctx->protocol; + fake_ctx.protocol = ctx_protocol(ctx); fake_ctx.user_ip4 = addr6.p4; fake_ctx.user_port = ctx_dst_port(ctx); @@ -935,7 +965,7 @@ static __always_inline int __sock6_pre_bind(struct bpf_sock_addr *ctx) struct lb6_health val = { .peer = { .port = ctx_dst_port(ctx), - .proto = (__u8)ctx->protocol, + .proto = ctx_protocol(ctx), }, }; int ret = 0; @@ -957,7 +987,7 @@ int cil_sock6_pre_bind(struct bpf_sock_addr *ctx) { int ret = SYS_PROCEED; - if (!sock_proto_enabled(ctx->protocol) || + if (!sock_proto_enabled(ctx_protocol(ctx)) || !ctx_in_hostns(ctx, NULL)) return ret; if (sock_is_health_check(ctx) && @@ -978,8 +1008,10 @@ static __always_inline int __sock6_xlate_fwd(struct bpf_sock_addr *ctx, struct lb6_backend *backend; struct lb6_service *svc; __u16 dst_port = ctx_dst_port(ctx); + __u8 protocol = ctx_protocol(ctx); struct lb6_key key = { .dport = dst_port, + .proto = protocol, }, orig_key; struct lb6_service *backend_slot; bool backend_from_affinity = false; @@ -991,15 +1023,20 @@ static __always_inline int __sock6_xlate_fwd(struct bpf_sock_addr *ctx, if (is_defined(ENABLE_SOCKET_LB_HOST_ONLY) && !in_hostns) return -ENXIO; - if (!udp_only && !sock_proto_enabled(ctx->protocol)) + if (!udp_only && !sock_proto_enabled(protocol)) return -ENOTSUP; ctx_get_v6_address(ctx, &key.address); memcpy(&orig_key, &key, sizeof(key)); svc = lb6_lookup_service(&key, true, false); - if (!svc) + if (!svc) { + /* Restore the original key's protocol as lb6_lookup_service + * has overwritten it. + */ + key.proto = protocol; svc = sock6_wildcard_lookup_full(&key, in_hostns); + } if (!svc) return sock6_xlate_v4_in_v6(ctx, udp_only); if (svc->count == 0 && !lb6_svc_is_l7loadbalancer(svc)) @@ -1144,7 +1181,7 @@ sock6_xlate_rev_v4_in_v6(struct bpf_sock_addr *ctx __maybe_unused) return -ENXIO; memset(&fake_ctx, 0, sizeof(fake_ctx)); - fake_ctx.protocol = ctx->protocol; + fake_ctx.protocol = ctx_protocol(ctx); fake_ctx.user_ip4 = addr6.p4; fake_ctx.user_port = ctx_dst_port(ctx); @@ -1167,6 +1204,7 @@ static __always_inline int __sock6_xlate_rev(struct bpf_sock_addr *ctx) struct ipv6_revnat_tuple key = {}; struct ipv6_revnat_entry *val; __u16 dst_port = ctx_dst_port(ctx); + __u8 protocol = ctx_protocol(ctx); key.cookie = sock_local_cookie(ctx); key.port = dst_port; @@ -1181,12 +1219,18 @@ static __always_inline int __sock6_xlate_rev(struct bpf_sock_addr *ctx) struct lb6_key svc_key = { .address = val->address, .dport = val->port, + .proto = protocol, }; svc = lb6_lookup_service(&svc_key, true, false); - if (!svc) + if (!svc) { + /* Restore the original key's protocol as lb6_lookup_service + * has overwritten it. + */ + svc_key.proto = protocol; svc = sock6_wildcard_lookup_full(&svc_key, ctx_in_hostns(ctx, NULL)); + } if (!svc || svc->rev_nat_index != val->rev_nat_index || (svc->count == 0 && !lb6_svc_is_l7loadbalancer(svc))) { map_delete_elem(&LB6_REVERSE_NAT_SK_MAP, &key); diff --git a/bpf/lib/common.h b/bpf/lib/common.h index 3f18c9f1ec614..7a22e006bd375 100644 --- a/bpf/lib/common.h +++ b/bpf/lib/common.h @@ -947,7 +947,7 @@ struct lb6_key { union v6addr address; /* Service virtual IPv6 address */ __be16 dport; /* L4 port filter, if unset, all ports apply */ __u16 backend_slot; /* Backend iterator, 0 indicates the svc frontend */ - __u8 proto; /* L4 protocol, currently not used (set to 0) */ + __u8 proto; /* L4 protocol, 0 indicates any protocol */ __u8 scope; /* LB_LOOKUP_SCOPE_* for externalTrafficPolicy=Local */ __u8 pad[2]; }; @@ -1005,7 +1005,7 @@ struct lb4_key { __be32 address; /* Service virtual IPv4 address */ __be16 dport; /* L4 port filter, if unset, all ports apply */ __u16 backend_slot; /* Backend iterator, 0 indicates the svc frontend */ - __u8 proto; /* L4 protocol, currently not used (set to 0) */ + __u8 proto; /* L4 protocol, 0 indicates any protocol */ __u8 scope; /* LB_LOOKUP_SCOPE_* for externalTrafficPolicy=Local */ __u8 pad[2]; }; diff --git a/bpf/lib/lb.h b/bpf/lib/lb.h index 85ea0ada76e29..e1a132ad132a0 100644 --- a/bpf/lib/lb.h +++ b/bpf/lib/lb.h @@ -490,8 +490,7 @@ static __always_inline int lb6_rev_nat(struct __ctx_buff *ctx, int l4_off, static __always_inline void lb6_fill_key(struct lb6_key *key, struct ipv6_ct_tuple *tuple) { - /* FIXME: set after adding support for different L4 protocols in LB */ - key->proto = 0; + key->proto = tuple->nexthdr; ipv6_addr_copy(&key->address, &tuple->daddr); key->dport = tuple->sport; } @@ -1148,8 +1147,7 @@ static __always_inline int lb4_rev_nat(struct __ctx_buff *ctx, int l3_off, int l static __always_inline void lb4_fill_key(struct lb4_key *key, const struct ipv4_ct_tuple *tuple) { - /* FIXME: set after adding support for different L4 protocols in LB */ - key->proto = 0; + key->proto = tuple->nexthdr; key->address = tuple->daddr; /* CT tuple has ports in reverse order: */ key->dport = tuple->sport; diff --git a/cilium/cmd/bpf_lb_list.go b/cilium/cmd/bpf_lb_list.go index e5f2522635904..b3a4b1f43284e 100644 --- a/cilium/cmd/bpf_lb_list.go +++ b/cilium/cmd/bpf_lb_list.go @@ -14,6 +14,7 @@ import ( "github.com/cilium/cilium/pkg/common" "github.com/cilium/cilium/pkg/loadbalancer" "github.com/cilium/cilium/pkg/maps/lbmap" + "github.com/cilium/cilium/pkg/u8proto" ) const ( @@ -105,12 +106,12 @@ func dumpSVC(serviceList map[string][]string) { } else if backend, found := backendMap[backendID]; !found { entry = fmt.Sprintf("backend %d not found", backendID) } else { - fmtStr := "%s:%d (%d) (%d)" + fmtStr := "%s:%d/%s (%d) (%d)" if svcKey.IsIPv6() { - fmtStr = "[%s]:%d (%d) (%d)" + fmtStr = "[%s]:%d/%s (%d) (%d)" } entry = fmt.Sprintf(fmtStr, backend.GetAddress(), - backend.GetPort(), revNATID, backendSlot) + backend.GetPort(), u8proto.U8proto(backend.GetProtocol()).String(), revNATID, backendSlot) } serviceList[svc] = append(serviceList[svc], entry) diff --git a/cilium/cmd/service_update.go b/cilium/cmd/service_update.go index bc0a2d76b5ca1..758be4c6a5582 100644 --- a/cilium/cmd/service_update.go +++ b/cilium/cmd/service_update.go @@ -29,6 +29,7 @@ var ( localRedirect bool idU uint64 frontend string + protocol string backends []string backendStates []string backendWeights []uint @@ -59,13 +60,38 @@ func init() { serviceUpdateCmd.Flags().StringVarP(&k8sIntTrafficPolicy, "k8s-int-traffic-policy", "", "Cluster", "Set service with k8s internalTrafficPolicy as {Local,Cluster}") serviceUpdateCmd.Flags().BoolVarP(&k8sClusterInternal, "k8s-cluster-internal", "", false, "Set service as cluster-internal for externalTrafficPolicy=Local xor internalTrafficPolicy=Local") serviceUpdateCmd.Flags().StringVarP(&frontend, "frontend", "", "", "Frontend address") + serviceUpdateCmd.Flags().StringVarP(&protocol, "protocol", "", "tcp", "Protocol for service (e.g. TCP, UDP)") serviceUpdateCmd.Flags().StringSliceVarP(&backends, "backends", "", []string{}, "Backend address or addresses ()") serviceUpdateCmd.Flags().StringSliceVarP(&backendStates, "states", "", []string{}, "Backend state(s) as {active(default),terminating,quarantined,maintenance}") serviceUpdateCmd.Flags().UintSliceVarP(&backendWeights, "backend-weights", "", []uint{}, "Backend weights (100 default, 0 means maintenance state, only for maglev mode)") } -func parseFrontendAddress(address string) *models.FrontendAddress { - frontend, err := net.ResolveTCPAddr("tcp", address) +func parseAddress(l4Protocol, address string) (ip net.IP, port int, proto string, err error) { + switch proto = strings.ToLower(l4Protocol); proto { + case "tcp": + var tcpAddr *net.TCPAddr + tcpAddr, err = net.ResolveTCPAddr(proto, address) + if err != nil { + return + } + ip = tcpAddr.IP + port = tcpAddr.Port + case "udp": + var udpAddr *net.UDPAddr + udpAddr, err = net.ResolveUDPAddr(proto, address) + if err != nil { + return + } + ip = udpAddr.IP + port = udpAddr.Port + default: + err = fmt.Errorf("unrecognized protocol %q", l4Protocol) + } + return +} + +func parseFrontendAddress(l4Protocol, address string) *models.FrontendAddress { + ip, port, proto, err := parseAddress(l4Protocol, address) if err != nil { Fatalf("Unable to parse frontend address: %s\n", err) } @@ -75,11 +101,10 @@ func parseFrontendAddress(address string) *models.FrontendAddress { scope = models.FrontendAddressScopeInternal } - // FIXME support more than TCP return &models.FrontendAddress{ - IP: frontend.IP.String(), - Port: uint16(frontend.Port), - Protocol: models.FrontendAddressProtocolTCP, + IP: ip.String(), + Port: uint16(port), + Protocol: proto, Scope: scope, } } @@ -95,7 +120,7 @@ func updateService(cmd *cobra.Command, args []string) { warnIdTypeDeprecation() id := int64(idU) - fa := parseFrontendAddress(frontend) + fa := parseFrontendAddress(protocol, frontend) skipFrontendCheck := false var spec *models.ServiceSpec @@ -203,17 +228,17 @@ func updateService(cmd *cobra.Command, args []string) { default: Fatalf("Invalid number of backend states (%v) for backends (%v)", backendStates, backends) } + for i, backend := range backends { - beAddr, err := net.ResolveTCPAddr("tcp", backend) + ip, port, proto, err := parseAddress(protocol, backend) if err != nil { - Fatalf("Cannot parse backend address \"%s\": %s", backend, err) + Fatalf("Cannot parse backend address %q: %s", backend, err) } - // Backend ID will be set by the daemon - be := loadbalancer.NewBackend(0, loadbalancer.TCP, cmtypes.MustAddrClusterFromIP(beAddr.IP), uint16(beAddr.Port)) + be := loadbalancer.NewBackend(0, loadbalancer.L4Type(strings.ToUpper(proto)), cmtypes.MustAddrClusterFromIP(ip), uint16(port)) - if !skipFrontendCheck && fa.Port == 0 && beAddr.Port != 0 { - Fatalf("L4 backend found (%v) with L3 frontend", beAddr) + if !skipFrontendCheck && fa.Port == 0 && port != 0 { + Fatalf("L4 backend found (%s:%d) with L3 frontend", ip, port) } ba := be.GetBackendModel() diff --git a/pkg/datapath/types/lbmap.go b/pkg/datapath/types/lbmap.go index 44ba2bbe6b417..47cd80c214957 100644 --- a/pkg/datapath/types/lbmap.go +++ b/pkg/datapath/types/lbmap.go @@ -30,9 +30,10 @@ type LBMap interface { } type UpsertServiceParams struct { - ID uint16 - IP net.IP - Port uint16 + ID uint16 + IP net.IP + Port uint16 + Protocol uint8 // PreferredBackends is a subset of ActiveBackends // Note: this is only used in clustermesh with service affinity annotation. diff --git a/pkg/k8s/service.go b/pkg/k8s/service.go index 4ec47f42b9d7d..8bd202671dbd5 100644 --- a/pkg/k8s/service.go +++ b/pkg/k8s/service.go @@ -533,13 +533,10 @@ func NewService(ips []net.IP, externalIPs, loadBalancerIPs, loadBalancerSourceRa } // UniquePorts returns a map of all unique ports configured in the service -func (s *Service) UniquePorts() map[uint16]bool { - // We are not discriminating the different L4 protocols on the same L4 - // port so we create the number of unique sets of service IP + service - // port. - uniqPorts := map[uint16]bool{} +func (s *Service) UniquePorts() map[string]bool { + uniqPorts := map[string]bool{} for _, p := range s.Ports { - uniqPorts[p.Port] = true + uniqPorts[p.String()] = true } return uniqPorts } diff --git a/pkg/k8s/watchers/watcher.go b/pkg/k8s/watchers/watcher.go index 0a0b3935e843c..288411c257a97 100644 --- a/pkg/k8s/watchers/watcher.go +++ b/pkg/k8s/watchers/watcher.go @@ -728,10 +728,10 @@ func (k *K8sWatcher) delK8sSVCs(svc k8s.ServiceID, svcInfo *k8s.Service, se *k8s frontends := []*loadbalancer.L3n4Addr{} for portName, svcPort := range svcInfo.Ports { - if !repPorts[svcPort.Port] { + if !repPorts[svcPort.String()] { continue } - repPorts[svcPort.Port] = false + repPorts[svcPort.String()] = false for _, feIP := range svcInfo.FrontendIPs { fe := loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(feIP), svcPort.Port, loadbalancer.ScopeExternal) @@ -866,10 +866,10 @@ func datapathSVCs(svc *k8s.Service, endpoints *k8s.Endpoints) (svcs []loadbalanc clusterIPPorts := map[loadbalancer.FEPortName]*loadbalancer.L4Addr{} for fePortName, fePort := range svc.Ports { - if !uniqPorts[fePort.Port] { + if !uniqPorts[fePort.String()] { continue } - uniqPorts[fePort.Port] = false + uniqPorts[fePort.String()] = false clusterIPPorts[fePortName] = fePort } diff --git a/pkg/loadbalancer/loadbalancer.go b/pkg/loadbalancer/loadbalancer.go index 92c4bd1260e3c..1cc49ce8fc5b7 100644 --- a/pkg/loadbalancer/loadbalancer.go +++ b/pkg/loadbalancer/loadbalancer.go @@ -13,6 +13,7 @@ import ( "github.com/cilium/cilium/pkg/cidr" cmtypes "github.com/cilium/cilium/pkg/clustermesh/types" "github.com/cilium/cilium/pkg/option" + "github.com/cilium/cilium/pkg/u8proto" ) // SVCType is a type of a service. @@ -229,6 +230,7 @@ func (s ServiceFlags) UInt16() uint16 { } const ( + // NONE type. NONE = L4Type("NONE") // TCP type. TCP = L4Type("TCP") @@ -535,6 +537,19 @@ func NewL4Type(name string) (L4Type, error) { } } +func NewL4TypeFromNumber(proto uint8) L4Type { + switch proto { + case 6: + return TCP + case 17: + return UDP + case 132: + return SCTP + default: + return NONE + } +} + // L4Addr is an abstraction for the backend port with a L4Type, usually tcp or udp, and // the Port number. // @@ -558,6 +573,22 @@ func NewL4Addr(protocol L4Type, number uint16) *L4Addr { return &L4Addr{Protocol: protocol, Port: number} } +// Equals returns true if both L4Addr are considered equal. +func (l *L4Addr) Equals(o *L4Addr) bool { + switch { + case (l == nil) != (o == nil): + return false + case (l == nil) && (o == nil): + return true + } + return l.Port == o.Port && l.Protocol == o.Protocol +} + +// String returns a string representation of an L4Addr +func (l *L4Addr) String() string { + return fmt.Sprintf("%d/%s", l.Port, l.Protocol) +} + // L3n4Addr is used to store, as an unique L3+L4 address in the KVStore. It also // includes the lookup scope for frontend addresses which is used in service // handling for externalTrafficPolicy=Local and internalTrafficPolicy=Local, @@ -659,8 +690,7 @@ func NewBackendFromBackendModel(base *models.BackendAddress) (*Backend, error) { return nil, fmt.Errorf("missing IP address") } - // FIXME: Should this be NONE ? - l4addr := NewL4Addr(NONE, base.Port) + l4addr := NewL4Addr(base.Protocol, base.Port) addrCluster, err := cmtypes.ParseAddrCluster(*base.IP) if err != nil { return nil, err @@ -693,8 +723,7 @@ func NewL3n4AddrFromBackendModel(base *models.BackendAddress) (*L3n4Addr, error) return nil, fmt.Errorf("missing IP address") } - // FIXME: Should this be NONE ? - l4addr := NewL4Addr(NONE, base.Port) + l4addr := NewL4Addr(base.Protocol, base.Port) addrCluster, err := cmtypes.ParseAddrCluster(*base.IP) if err != nil { return nil, err @@ -712,9 +741,10 @@ func (a *L3n4Addr) GetModel() *models.FrontendAddress { scope = models.FrontendAddressScopeInternal } return &models.FrontendAddress{ - IP: a.AddrCluster.String(), - Port: a.Port, - Scope: scope, + IP: a.AddrCluster.String(), + Protocol: a.Protocol, + Port: a.Port, + Scope: scope, } } @@ -727,6 +757,7 @@ func (b *Backend) GetBackendModel() *models.BackendAddress { stateStr, _ := b.State.String() return &models.BackendAddress{ IP: &addrClusterStr, + Protocol: b.Protocol, Port: b.Port, NodeName: b.NodeName, State: stateStr, @@ -735,17 +766,10 @@ func (b *Backend) GetBackendModel() *models.BackendAddress { } } -// String returns the L3n4Addr in the "IPv4:Port[/Scope]" format for IPv4 and -// "[IPv6]:Port[/Scope]" format for IPv6. +// String returns the L3n4Addr in the "IPv4:Port/Protocol[/Scope]" format for IPv4 and +// "[IPv6]:Port/Protocol[/Scope]" format for IPv6. func (a *L3n4Addr) String() string { - var scope string - if a.Scope == ScopeInternal { - scope = "/i" - } - if a.IsIPv6() { - return fmt.Sprintf("[%s]:%d%s", a.AddrCluster.String(), a.Port, scope) - } - return fmt.Sprintf("%s:%d%s", a.AddrCluster.String(), a.Port, scope) + return a.StringWithProtocol() } // StringWithProtocol returns the L3n4Addr in the "IPv4:Port/Protocol[/Scope]" @@ -763,8 +787,6 @@ func (a *L3n4Addr) StringWithProtocol() string { // StringID returns the L3n4Addr as string to be used for unique identification func (a *L3n4Addr) StringID() string { - // This does not include the protocol right now as the datapath does - // not include the protocol in the lookup of the service IP. return a.String() } @@ -772,14 +794,15 @@ func (a *L3n4Addr) StringID() string { // Note: the resulting string is meant to be used as a key for maps and is not // readable by a human eye when printed out. func (a L3n4Addr) Hash() string { - const lenProto = 0 // proto is omitted for now + const lenProto = 1 // proto is uint8 const lenScope = 1 // scope is uint8 which is an alias for byte const lenPort = 2 // port is uint16 which is 2 bytes b := make([]byte, cmtypes.AddrClusterLen+lenProto+lenScope+lenPort) ac20 := a.AddrCluster.As20() copy(b, ac20[:]) - // FIXME: add Protocol once we care about protocols + u8p, _ := u8proto.ParseProtocol(a.Protocol) + b[net.IPv6len] = byte(u8p) // scope is a uint8 which is an alias for byte so a cast is safe b[net.IPv6len+lenProto] = byte(a.Scope) // port is a uint16, so 2 bytes diff --git a/pkg/maps/lbmap/ipv4.go b/pkg/maps/lbmap/ipv4.go index 1821c2d0e5816..750a5b4c64231 100644 --- a/pkg/maps/lbmap/ipv4.go +++ b/pkg/maps/lbmap/ipv4.go @@ -250,6 +250,7 @@ func NewService4Key(ip net.IP, port uint16, proto u8proto.U8proto, scope uint8, func (k *Service4Key) String() string { kHost := k.ToHost().(*Service4Key) addr := net.JoinHostPort(kHost.Address.String(), fmt.Sprintf("%d", kHost.Port)) + addr += fmt.Sprintf("/%s", u8proto.U8proto(kHost.Proto).String()) if kHost.Scope == loadbalancer.ScopeInternal { addr += "/i" } @@ -268,6 +269,7 @@ func (k *Service4Key) SetScope(scope uint8) { k.Scope = scope } func (k *Service4Key) GetScope() uint8 { return k.Scope } func (k *Service4Key) GetAddress() net.IP { return k.Address.IP() } func (k *Service4Key) GetPort() uint16 { return k.Port } +func (k *Service4Key) GetProtocol() uint8 { return k.Proto } func (k *Service4Key) MapDelete() error { return k.Map().Delete(k.ToNetwork()) } func (k *Service4Key) RevNatValue() RevNatValue { @@ -413,8 +415,9 @@ func (b *Backend4Value) GetAddress() net.IP { return b.Address.IP() } func (b *Backend4Value) GetIPCluster() cmtypes.AddrCluster { return cmtypes.AddrClusterFrom(b.Address.Addr(), 0) } -func (b *Backend4Value) GetPort() uint16 { return b.Port } -func (b *Backend4Value) GetFlags() uint8 { return b.Flags } +func (b *Backend4Value) GetPort() uint16 { return b.Port } +func (b *Backend4Value) GetProtocol() uint8 { return uint8(b.Proto) } +func (b *Backend4Value) GetFlags() uint8 { return b.Flags } func (v *Backend4Value) ToNetwork() BackendValue { n := *v @@ -475,8 +478,9 @@ func (b *Backend4ValueV3) GetAddress() net.IP { return b.Address.IP() } func (b *Backend4ValueV3) GetIPCluster() cmtypes.AddrCluster { return cmtypes.AddrClusterFrom(b.Address.Addr(), uint32(b.ClusterID)) } -func (b *Backend4ValueV3) GetPort() uint16 { return b.Port } -func (b *Backend4ValueV3) GetFlags() uint8 { return b.Flags } +func (b *Backend4ValueV3) GetPort() uint16 { return b.Port } +func (b *Backend4ValueV3) GetProtocol() uint8 { return uint8(b.Proto) } +func (b *Backend4ValueV3) GetFlags() uint8 { return b.Flags } func (v *Backend4ValueV3) ToNetwork() BackendValue { n := *v diff --git a/pkg/maps/lbmap/ipv6.go b/pkg/maps/lbmap/ipv6.go index 340cf9acc36d0..55beb19a78344 100644 --- a/pkg/maps/lbmap/ipv6.go +++ b/pkg/maps/lbmap/ipv6.go @@ -147,9 +147,9 @@ func NewService6Key(ip net.IP, port uint16, proto u8proto.U8proto, scope uint8, func (k *Service6Key) String() string { kHost := k.ToHost().(*Service6Key) if kHost.Scope == loadbalancer.ScopeInternal { - return fmt.Sprintf("[%s]:%d/i (%d)", kHost.Address, kHost.Port, kHost.BackendSlot) + return fmt.Sprintf("[%s]:%d/%s/i (%d)", kHost.Address, kHost.Port, u8proto.U8proto(kHost.Proto).String(), kHost.BackendSlot) } else { - return fmt.Sprintf("[%s]:%d (%d)", kHost.Address, kHost.Port, kHost.BackendSlot) + return fmt.Sprintf("[%s]:%d/%s (%d)", kHost.Address, kHost.Port, u8proto.U8proto(kHost.Proto).String(), kHost.BackendSlot) } } @@ -164,6 +164,7 @@ func (k *Service6Key) SetScope(scope uint8) { k.Scope = scope } func (k *Service6Key) GetScope() uint8 { return k.Scope } func (k *Service6Key) GetAddress() net.IP { return k.Address.IP() } func (k *Service6Key) GetPort() uint16 { return k.Port } +func (k *Service6Key) GetProtocol() uint8 { return k.Proto } func (k *Service6Key) MapDelete() error { return k.Map().Delete(k.ToNetwork()) } func (k *Service6Key) RevNatValue() RevNatValue { @@ -307,8 +308,9 @@ func (b *Backend6Value) GetAddress() net.IP { return b.Address.IP() } func (b *Backend6Value) GetIPCluster() cmtypes.AddrCluster { return cmtypes.AddrClusterFrom(b.Address.Addr(), 0) } -func (b *Backend6Value) GetPort() uint16 { return b.Port } -func (b *Backend6Value) GetFlags() uint8 { return b.Flags } +func (b *Backend6Value) GetPort() uint16 { return b.Port } +func (b *Backend6Value) GetProtocol() uint8 { return uint8(b.Proto) } +func (b *Backend6Value) GetFlags() uint8 { return b.Flags } func (v *Backend6Value) ToNetwork() BackendValue { n := *v @@ -369,8 +371,9 @@ func (b *Backend6ValueV3) GetAddress() net.IP { return b.Address.IP() } func (b *Backend6ValueV3) GetIPCluster() cmtypes.AddrCluster { return cmtypes.AddrClusterFrom(b.Address.Addr(), uint32(b.ClusterID)) } -func (b *Backend6ValueV3) GetPort() uint16 { return b.Port } -func (b *Backend6ValueV3) GetFlags() uint8 { return b.Flags } +func (b *Backend6ValueV3) GetPort() uint16 { return b.Port } +func (b *Backend6ValueV3) GetProtocol() uint8 { return uint8(b.Proto) } +func (b *Backend6ValueV3) GetFlags() uint8 { return b.Flags } func (v *Backend6ValueV3) ToNetwork() BackendValue { n := *v diff --git a/pkg/maps/lbmap/lbmap.go b/pkg/maps/lbmap/lbmap.go index ba73d33c9aa34..8865c23df7ea5 100644 --- a/pkg/maps/lbmap/lbmap.go +++ b/pkg/maps/lbmap/lbmap.go @@ -6,8 +6,6 @@ package lbmap import ( "errors" "fmt" - "net" - "strconv" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" @@ -76,10 +74,10 @@ func (lbmap *LBBPFMap) upsertServiceProto(p *datapathTypes.UpsertServiceParams, backendsOk := ipv6 || !ipv6 && p.NatPolicy != loadbalancer.SVCNatPolicyNat46 if ipv6 { - svcKey = NewService6Key(p.IP, p.Port, u8proto.ANY, p.Scope, 0) + svcKey = NewService6Key(p.IP, p.Port, u8proto.U8proto(p.Protocol), p.Scope, 0) svcVal = &Service6Value{} } else { - svcKey = NewService4Key(p.IP, p.Port, u8proto.ANY, p.Scope, 0) + svcKey = NewService4Key(p.IP, p.Port, u8proto.U8proto(p.Protocol), p.Scope, 0) svcVal = &Service4Value{} } @@ -200,11 +198,16 @@ func deleteServiceProto(svc loadbalancer.L3n4AddrID, backendCount int, useMaglev revNATKey RevNatKey ) + u8p, err := u8proto.ParseProtocol(svc.Protocol) + if err != nil { + return err + } + if ipv6 { - svcKey = NewService6Key(svc.AddrCluster.AsNetIP(), svc.Port, u8proto.ANY, svc.Scope, 0) + svcKey = NewService6Key(svc.AddrCluster.AsNetIP(), svc.Port, u8p, svc.Scope, 0) revNATKey = NewRevNat6Key(uint16(svc.ID)) } else { - svcKey = NewService4Key(svc.AddrCluster.AsNetIP(), svc.Port, u8proto.ANY, svc.Scope, 0) + svcKey = NewService4Key(svc.AddrCluster.AsNetIP(), svc.Port, u8p, svc.Scope, 0) revNATKey = NewRevNat4Key(uint16(svc.ID)) } @@ -454,11 +457,8 @@ func (*LBBPFMap) DumpServiceMaps() ([]*loadbalancer.SVC, []error) { if svcKey.GetBackendSlot() == 0 { // Build a cache of flags stored in the value of the master key to // map it later. - // FIXME proto is being ignored everywhere in the datapath. - addrStr := svcKey.GetAddress().String() - portStr := strconv.Itoa(int(svcKey.GetPort())) - flagsCache[net.JoinHostPort(addrStr, portStr)] = loadbalancer.ServiceFlags(svcValue.GetFlags()) + flagsCache[fe.String()] = loadbalancer.ServiceFlags(svcValue.GetFlags()) newSVCMap.addFE(fe) return } @@ -502,13 +502,11 @@ func (*LBBPFMap) DumpServiceMaps() ([]*loadbalancer.SVC, []error) { newSVCList := make([]*loadbalancer.SVC, 0, len(newSVCMap)) for hash := range newSVCMap { svc := newSVCMap[hash] - addrStr := svc.Frontend.AddrCluster.String() - portStr := strconv.Itoa(int(svc.Frontend.Port)) - host := net.JoinHostPort(addrStr, portStr) - svc.Type = flagsCache[host].SVCType() - svc.ExtTrafficPolicy = flagsCache[host].SVCExtTrafficPolicy() - svc.IntTrafficPolicy = flagsCache[host].SVCIntTrafficPolicy() - svc.NatPolicy = flagsCache[host].SVCNatPolicy(svc.Frontend.L3n4Addr) + key := svc.Frontend.String() + svc.Type = flagsCache[key].SVCType() + svc.ExtTrafficPolicy = flagsCache[key].SVCExtTrafficPolicy() + svc.IntTrafficPolicy = flagsCache[key].SVCIntTrafficPolicy() + svc.NatPolicy = flagsCache[key].SVCNatPolicy(svc.Frontend.L3n4Addr) newSVCList = append(newSVCList, &svc) } @@ -546,7 +544,7 @@ func (*LBBPFMap) DumpBackendMaps() ([]*loadbalancer.Backend, error) { ip := backendVal.GetAddress() addrCluster := cmtypes.MustAddrClusterFromIP(ip) port := backendVal.GetPort() - proto := loadbalancer.NONE + proto := loadbalancer.NewL4TypeFromNumber(backendVal.GetProtocol()) state := loadbalancer.GetBackendStateFromFlags(backendVal.GetFlags()) lbBackend := loadbalancer.NewBackendWithState(backendID, proto, addrCluster, port, state) lbBackends = append(lbBackends, lbBackend) @@ -611,11 +609,17 @@ func getBackend(backend *loadbalancer.Backend, ipv6 bool) (Backend, error) { return lbBackend, fmt.Errorf("invalid backend ID 0") } + u8p, err := u8proto.ParseProtocol(backend.Protocol) + if err != nil { + return nil, fmt.Errorf("unable to parse protocol lbBackend (%d, %s, %d, %s, %t): %w", + backend.ID, backend.AddrCluster.String(), backend.Port, backend.Protocol, ipv6, err) + } + if ipv6 { - lbBackend, err = NewBackend6V3(backend.ID, backend.AddrCluster, backend.Port, u8proto.ANY, + lbBackend, err = NewBackend6V3(backend.ID, backend.AddrCluster, backend.Port, u8p, backend.State) } else { - lbBackend, err = NewBackend4V3(backend.ID, backend.AddrCluster, backend.Port, u8proto.ANY, + lbBackend, err = NewBackend4V3(backend.ID, backend.AddrCluster, backend.Port, u8p, backend.State) } if err != nil { diff --git a/pkg/maps/lbmap/types.go b/pkg/maps/lbmap/types.go index 5adfe088b2e5b..3b8adab2a95ad 100644 --- a/pkg/maps/lbmap/types.go +++ b/pkg/maps/lbmap/types.go @@ -11,7 +11,7 @@ import ( "github.com/cilium/cilium/pkg/loadbalancer" ) -// ServiceKey is the interface describing protocol independent key for services map v2. +// ServiceKey is the interface describing key for services map v2. type ServiceKey interface { bpf.MapKey @@ -42,6 +42,9 @@ type ServiceKey interface { // Get frontend port GetPort() uint16 + // Get protocol + GetProtocol() uint8 + // Returns a RevNatValue matching a ServiceKey RevNatValue() RevNatValue @@ -113,7 +116,7 @@ type BackendKey interface { GetID() loadbalancer.BackendID } -// BackendValue is the interface describing protocol independent backend value. +// BackendValue is the interface describing backend value. type BackendValue interface { bpf.MapValue @@ -126,6 +129,9 @@ type BackendValue interface { // Get backend port GetPort() uint16 + // Get backend protocol + GetProtocol() uint8 + // Get backend flags GetFlags() uint8 @@ -177,7 +183,8 @@ type RevNatValue interface { func svcFrontend(svcKey ServiceKey, svcValue ServiceValue) *loadbalancer.L3n4AddrID { feIP := svcKey.GetAddress() feAddrCluster := cmtypes.MustAddrClusterFromIP(feIP) - feL3n4Addr := loadbalancer.NewL3n4Addr(loadbalancer.NONE, feAddrCluster, svcKey.GetPort(), svcKey.GetScope()) + p := loadbalancer.NewL4TypeFromNumber(svcKey.GetProtocol()) + feL3n4Addr := loadbalancer.NewL3n4Addr(p, feAddrCluster, svcKey.GetPort(), svcKey.GetScope()) feL3n4AddrID := &loadbalancer.L3n4AddrID{ L3n4Addr: *feL3n4Addr, ID: loadbalancer.ID(svcValue.GetRevNat()), @@ -189,7 +196,7 @@ func svcBackend(backendID loadbalancer.BackendID, backend BackendValue) *loadbal beIP := backend.GetAddress() beAddrCluster := cmtypes.MustAddrClusterFromIP(beIP) bePort := backend.GetPort() - beProto := loadbalancer.NONE + beProto := loadbalancer.NewL4TypeFromNumber(backend.GetProtocol()) beState := loadbalancer.GetBackendStateFromFlags(backend.GetFlags()) beBackend := loadbalancer.NewBackendWithState(backendID, beProto, beAddrCluster, bePort, beState) return beBackend diff --git a/pkg/service/id_local.go b/pkg/service/id_local.go index 02a5c10b8ef11..6e9c519659b6b 100644 --- a/pkg/service/id_local.go +++ b/pkg/service/id_local.go @@ -51,6 +51,7 @@ func NewIDAllocator(nextID uint32, maxID uint32) *IDAllocator { } } +// addID assumes the lock is held func (alloc *IDAllocator) addID(svc loadbalancer.L3n4Addr, id uint32) *loadbalancer.L3n4AddrID { svcID := newID(svc, id) alloc.entitiesID[id] = svcID diff --git a/pkg/service/service.go b/pkg/service/service.go index b9458b222d4e9..d96d9e8712587 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -4,6 +4,7 @@ package service import ( + "errors" "fmt" "net" "strconv" @@ -32,6 +33,7 @@ import ( nodeTypes "github.com/cilium/cilium/pkg/node/types" "github.com/cilium/cilium/pkg/option" "github.com/cilium/cilium/pkg/service/healthserver" + "github.com/cilium/cilium/pkg/u8proto" ) const anyPort = "*" @@ -818,7 +820,7 @@ func (s *Service) UpdateBackendsState(backends []*lb.Backend) error { } be.State = updatedB.State be.Preferred = updatedB.Preferred - + nextService: for id, info := range s.svcByID { var p *datapathTypes.UpsertServiceParams for i, b := range info.backends { @@ -833,10 +835,17 @@ func (s *Service) UpdateBackendsState(backends []*lb.Backend) error { found := false if p, found = updateSvcs[id]; !found { + proto, err := u8proto.ParseProtocol(info.frontend.L4Addr.Protocol) + if err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to parse service protocol for frontend %+v: %w", info.frontend, err)) + continue nextService + } + p = &datapathTypes.UpsertServiceParams{ ID: uint16(id), IP: info.frontend.L3n4Addr.AddrCluster.AsNetIP(), Port: info.frontend.L3n4Addr.L4Addr.Port, + Protocol: byte(proto), PrevBackendsCount: len(info.backends), IPv6: info.frontend.IsIPv6(), Type: info.svcType, @@ -1357,11 +1366,16 @@ func (s *Service) upsertServiceIntoLBMaps(svc *svcInfo, isExtLocal, isIntLocal b } } svc.svcNatPolicy = natPolicy + protocol, err := u8proto.ParseProtocol(svc.frontend.L3n4Addr.L4Addr.Protocol) + if err != nil { + return err + } p := &datapathTypes.UpsertServiceParams{ ID: uint16(svc.frontend.ID), IP: svc.frontend.L3n4Addr.AddrCluster.AsNetIP(), Port: svc.frontend.L3n4Addr.L4Addr.Port, + Protocol: uint8(protocol), PreferredBackends: preferredBackends, ActiveBackends: activeBackends, NonActiveBackends: nonActiveBackends, diff --git a/pkg/testutils/mockmaps/lbmap.go b/pkg/testutils/mockmaps/lbmap.go index e2e8feeb1b06e..077a4d98e30c5 100644 --- a/pkg/testutils/mockmaps/lbmap.go +++ b/pkg/testutils/mockmaps/lbmap.go @@ -12,6 +12,7 @@ import ( "github.com/cilium/cilium/pkg/ip" lb "github.com/cilium/cilium/pkg/loadbalancer" "github.com/cilium/cilium/pkg/lock" + "github.com/cilium/cilium/pkg/u8proto" ) type LBMockMap struct { @@ -59,7 +60,11 @@ func (m *LBMockMap) UpsertService(p *datapathTypes.UpsertServiceParams) error { } svc, found := m.ServiceByID[p.ID] if !found { - frontend := lb.NewL3n4AddrID(lb.NONE, cmtypes.MustAddrClusterFromIP(p.IP), p.Port, p.Scope, lb.ID(p.ID)) + u8p, err := u8proto.FromNumber(p.Protocol) + if err != nil { + return err + } + frontend := lb.NewL3n4AddrID(u8p.String(), cmtypes.MustAddrClusterFromIP(p.IP), p.Port, p.Scope, lb.ID(p.ID)) svc = &lb.SVC{Frontend: *frontend} } else { if p.PrevBackendsCount != len(svc.Backends) { @@ -137,7 +142,7 @@ func (m *LBMockMap) UpdateBackendWithState(b *lb.Backend) error { if !found { return fmt.Errorf("update failed : backend %d doesn't exist", id) } - if b.ID != be.ID || b.Port != be.Port || !b.AddrCluster.Equal(be.AddrCluster) { + if b.ID != be.ID || b.Port != be.Port || b.Protocol != be.Protocol || !b.AddrCluster.Equal(be.AddrCluster) { return fmt.Errorf("backend in the map %+v doesn't match %+v: only backend"+ "state can be updated", be.String(), b.String()) } diff --git a/pkg/u8proto/u8proto.go b/pkg/u8proto/u8proto.go index 2df035a044173..988d11e077266 100644 --- a/pkg/u8proto/u8proto.go +++ b/pkg/u8proto/u8proto.go @@ -34,6 +34,7 @@ var protoNames = map[U8proto]string{ var ProtoIDs = map[string]U8proto{ "all": 0, "any": 0, + "none": 0, "icmp": 1, "tcp": 6, "udp": 17, @@ -56,3 +57,11 @@ func ParseProtocol(proto string) (U8proto, error) { } return 0, fmt.Errorf("unknown protocol '%s'", proto) } + +func FromNumber(proto uint8) (U8proto, error) { + _, ok := protoNames[U8proto(proto)] + if !ok { + return 0, fmt.Errorf("unknown protocol %d", proto) + } + return U8proto(proto), nil +} From af0b2af29c9aeb13326736ecd75ec300328f18cb Mon Sep 17 00:00:00 2001 From: Gilberto Bertin Date: Mon, 1 Jul 2024 10:52:49 +0200 Subject: [PATCH 2/4] daemon, service: add bpf-lb-proto-diff flag add a new bpf-lb-proto-diff flag that disables the service protocol differentiation logic, allowing to keep the old Cilium behavior where service protocols are not distinguished Signed-off-by: Gilberto Bertin --- Documentation/cmdref/cilium-agent.md | 1 + bpf/bpf_sock.c | 24 +++++++++--- bpf/lib/lb.h | 28 +++++++++++++- daemon/cmd/daemon_main.go | 3 ++ .../cilium/templates/cilium-configmap.yaml | 3 ++ pkg/datapath/linux/config/config.go | 5 +++ pkg/k8s/watchers/watcher.go | 38 +++++++++++++++++++ pkg/option/config.go | 16 ++++++++ 8 files changed, 110 insertions(+), 8 deletions(-) diff --git a/Documentation/cmdref/cilium-agent.md b/Documentation/cmdref/cilium-agent.md index c1a8c1942278a..c8609dde3c208 100644 --- a/Documentation/cmdref/cilium-agent.md +++ b/Documentation/cmdref/cilium-agent.md @@ -47,6 +47,7 @@ cilium-agent [flags] --bpf-lb-maglev-table-size uint Maglev per service backend table size (parameter M) (default 16381) --bpf-lb-map-max int Maximum number of entries in Cilium BPF lbmap (default 65536) --bpf-lb-mode string BPF load balancing mode ("snat", "dsr", "hybrid") (default "snat") + --bpf-lb-proto-diff Enable support for service protocol differentiation (TCP, UDP, SCTP) (default true) --bpf-lb-rss-ipv4-src-cidr string BPF load balancing RSS outer source IPv4 CIDR prefix for IPIP --bpf-lb-rss-ipv6-src-cidr string BPF load balancing RSS outer source IPv6 CIDR prefix for IPIP --bpf-lb-sock Enable socket-based LB for E/W traffic diff --git a/bpf/bpf_sock.c b/bpf/bpf_sock.c index 9adf768e4cb1e..e84e28307bd3b 100644 --- a/bpf/bpf_sock.c +++ b/bpf/bpf_sock.c @@ -302,7 +302,9 @@ static __always_inline int __sock4_xlate_fwd(struct bpf_sock_addr *ctx, struct lb4_key key = { .address = dst_ip, .dport = dst_port, +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) .proto = protocol, +#endif }, orig_key = key; struct lb4_service *backend_slot; bool backend_from_affinity = false; @@ -326,7 +328,7 @@ static __always_inline int __sock4_xlate_fwd(struct bpf_sock_addr *ctx, /* Restore the original key's protocol as lb4_lookup_service * has overwritten it. */ - key.proto = protocol; + lb4_key_set_protocol(&key, protocol); svc = sock4_wildcard_lookup_full(&key, in_hostns); } if (!svc) @@ -492,7 +494,9 @@ static __always_inline int __sock4_post_bind(struct bpf_sock *ctx, struct lb4_key key = { .address = ctx->src_ip4, .dport = ctx_src_port(ctx), +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) .proto = protocol, +#endif }; if (!sock_proto_enabled(protocol) || @@ -508,7 +512,7 @@ static __always_inline int __sock4_post_bind(struct bpf_sock *ctx, * Restore the original key's protocol as lb4_lookup_service * has overwritten it. */ - key.proto = protocol; + lb4_key_set_protocol(&key, protocol); svc = sock4_wildcard_lookup(&key, false, false, true); } @@ -606,7 +610,9 @@ static __always_inline int __sock4_xlate_rev(struct bpf_sock_addr *ctx, struct lb4_key svc_key = { .address = val->address, .dport = val->port, +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) .proto = protocol, +#endif }; svc = lb4_lookup_service(&svc_key, true, false); @@ -614,7 +620,7 @@ static __always_inline int __sock4_xlate_rev(struct bpf_sock_addr *ctx, /* Restore the original key's protocol as lb4_lookup_service * has overwritten it. */ - svc_key.proto = protocol; + lb4_key_set_protocol(&svc_key, protocol); svc = sock4_wildcard_lookup_full(&svc_key, ctx_in_hostns(ctx_full, NULL)); } @@ -879,7 +885,9 @@ static __always_inline int __sock6_post_bind(struct bpf_sock *ctx) struct lb6_service *svc; struct lb6_key key = { .dport = ctx_src_port(ctx), +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) .proto = protocol, +#endif }; if (!sock_proto_enabled(protocol) || @@ -893,7 +901,7 @@ static __always_inline int __sock6_post_bind(struct bpf_sock *ctx) /* Restore the original key's protocol as lb6_lookup_service * has overwritten it. */ - key.proto = protocol; + lb6_key_set_protocol(&key, protocol); svc = sock6_wildcard_lookup(&key, false, false, true); if (!svc) return sock6_post_bind_v4_in_v6(ctx); @@ -1011,7 +1019,9 @@ static __always_inline int __sock6_xlate_fwd(struct bpf_sock_addr *ctx, __u8 protocol = ctx_protocol(ctx); struct lb6_key key = { .dport = dst_port, +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) .proto = protocol, +#endif }, orig_key; struct lb6_service *backend_slot; bool backend_from_affinity = false; @@ -1034,7 +1044,7 @@ static __always_inline int __sock6_xlate_fwd(struct bpf_sock_addr *ctx, /* Restore the original key's protocol as lb6_lookup_service * has overwritten it. */ - key.proto = protocol; + lb6_key_set_protocol(&key, protocol); svc = sock6_wildcard_lookup_full(&key, in_hostns); } if (!svc) @@ -1219,7 +1229,9 @@ static __always_inline int __sock6_xlate_rev(struct bpf_sock_addr *ctx) struct lb6_key svc_key = { .address = val->address, .dport = val->port, +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) .proto = protocol, +#endif }; svc = lb6_lookup_service(&svc_key, true, false); @@ -1227,7 +1239,7 @@ static __always_inline int __sock6_xlate_rev(struct bpf_sock_addr *ctx) /* Restore the original key's protocol as lb6_lookup_service * has overwritten it. */ - svc_key.proto = protocol; + lb6_key_set_protocol(&svc_key, protocol); svc = sock6_wildcard_lookup_full(&svc_key, ctx_in_hostns(ctx, NULL)); } diff --git a/bpf/lib/lb.h b/bpf/lib/lb.h index e1a132ad132a0..9e6e51c1aafec 100644 --- a/bpf/lib/lb.h +++ b/bpf/lib/lb.h @@ -487,10 +487,19 @@ static __always_inline int lb6_rev_nat(struct __ctx_buff *ctx, int l4_off, return __lb6_rev_nat(ctx, l4_off, tuple, flags, nat); } +static __always_inline void +lb6_key_set_protocol(struct lb6_key *key __maybe_unused, + __u8 protocol __maybe_unused) +{ +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + key->proto = protocol; +#endif +} + static __always_inline void lb6_fill_key(struct lb6_key *key, struct ipv6_ct_tuple *tuple) { - key->proto = tuple->nexthdr; + lb6_key_set_protocol(key, tuple->nexthdr); ipv6_addr_copy(&key->address, &tuple->daddr); key->dport = tuple->sport; } @@ -1021,6 +1030,12 @@ struct lb6_service *lb6_lookup_service(struct lb6_key *key __maybe_unused, return NULL; } +static __always_inline void +lb6_key_set_protocol(struct lb6_key *key __maybe_unused, + __u8 protocol __maybe_unused) +{ +} + static __always_inline struct lb6_service *__lb6_lookup_backend_slot(struct lb6_key *key __maybe_unused) { @@ -1144,10 +1159,19 @@ static __always_inline int lb4_rev_nat(struct __ctx_buff *ctx, int l3_off, int l loopback, has_l4_header); } +static __always_inline void +lb4_key_set_protocol(struct lb4_key *key __maybe_unused, + __u8 protocol __maybe_unused) +{ +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + key->proto = protocol; +#endif +} + static __always_inline void lb4_fill_key(struct lb4_key *key, const struct ipv4_ct_tuple *tuple) { - key->proto = tuple->nexthdr; + lb4_key_set_protocol(key, tuple->nexthdr); key->address = tuple->daddr; /* CT tuple has ports in reverse order: */ key->dport = tuple->sport; diff --git a/daemon/cmd/daemon_main.go b/daemon/cmd/daemon_main.go index 02bd28dd231fb..a638162078f05 100644 --- a/daemon/cmd/daemon_main.go +++ b/daemon/cmd/daemon_main.go @@ -619,6 +619,9 @@ func initializeFlags() { option.NodePortAccelerationNative, option.NodePortAccelerationDisabled)) option.BindEnv(Vp, option.LoadBalancerAcceleration) + flags.Bool(option.LoadBalancerProtocolDifferentiation, true, "Enable support for service protocol differentiation (TCP, UDP, SCTP)") + option.BindEnv(Vp, option.LoadBalancerProtocolDifferentiation) + flags.Uint(option.MaglevTableSize, maglev.DefaultTableSize, "Maglev per service backend table size (parameter M)") option.BindEnv(Vp, option.MaglevTableSize) diff --git a/install/kubernetes/cilium/templates/cilium-configmap.yaml b/install/kubernetes/cilium/templates/cilium-configmap.yaml index f6de6f42ef5b6..82a0082c56f42 100644 --- a/install/kubernetes/cilium/templates/cilium-configmap.yaml +++ b/install/kubernetes/cilium/templates/cilium-configmap.yaml @@ -724,6 +724,9 @@ data: {{- if hasKey .Values.loadBalancer "serviceTopology" }} enable-service-topology: {{ .Values.loadBalancer.serviceTopology | quote }} {{- end }} +{{- if hasKey .Values.loadBalancer "protocolDifferentiation" }} + bpf-lb-proto-diff: {{ .Values.loadBalancer.protocolDifferentiation.enabled | quote }} +{{- end }} {{- end }} {{- if hasKey .Values.maglev "tableSize" }} diff --git a/pkg/datapath/linux/config/config.go b/pkg/datapath/linux/config/config.go index 333076b2ff3cb..e432942a0e098 100644 --- a/pkg/datapath/linux/config/config.go +++ b/pkg/datapath/linux/config/config.go @@ -464,6 +464,7 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC cDefinesMap["IPV6_RSS_PREFIX_BITS"] = "128" } } + if option.Config.NodePortAcceleration != option.NodePortAccelerationDisabled { cDefinesMap["ENABLE_NODEPORT_ACCELERATION"] = "1" } @@ -699,6 +700,10 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC } cDefinesMap["EPHEMERAL_MIN"] = fmt.Sprintf("%d", ephemeralMin) + if option.Config.LoadBalancerProtocolDifferentiation { + cDefinesMap["ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION"] = "1" + } + // Since golang maps are unordered, we sort the keys in the map // to get a consistent written format to the writer. This maintains // the consistency when we try to calculate hash for a datapath after diff --git a/pkg/k8s/watchers/watcher.go b/pkg/k8s/watchers/watcher.go index 288411c257a97..3af9d03855d54 100644 --- a/pkg/k8s/watchers/watcher.go +++ b/pkg/k8s/watchers/watcher.go @@ -930,6 +930,38 @@ func hashSVCMap(svcs []loadbalancer.SVC) map[string]loadbalancer.L3n4Addr { return m } +func stripServiceProtocol(svc *k8s.Service) *k8s.Service { + if svc == nil { + return nil + } + + svc = svc.DeepCopy() + + for _, port := range svc.Ports { + port.Protocol = "NONE" + } + + for _, nodePort := range svc.NodePorts { + for _, port := range nodePort { + port.Protocol = "NONE" + } + } + + return svc +} + +func stripEndpointsProtocol(endpoints *k8s.Endpoints) *k8s.Endpoints { + endpoints = endpoints.DeepCopy() + + for _, backend := range endpoints.Backends { + for _, port := range backend.Ports { + port.Protocol = "NONE" + } + } + + return endpoints +} + func (k *K8sWatcher) addK8sSVCs(svcID k8s.ServiceID, oldSvc, svc *k8s.Service, endpoints *k8s.Endpoints) error { // Headless services do not need any datapath implementation if svc.IsHeadless { @@ -941,6 +973,12 @@ func (k *K8sWatcher) addK8sSVCs(svcID k8s.ServiceID, oldSvc, svc *k8s.Service, e logfields.K8sNamespace: svcID.Namespace, }) + if !option.Config.LoadBalancerProtocolDifferentiation { + oldSvc = stripServiceProtocol(oldSvc) + svc = stripServiceProtocol(svc) + endpoints = stripEndpointsProtocol(endpoints) + } + svcs := datapathSVCs(svc, endpoints) svcMap := hashSVCMap(svcs) diff --git a/pkg/option/config.go b/pkg/option/config.go index e891a60a3da09..b506c03c0d6aa 100644 --- a/pkg/option/config.go +++ b/pkg/option/config.go @@ -285,6 +285,13 @@ const ( // Alias to NodePortAcceleration LoadBalancerAcceleration = "bpf-lb-acceleration" + // LoadBalancerExternalControlPlane switch skips connectivity to kube-apiserver + // which is relevant in lb-only mode + LoadBalancerExternalControlPlane = "bpf-lb-external-control-plane" + + // LoadBalancerProtocolDifferentiation enables support for service protocol differentiation (TCP, UDP, SCTP) + LoadBalancerProtocolDifferentiation = "bpf-lb-proto-diff" + // MaglevTableSize determines the size of the backend table per service MaglevTableSize = "bpf-lb-maglev-table-size" @@ -2058,6 +2065,13 @@ type DaemonConfig struct { LoadBalancerRSSv6CIDR string LoadBalancerRSSv6 net.IPNet + // LoadBalancerExternalControlPlane tells whether to not use kube-apiserver as + // its control plane in lb-only mode. + LoadBalancerExternalControlPlane bool + + // LoadBalancerProtocolDifferentiation enables support for service protocol differentiation (TCP, UDP, SCTP) + LoadBalancerProtocolDifferentiation bool + // EnablePMTUDiscovery indicates whether to send ICMP fragmentation-needed // replies to the client (when needed). EnablePMTUDiscovery bool @@ -3590,6 +3604,8 @@ func (c *DaemonConfig) Populate(vp *viper.Viper) { // To support K8s NetworkPolicy c.EnableK8sNetworkPolicy = vp.GetBool(EnableK8sNetworkPolicy) + + c.LoadBalancerProtocolDifferentiation = vp.GetBool(LoadBalancerProtocolDifferentiation) } func (c *DaemonConfig) populateDevices(vp *viper.Viper) { From 1953d9c4aa303c783d545104a1ce8d56a1aa39f5 Mon Sep 17 00:00:00 2001 From: Gilberto Bertin Date: Fri, 28 Jun 2024 16:04:49 +0200 Subject: [PATCH 3/4] service: don't overwrite existing services with NONE protocol when upgrading to a version of Cilium that supports and has service protocol differentiation enabled, existing services without protocol set will be overwritten (or better, restored from the bpf maps, deleted because they are orphan and the recreated) with their protocol set. As this causes connection disruptions, keep those services without protocol until the user explicitly deletes them. To allow both type of services to co-exist, this commit: - introduces a second service lookup in the datapath, without the protocol set, in case the protocol lookup fails - updates the service allocation logic to check both for services with and without protocol before allocating a new one - updates the deletion logic to look for both with and without protocol versions, to ensure all service versions are proper deleted Co-authored-by: Antonio Ojea Signed-off-by: Gilberto Bertin --- bpf/lib/lb.h | 18 ++++++++ pkg/k8s/watchers/watcher.go | 77 +++++++++++++++++--------------- pkg/loadbalancer/loadbalancer.go | 2 + pkg/service/service.go | 28 ++++++++++++ 4 files changed, 90 insertions(+), 35 deletions(-) diff --git a/bpf/lib/lb.h b/bpf/lib/lb.h index 9e6e51c1aafec..bd276a001d96f 100644 --- a/bpf/lib/lb.h +++ b/bpf/lib/lb.h @@ -594,6 +594,15 @@ struct lb6_service *lb6_lookup_service(struct lb6_key *key, key->scope = LB_LOOKUP_SCOPE_EXT; key->backend_slot = 0; svc = map_lookup_elem(&LB6_SERVICES_MAP_V2, key); + +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + /* If there are no elements for a specific protocol, check for ANY entries. */ + if (!svc && key->proto != 0) { + key->proto = 0; + svc = map_lookup_elem(&LB6_SERVICES_MAP_V2, key); + } +#endif + if (svc) { if (!scope_switch || !lb6_svc_is_two_scopes(svc)) /* Packets for L7 LB are redirected even when there are no backends. */ @@ -1270,6 +1279,15 @@ struct lb4_service *lb4_lookup_service(struct lb4_key *key, key->scope = LB_LOOKUP_SCOPE_EXT; key->backend_slot = 0; svc = map_lookup_elem(&LB4_SERVICES_MAP_V2, key); + +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + /* If there are no elements for a specific protocol, check for ANY entries. */ + if (!svc && key->proto != 0) { + key->proto = 0; + svc = map_lookup_elem(&LB4_SERVICES_MAP_V2, key); + } +#endif + if (svc) { if (!scope_switch || !lb4_svc_is_two_scopes(svc)) /* Packets for L7 LB are redirected even when there are no backends. */ diff --git a/pkg/k8s/watchers/watcher.go b/pkg/k8s/watchers/watcher.go index 3af9d03855d54..70f794e7c3c8a 100644 --- a/pkg/k8s/watchers/watcher.go +++ b/pkg/k8s/watchers/watcher.go @@ -723,53 +723,60 @@ func (k *K8sWatcher) delK8sSVCs(svc k8s.ServiceID, svcInfo *k8s.Service, se *k8s logfields.K8sNamespace: svc.Namespace, }) - repPorts := svcInfo.UniquePorts() + for _, checkProtocol := range []bool{true, false} { + if !checkProtocol { + svcInfo = stripServiceProtocol(svcInfo) + } - frontends := []*loadbalancer.L3n4Addr{} + repPorts := svcInfo.UniquePorts() - for portName, svcPort := range svcInfo.Ports { - if !repPorts[svcPort.String()] { - continue - } - repPorts[svcPort.String()] = false + frontends := []*loadbalancer.L3n4Addr{} - for _, feIP := range svcInfo.FrontendIPs { - fe := loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(feIP), svcPort.Port, loadbalancer.ScopeExternal) - frontends = append(frontends, fe) - } + for portName, svcPort := range svcInfo.Ports { + if !repPorts[svcPort.String()] { + continue + } + repPorts[svcPort.String()] = false - for _, nodePortFE := range svcInfo.NodePorts[portName] { - frontends = append(frontends, &nodePortFE.L3n4Addr) - if svcInfo.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal || svcInfo.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal { - cpFE := nodePortFE.L3n4Addr.DeepCopy() - cpFE.Scope = loadbalancer.ScopeInternal - frontends = append(frontends, cpFE) + for _, feIP := range svcInfo.FrontendIPs { + fe := loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(feIP), svcPort.Port, loadbalancer.ScopeExternal) + frontends = append(frontends, fe) } - } - for _, k8sExternalIP := range svcInfo.K8sExternalIPs { - frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(k8sExternalIP), svcPort.Port, loadbalancer.ScopeExternal)) - } + for _, nodePortFE := range svcInfo.NodePorts[portName] { + frontends = append(frontends, &nodePortFE.L3n4Addr) + if svcInfo.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal || svcInfo.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal { + cpFE := nodePortFE.L3n4Addr.DeepCopy() + cpFE.Scope = loadbalancer.ScopeInternal + frontends = append(frontends, cpFE) + } + } + + for _, k8sExternalIP := range svcInfo.K8sExternalIPs { + frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(k8sExternalIP), svcPort.Port, loadbalancer.ScopeExternal)) + } - for _, ip := range svcInfo.LoadBalancerIPs { - addrCluster := cmtypes.MustAddrClusterFromIP(ip) - frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, addrCluster, svcPort.Port, loadbalancer.ScopeExternal)) - if svcInfo.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal || svcInfo.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal { - frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, addrCluster, svcPort.Port, loadbalancer.ScopeInternal)) + for _, ip := range svcInfo.LoadBalancerIPs { + addrCluster := cmtypes.MustAddrClusterFromIP(ip) + frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, addrCluster, svcPort.Port, loadbalancer.ScopeExternal)) + if svcInfo.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal || svcInfo.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal { + frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, addrCluster, svcPort.Port, loadbalancer.ScopeInternal)) + } } } - } - for _, fe := range frontends { - if found, err := k.svcManager.DeleteService(*fe); err != nil { - scopedLog.WithError(err).WithField(logfields.Object, logfields.Repr(fe)). - Warn("Error deleting service by frontend") - } else if !found { - scopedLog.WithField(logfields.Object, logfields.Repr(fe)).Warn("service not found") - } else { - scopedLog.Debugf("# cilium lb delete-service %s %d 0", fe.AddrCluster.String(), fe.Port) + for _, fe := range frontends { + if found, err := k.svcManager.DeleteService(*fe); err != nil { + scopedLog.WithError(err).WithField(logfields.Object, logfields.Repr(fe)). + Warn("Error deleting service by frontend") + } else if !found { + scopedLog.WithField(logfields.Object, logfields.Repr(fe)).Warn("service not found") + } else { + scopedLog.Debugf("# cilium lb delete-service %s %d 0", fe.AddrCluster.String(), fe.Port) + } } } + return nil } diff --git a/pkg/loadbalancer/loadbalancer.go b/pkg/loadbalancer/loadbalancer.go index 1cc49ce8fc5b7..b34e4c90f10db 100644 --- a/pkg/loadbalancer/loadbalancer.go +++ b/pkg/loadbalancer/loadbalancer.go @@ -526,6 +526,8 @@ func IsValidBackendState(state string) bool { func NewL4Type(name string) (L4Type, error) { switch strings.ToLower(name) { + case "none": + return NONE, nil case "tcp": return TCP, nil case "udp": diff --git a/pkg/service/service.go b/pkg/service/service.go index d96d9e8712587..b0b552bd65904 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -1151,8 +1151,36 @@ func (s *Service) createSVCInfoIfNotExist(p *lb.SVC) (*svcInfo, bool, bool, prevSessionAffinity := false prevLoadBalancerSourceRanges := []*cidr.CIDR{} + // when Cilium is upgraded to a version that supports service protocol differentiation, and such feature is + // enabled, we may end up in a situation where some existing services do not have the protocol set. + // + // As in such cases we want to preserve the existing services (in order to not break existing connections to + // those services), when trying to create a new one check first if an "old" service without the protocol + // already exists, by overwriting its protocol to NONE. + // If it doesn't then do a second lookup in the svcByHash map with the protocol set. + // + // Note that this logic can be removed once we stop supporting services without protocol. + proto := p.Frontend.L3n4Addr.L4Addr.Protocol + p.Frontend.L3n4Addr.L4Addr.Protocol = "NONE" + + backendProtos := []lb.L4Type{} + for _, backend := range p.Backends { + backendProtos = append(backendProtos, backend.L3n4Addr.L4Addr.Protocol) + backend.L3n4Addr.L4Addr.Protocol = "NONE" + } + hash := p.Frontend.Hash() svc, found := s.svcByHash[hash] + if !found { + p.Frontend.L3n4Addr.L4Addr.Protocol = proto + for i, backend := range p.Backends { + backend.L3n4Addr.L4Addr.Protocol = backendProtos[i] + } + + hash = p.Frontend.Hash() + svc, found = s.svcByHash[hash] + } + if !found { // Allocate service ID for the new service addrID, err := AcquireID(p.Frontend.L3n4Addr, uint32(p.Frontend.ID)) From 68fd269396f2530dab03a82e4988af1e1cd78f52 Mon Sep 17 00:00:00 2001 From: Gilberto Bertin Date: Wed, 3 Jul 2024 11:57:56 +0200 Subject: [PATCH 4/4] service: use protocol ANY when not differentiating TCP and UDP Signed-off-by: Gilberto Bertin --- pkg/k8s/watchers/watcher.go | 6 +++--- pkg/loadbalancer/loadbalancer.go | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/k8s/watchers/watcher.go b/pkg/k8s/watchers/watcher.go index 70f794e7c3c8a..ad54f984fb476 100644 --- a/pkg/k8s/watchers/watcher.go +++ b/pkg/k8s/watchers/watcher.go @@ -945,12 +945,12 @@ func stripServiceProtocol(svc *k8s.Service) *k8s.Service { svc = svc.DeepCopy() for _, port := range svc.Ports { - port.Protocol = "NONE" + port.Protocol = "ANY" } for _, nodePort := range svc.NodePorts { for _, port := range nodePort { - port.Protocol = "NONE" + port.Protocol = "ANY" } } @@ -962,7 +962,7 @@ func stripEndpointsProtocol(endpoints *k8s.Endpoints) *k8s.Endpoints { for _, backend := range endpoints.Backends { for _, port := range backend.Ports { - port.Protocol = "NONE" + port.Protocol = "ANY" } } diff --git a/pkg/loadbalancer/loadbalancer.go b/pkg/loadbalancer/loadbalancer.go index b34e4c90f10db..ead2ef528c1a0 100644 --- a/pkg/loadbalancer/loadbalancer.go +++ b/pkg/loadbalancer/loadbalancer.go @@ -232,6 +232,8 @@ func (s ServiceFlags) UInt16() uint16 { const ( // NONE type. NONE = L4Type("NONE") + // ANY type. + ANY = L4Type("ANY") // TCP type. TCP = L4Type("TCP") // UDP type. @@ -528,6 +530,8 @@ func NewL4Type(name string) (L4Type, error) { switch strings.ToLower(name) { case "none": return NONE, nil + case "any": + return ANY, nil case "tcp": return TCP, nil case "udp": @@ -548,7 +552,7 @@ func NewL4TypeFromNumber(proto uint8) L4Type { case 132: return SCTP default: - return NONE + return ANY } }