diff --git a/Documentation/cmdref/cilium-agent.md b/Documentation/cmdref/cilium-agent.md index c1a8c1942278a..c8609dde3c208 100644 --- a/Documentation/cmdref/cilium-agent.md +++ b/Documentation/cmdref/cilium-agent.md @@ -47,6 +47,7 @@ cilium-agent [flags] --bpf-lb-maglev-table-size uint Maglev per service backend table size (parameter M) (default 16381) --bpf-lb-map-max int Maximum number of entries in Cilium BPF lbmap (default 65536) --bpf-lb-mode string BPF load balancing mode ("snat", "dsr", "hybrid") (default "snat") + --bpf-lb-proto-diff Enable support for service protocol differentiation (TCP, UDP, SCTP) (default true) --bpf-lb-rss-ipv4-src-cidr string BPF load balancing RSS outer source IPv4 CIDR prefix for IPIP --bpf-lb-rss-ipv6-src-cidr string BPF load balancing RSS outer source IPv6 CIDR prefix for IPIP --bpf-lb-sock Enable socket-based LB for E/W traffic diff --git a/Documentation/cmdref/cilium_service_update.md b/Documentation/cmdref/cilium_service_update.md index 4a114be137102..fddc5f4fec33e 100644 --- a/Documentation/cmdref/cilium_service_update.md +++ b/Documentation/cmdref/cilium_service_update.md @@ -24,6 +24,7 @@ cilium service update [flags] --k8s-load-balancer Set service as a k8s LoadBalancer --k8s-node-port Set service as a k8s NodePort --local-redirect Set service as Local Redirect + --protocol string Protocol for service (e.g. TCP, UDP) (default "tcp") --states strings Backend state(s) as {active(default),terminating,quarantined,maintenance} ``` diff --git a/Documentation/network/kubernetes/kubeproxy-free.rst b/Documentation/network/kubernetes/kubeproxy-free.rst index 210c8ae78830a..30d305b09e5dd 100644 --- a/Documentation/network/kubernetes/kubeproxy-free.rst +++ b/Documentation/network/kubernetes/kubeproxy-free.rst @@ -242,14 +242,14 @@ In this example, services with port ``31940`` were created (one for each of devi $ kubectl -n kube-system exec ds/cilium -- cilium service list ID Frontend Service Type Backend [...] 
- 4 10.104.239.135:80 ClusterIP 1 => 10.217.0.107:80 - 2 => 10.217.0.149:80 - 5 0.0.0.0:31940 NodePort 1 => 10.217.0.107:80 - 2 => 10.217.0.149:80 - 6 192.168.178.29:31940 NodePort 1 => 10.217.0.107:80 - 2 => 10.217.0.149:80 - 7 172.16.0.29:31940 NodePort 1 => 10.217.0.107:80 - 2 => 10.217.0.149:80 + 4 10.104.239.135:80/TCP ClusterIP 1 => 10.217.0.107:80/TCP + 2 => 10.217.0.149:80/TCP + 5 0.0.0.0:31940/TCP NodePort 1 => 10.217.0.107:80/TCP + 2 => 10.217.0.149:80/TCP + 6 192.168.178.29:31940/TCP NodePort 1 => 10.217.0.107:80/TCP + 2 => 10.217.0.149:80/TCP + 7 172.16.0.29:31940/TCP NodePort 1 => 10.217.0.107:80/TCP + 2 => 10.217.0.149:80/TCP Create a variable with the node port for testing: diff --git a/api/v1/models/backend_address.go b/api/v1/models/backend_address.go index 3f17b81f49175..f02578a08fe64 100644 --- a/api/v1/models/backend_address.go +++ b/api/v1/models/backend_address.go @@ -37,6 +37,9 @@ type BackendAddress struct { // on related annotation of global service. Applicable for active state only. Preferred bool `json:"preferred,omitempty"` + // Layer 4 protocol (TCP, UDP, etc) + Protocol string `json:"protocol,omitempty"` + // State of the backend for load-balancing service traffic // Enum: [active terminating quarantined maintenance] State string `json:"state,omitempty"` diff --git a/api/v1/openapi.yaml b/api/v1/openapi.yaml index db842956c06b4..d94e928fc65fd 100644 --- a/api/v1/openapi.yaml +++ b/api/v1/openapi.yaml @@ -2863,6 +2863,9 @@ definitions: ip: description: Layer 3 address type: string + protocol: + description: Layer 4 protocol (TCP, UDP, etc) + type: string port: description: Layer 4 port number type: integer diff --git a/api/v1/server/embedded_spec.go b/api/v1/server/embedded_spec.go index 19bf18240bdac..2c77f267c64ab 100644 --- a/api/v1/server/embedded_spec.go +++ b/api/v1/server/embedded_spec.go @@ -1874,6 +1874,10 @@ func init() { "description": "Indicator if this backend is preferred in the context of clustermesh service affinity. 
The value is set based\non related annotation of global service. Applicable for active state only.", "type": "boolean" }, + "protocol": { + "description": "Layer 4 protocol (TCP, UDP, etc)", + "type": "string" + }, "state": { "description": "State of the backend for load-balancing service traffic", "type": "string", @@ -7254,6 +7258,10 @@ func init() { "description": "Indicator if this backend is preferred in the context of clustermesh service affinity. The value is set based\non related annotation of global service. Applicable for active state only.", "type": "boolean" }, + "protocol": { + "description": "Layer 4 protocol (TCP, UDP, etc)", + "type": "string" + }, "state": { "description": "State of the backend for load-balancing service traffic", "type": "string", diff --git a/bpf/bpf_sock.c b/bpf/bpf_sock.c index 0cf21bd9e8078..e84e28307bd3b 100644 --- a/bpf/bpf_sock.c +++ b/bpf/bpf_sock.c @@ -39,6 +39,9 @@ static __always_inline __maybe_unused bool is_v6_loopback(const union v6addr *da return ipv6_addr_equals(&loopback, daddr); } +/* Hack due to missing narrow ctx access. */ +#define ctx_protocol(__ctx) ((__u8)(volatile __u32)(__ctx)->protocol) + /* Hack due to missing narrow ctx access. */ static __always_inline __maybe_unused __be16 ctx_dst_port(const struct bpf_sock_addr *ctx) @@ -104,12 +107,12 @@ bool sock_is_health_check(struct bpf_sock_addr *ctx __maybe_unused) static __always_inline __maybe_unused __u64 sock_select_slot(struct bpf_sock_addr *ctx) { - return ctx->protocol == IPPROTO_TCP ? + return ctx_protocol(ctx) == IPPROTO_TCP ? 
get_prandom_u32() : sock_local_cookie(ctx); } static __always_inline __maybe_unused -bool sock_proto_enabled(__u32 proto) +bool sock_proto_enabled(__u8 proto) { switch (proto) { case IPPROTO_TCP: @@ -294,10 +297,14 @@ static __always_inline int __sock4_xlate_fwd(struct bpf_sock_addr *ctx, struct lb4_backend *backend; struct lb4_service *svc; __u16 dst_port = ctx_dst_port(ctx); + __u8 protocol = ctx_protocol(ctx); __u32 dst_ip = ctx->user_ip4; struct lb4_key key = { .address = dst_ip, .dport = dst_port, +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + .proto = protocol, +#endif }, orig_key = key; struct lb4_service *backend_slot; bool backend_from_affinity = false; @@ -309,7 +316,7 @@ static __always_inline int __sock4_xlate_fwd(struct bpf_sock_addr *ctx, if (is_defined(ENABLE_SOCKET_LB_HOST_ONLY) && !in_hostns) return -ENXIO; - if (!udp_only && !sock_proto_enabled(ctx->protocol)) + if (!udp_only && !sock_proto_enabled(protocol)) return -ENOTSUP; /* In case a direct match fails, we try to look-up surrogate @@ -317,8 +324,13 @@ static __always_inline int __sock4_xlate_fwd(struct bpf_sock_addr *ctx, * HostPort services. */ svc = lb4_lookup_service(&key, true, false); - if (!svc) + if (!svc) { + /* Restore the original key's protocol as lb4_lookup_service + * has overwritten it. 
+ */ + lb4_key_set_protocol(&key, protocol); svc = sock4_wildcard_lookup_full(&key, in_hostns); + } if (!svc) return -ENXIO; if (svc->count == 0 && !lb4_svc_is_l7loadbalancer(svc)) @@ -477,23 +489,32 @@ int cil_sock4_connect(struct bpf_sock_addr *ctx) static __always_inline int __sock4_post_bind(struct bpf_sock *ctx, struct bpf_sock *ctx_full) { + __u8 protocol = ctx_protocol(ctx); struct lb4_service *svc; struct lb4_key key = { .address = ctx->src_ip4, .dport = ctx_src_port(ctx), +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + .proto = protocol, +#endif }; - if (!sock_proto_enabled(ctx->protocol) || + if (!sock_proto_enabled(protocol) || !ctx_in_hostns(ctx_full, NULL)) return 0; svc = lb4_lookup_service(&key, true, false); - if (!svc) + if (!svc) { /* Perform a wildcard lookup for the case where the caller * tries to bind to loopback or an address with host identity * (without remote hosts). + * + * Restore the original key's protocol as lb4_lookup_service + * has overwritten it. */ + lb4_key_set_protocol(&key, protocol); svc = sock4_wildcard_lookup(&key, false, false, true); + } /* If the sockaddr of this socket overlaps with a NodePort, * LoadBalancer or ExternalIP service. 
We must reject this @@ -540,7 +561,7 @@ static __always_inline int __sock4_pre_bind(struct bpf_sock_addr *ctx, .peer = { .address = ctx->user_ip4, .port = ctx_dst_port(ctx), - .proto = (__u8)ctx->protocol, + .proto = ctx_protocol(ctx), }, }; int ret; @@ -556,7 +577,7 @@ int cil_sock4_pre_bind(struct bpf_sock_addr *ctx) { int ret = SYS_PROCEED; - if (!sock_proto_enabled(ctx->protocol) || + if (!sock_proto_enabled(ctx_protocol(ctx)) || !ctx_in_hostns(ctx, NULL)) return ret; if (sock_is_health_check(ctx) && @@ -573,6 +594,7 @@ static __always_inline int __sock4_xlate_rev(struct bpf_sock_addr *ctx, { struct ipv4_revnat_entry *val; __u16 dst_port = ctx_dst_port(ctx); + __u8 protocol = ctx_protocol(ctx); __u32 dst_ip = ctx->user_ip4; struct ipv4_revnat_tuple key = { .cookie = sock_local_cookie(ctx_full), @@ -588,12 +610,20 @@ static __always_inline int __sock4_xlate_rev(struct bpf_sock_addr *ctx, struct lb4_key svc_key = { .address = val->address, .dport = val->port, +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + .proto = protocol, +#endif }; svc = lb4_lookup_service(&svc_key, true, false); - if (!svc) + if (!svc) { + /* Restore the original key's protocol as lb4_lookup_service + * has overwritten it. 
+ */ + lb4_key_set_protocol(&svc_key, protocol); svc = sock4_wildcard_lookup_full(&svc_key, ctx_in_hostns(ctx_full, NULL)); + } if (!svc || svc->rev_nat_index != val->rev_nat_index || (svc->count == 0 && !lb4_svc_is_l7loadbalancer(svc))) { map_delete_elem(&LB4_REVERSE_NAT_SK_MAP, &key); @@ -810,7 +840,7 @@ int sock6_xlate_v4_in_v6(struct bpf_sock_addr *ctx __maybe_unused, return -ENXIO; memset(&fake_ctx, 0, sizeof(fake_ctx)); - fake_ctx.protocol = ctx->protocol; + fake_ctx.protocol = ctx_protocol(ctx); fake_ctx.user_ip4 = addr6.p4; fake_ctx.user_port = ctx_dst_port(ctx); @@ -840,7 +870,7 @@ sock6_post_bind_v4_in_v6(struct bpf_sock *ctx __maybe_unused) return 0; memset(&fake_ctx, 0, sizeof(fake_ctx)); - fake_ctx.protocol = ctx->protocol; + fake_ctx.protocol = ctx_protocol(ctx); fake_ctx.src_ip4 = addr6.p4; fake_ctx.src_port = ctx->src_port; @@ -851,12 +881,16 @@ sock6_post_bind_v4_in_v6(struct bpf_sock *ctx __maybe_unused) static __always_inline int __sock6_post_bind(struct bpf_sock *ctx) { + __u8 protocol = ctx_protocol(ctx); struct lb6_service *svc; struct lb6_key key = { .dport = ctx_src_port(ctx), +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + .proto = protocol, +#endif }; - if (!sock_proto_enabled(ctx->protocol) || + if (!sock_proto_enabled(protocol) || !ctx_in_hostns(ctx, NULL)) return 0; @@ -864,6 +898,10 @@ static __always_inline int __sock6_post_bind(struct bpf_sock *ctx) svc = lb6_lookup_service(&key, true, false); if (!svc) { + /* Restore the original key's protocol as lb6_lookup_service + * has overwritten it. 
+ */ + lb6_key_set_protocol(&key, protocol); svc = sock6_wildcard_lookup(&key, false, false, true); if (!svc) return sock6_post_bind_v4_in_v6(ctx); @@ -904,7 +942,7 @@ sock6_pre_bind_v4_in_v6(struct bpf_sock_addr *ctx __maybe_unused) ctx_get_v6_address(ctx, &addr6); memset(&fake_ctx, 0, sizeof(fake_ctx)); - fake_ctx.protocol = ctx->protocol; + fake_ctx.protocol = ctx_protocol(ctx); fake_ctx.user_ip4 = addr6.p4; fake_ctx.user_port = ctx_dst_port(ctx); @@ -935,7 +973,7 @@ static __always_inline int __sock6_pre_bind(struct bpf_sock_addr *ctx) struct lb6_health val = { .peer = { .port = ctx_dst_port(ctx), - .proto = (__u8)ctx->protocol, + .proto = ctx_protocol(ctx), }, }; int ret = 0; @@ -957,7 +995,7 @@ int cil_sock6_pre_bind(struct bpf_sock_addr *ctx) { int ret = SYS_PROCEED; - if (!sock_proto_enabled(ctx->protocol) || + if (!sock_proto_enabled(ctx_protocol(ctx)) || !ctx_in_hostns(ctx, NULL)) return ret; if (sock_is_health_check(ctx) && @@ -978,8 +1016,12 @@ static __always_inline int __sock6_xlate_fwd(struct bpf_sock_addr *ctx, struct lb6_backend *backend; struct lb6_service *svc; __u16 dst_port = ctx_dst_port(ctx); + __u8 protocol = ctx_protocol(ctx); struct lb6_key key = { .dport = dst_port, +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + .proto = protocol, +#endif }, orig_key; struct lb6_service *backend_slot; bool backend_from_affinity = false; @@ -991,15 +1033,20 @@ static __always_inline int __sock6_xlate_fwd(struct bpf_sock_addr *ctx, if (is_defined(ENABLE_SOCKET_LB_HOST_ONLY) && !in_hostns) return -ENXIO; - if (!udp_only && !sock_proto_enabled(ctx->protocol)) + if (!udp_only && !sock_proto_enabled(protocol)) return -ENOTSUP; ctx_get_v6_address(ctx, &key.address); memcpy(&orig_key, &key, sizeof(key)); svc = lb6_lookup_service(&key, true, false); - if (!svc) + if (!svc) { + /* Restore the original key's protocol as lb6_lookup_service + * has overwritten it. 
+ */ + lb6_key_set_protocol(&key, protocol); svc = sock6_wildcard_lookup_full(&key, in_hostns); + } if (!svc) return sock6_xlate_v4_in_v6(ctx, udp_only); if (svc->count == 0 && !lb6_svc_is_l7loadbalancer(svc)) @@ -1144,7 +1191,7 @@ sock6_xlate_rev_v4_in_v6(struct bpf_sock_addr *ctx __maybe_unused) return -ENXIO; memset(&fake_ctx, 0, sizeof(fake_ctx)); - fake_ctx.protocol = ctx->protocol; + fake_ctx.protocol = ctx_protocol(ctx); fake_ctx.user_ip4 = addr6.p4; fake_ctx.user_port = ctx_dst_port(ctx); @@ -1167,6 +1214,7 @@ static __always_inline int __sock6_xlate_rev(struct bpf_sock_addr *ctx) struct ipv6_revnat_tuple key = {}; struct ipv6_revnat_entry *val; __u16 dst_port = ctx_dst_port(ctx); + __u8 protocol = ctx_protocol(ctx); key.cookie = sock_local_cookie(ctx); key.port = dst_port; @@ -1181,12 +1229,20 @@ static __always_inline int __sock6_xlate_rev(struct bpf_sock_addr *ctx) struct lb6_key svc_key = { .address = val->address, .dport = val->port, +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + .proto = protocol, +#endif }; svc = lb6_lookup_service(&svc_key, true, false); - if (!svc) + if (!svc) { + /* Restore the original key's protocol as lb6_lookup_service + * has overwritten it. 
+ */ + lb6_key_set_protocol(&svc_key, protocol); svc = sock6_wildcard_lookup_full(&svc_key, ctx_in_hostns(ctx, NULL)); + } if (!svc || svc->rev_nat_index != val->rev_nat_index || (svc->count == 0 && !lb6_svc_is_l7loadbalancer(svc))) { map_delete_elem(&LB6_REVERSE_NAT_SK_MAP, &key); diff --git a/bpf/lib/common.h b/bpf/lib/common.h index 3f18c9f1ec614..7a22e006bd375 100644 --- a/bpf/lib/common.h +++ b/bpf/lib/common.h @@ -947,7 +947,7 @@ struct lb6_key { union v6addr address; /* Service virtual IPv6 address */ __be16 dport; /* L4 port filter, if unset, all ports apply */ __u16 backend_slot; /* Backend iterator, 0 indicates the svc frontend */ - __u8 proto; /* L4 protocol, currently not used (set to 0) */ + __u8 proto; /* L4 protocol, 0 indicates any protocol */ __u8 scope; /* LB_LOOKUP_SCOPE_* for externalTrafficPolicy=Local */ __u8 pad[2]; }; @@ -1005,7 +1005,7 @@ struct lb4_key { __be32 address; /* Service virtual IPv4 address */ __be16 dport; /* L4 port filter, if unset, all ports apply */ __u16 backend_slot; /* Backend iterator, 0 indicates the svc frontend */ - __u8 proto; /* L4 protocol, currently not used (set to 0) */ + __u8 proto; /* L4 protocol, 0 indicates any protocol */ __u8 scope; /* LB_LOOKUP_SCOPE_* for externalTrafficPolicy=Local */ __u8 pad[2]; }; diff --git a/bpf/lib/lb.h b/bpf/lib/lb.h index 85ea0ada76e29..bd276a001d96f 100644 --- a/bpf/lib/lb.h +++ b/bpf/lib/lb.h @@ -487,11 +487,19 @@ static __always_inline int lb6_rev_nat(struct __ctx_buff *ctx, int l4_off, return __lb6_rev_nat(ctx, l4_off, tuple, flags, nat); } +static __always_inline void +lb6_key_set_protocol(struct lb6_key *key __maybe_unused, + __u8 protocol __maybe_unused) +{ +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + key->proto = protocol; +#endif +} + static __always_inline void lb6_fill_key(struct lb6_key *key, struct ipv6_ct_tuple *tuple) { - /* FIXME: set after adding support for different L4 protocols in LB */ - key->proto = 0; + lb6_key_set_protocol(key, 
tuple->nexthdr); ipv6_addr_copy(&key->address, &tuple->daddr); key->dport = tuple->sport; } @@ -586,6 +594,15 @@ struct lb6_service *lb6_lookup_service(struct lb6_key *key, key->scope = LB_LOOKUP_SCOPE_EXT; key->backend_slot = 0; svc = map_lookup_elem(&LB6_SERVICES_MAP_V2, key); + +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + /* If there are no elements for a specific protocol, check for ANY entries. */ + if (!svc && key->proto != 0) { + key->proto = 0; + svc = map_lookup_elem(&LB6_SERVICES_MAP_V2, key); + } +#endif + if (svc) { if (!scope_switch || !lb6_svc_is_two_scopes(svc)) /* Packets for L7 LB are redirected even when there are no backends. */ @@ -1022,6 +1039,12 @@ struct lb6_service *lb6_lookup_service(struct lb6_key *key __maybe_unused, return NULL; } +static __always_inline void +lb6_key_set_protocol(struct lb6_key *key __maybe_unused, + __u8 protocol __maybe_unused) +{ +} + static __always_inline struct lb6_service *__lb6_lookup_backend_slot(struct lb6_key *key __maybe_unused) { @@ -1145,11 +1168,19 @@ static __always_inline int lb4_rev_nat(struct __ctx_buff *ctx, int l3_off, int l loopback, has_l4_header); } +static __always_inline void +lb4_key_set_protocol(struct lb4_key *key __maybe_unused, + __u8 protocol __maybe_unused) +{ +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + key->proto = protocol; +#endif +} + static __always_inline void lb4_fill_key(struct lb4_key *key, const struct ipv4_ct_tuple *tuple) { - /* FIXME: set after adding support for different L4 protocols in LB */ - key->proto = 0; + lb4_key_set_protocol(key, tuple->nexthdr); key->address = tuple->daddr; /* CT tuple has ports in reverse order: */ key->dport = tuple->sport; @@ -1248,6 +1279,15 @@ struct lb4_service *lb4_lookup_service(struct lb4_key *key, key->scope = LB_LOOKUP_SCOPE_EXT; key->backend_slot = 0; svc = map_lookup_elem(&LB4_SERVICES_MAP_V2, key); + +#if defined(ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION) + /* If there are no elements for a specific protocol, 
check for ANY entries. */ + if (!svc && key->proto != 0) { + key->proto = 0; + svc = map_lookup_elem(&LB4_SERVICES_MAP_V2, key); + } +#endif + if (svc) { if (!scope_switch || !lb4_svc_is_two_scopes(svc)) /* Packets for L7 LB are redirected even when there are no backends. */ diff --git a/cilium/cmd/bpf_lb_list.go b/cilium/cmd/bpf_lb_list.go index e5f2522635904..b3a4b1f43284e 100644 --- a/cilium/cmd/bpf_lb_list.go +++ b/cilium/cmd/bpf_lb_list.go @@ -14,6 +14,7 @@ import ( "github.com/cilium/cilium/pkg/common" "github.com/cilium/cilium/pkg/loadbalancer" "github.com/cilium/cilium/pkg/maps/lbmap" + "github.com/cilium/cilium/pkg/u8proto" ) const ( @@ -105,12 +106,12 @@ func dumpSVC(serviceList map[string][]string) { } else if backend, found := backendMap[backendID]; !found { entry = fmt.Sprintf("backend %d not found", backendID) } else { - fmtStr := "%s:%d (%d) (%d)" + fmtStr := "%s:%d/%s (%d) (%d)" if svcKey.IsIPv6() { - fmtStr = "[%s]:%d (%d) (%d)" + fmtStr = "[%s]:%d/%s (%d) (%d)" } entry = fmt.Sprintf(fmtStr, backend.GetAddress(), - backend.GetPort(), revNATID, backendSlot) + backend.GetPort(), u8proto.U8proto(backend.GetProtocol()).String(), revNATID, backendSlot) } serviceList[svc] = append(serviceList[svc], entry) diff --git a/cilium/cmd/service_update.go b/cilium/cmd/service_update.go index bc0a2d76b5ca1..758be4c6a5582 100644 --- a/cilium/cmd/service_update.go +++ b/cilium/cmd/service_update.go @@ -29,6 +29,7 @@ var ( localRedirect bool idU uint64 frontend string + protocol string backends []string backendStates []string backendWeights []uint @@ -59,13 +60,38 @@ func init() { serviceUpdateCmd.Flags().StringVarP(&k8sIntTrafficPolicy, "k8s-int-traffic-policy", "", "Cluster", "Set service with k8s internalTrafficPolicy as {Local,Cluster}") serviceUpdateCmd.Flags().BoolVarP(&k8sClusterInternal, "k8s-cluster-internal", "", false, "Set service as cluster-internal for externalTrafficPolicy=Local xor internalTrafficPolicy=Local") 
serviceUpdateCmd.Flags().StringVarP(&frontend, "frontend", "", "", "Frontend address") + serviceUpdateCmd.Flags().StringVarP(&protocol, "protocol", "", "tcp", "Protocol for service (e.g. TCP, UDP)") serviceUpdateCmd.Flags().StringSliceVarP(&backends, "backends", "", []string{}, "Backend address or addresses ()") serviceUpdateCmd.Flags().StringSliceVarP(&backendStates, "states", "", []string{}, "Backend state(s) as {active(default),terminating,quarantined,maintenance}") serviceUpdateCmd.Flags().UintSliceVarP(&backendWeights, "backend-weights", "", []uint{}, "Backend weights (100 default, 0 means maintenance state, only for maglev mode)") } -func parseFrontendAddress(address string) *models.FrontendAddress { - frontend, err := net.ResolveTCPAddr("tcp", address) +// parseAddress resolves address for the given L4 protocol, which is matched +// case-insensitively against "tcp" and "udp". It returns the resolved IP, the +// port and the lower-cased protocol name. On failure, including an +// unrecognized protocol, only the returned error is meaningful. +func parseAddress(l4Protocol, address string) (net.IP, int, string, error) { + proto := strings.ToLower(l4Protocol) + switch proto { + case "tcp": + addr, err := net.ResolveTCPAddr(proto, address) + if err != nil { + return nil, 0, "", err + } + return addr.IP, addr.Port, proto, nil + case "udp": + addr, err := net.ResolveUDPAddr(proto, address) + if err != nil { + return nil, 0, "", err + } + return addr.IP, addr.Port, proto, nil + default: + return nil, 0, "", fmt.Errorf("unrecognized protocol %q", l4Protocol) + } +} + +func parseFrontendAddress(l4Protocol, address string) *models.FrontendAddress { + ip, port, proto, err := parseAddress(l4Protocol, address) if err != nil { Fatalf("Unable to parse frontend address: %s\n", err) } @@ -75,11 +101,10 @@ func parseFrontendAddress(address string) *models.FrontendAddress { scope = models.FrontendAddressScopeInternal } - // FIXME support more than TCP return &models.FrontendAddress{ - IP: frontend.IP.String(), - Port: uint16(frontend.Port), - Protocol: models.FrontendAddressProtocolTCP, + IP: ip.String(), + Port: uint16(port), + Protocol: proto, Scope: scope, } } @@ -95,7 +120,7 @@ func updateService(cmd *cobra.Command, args 
[]string) { warnIdTypeDeprecation() id := int64(idU) - fa := parseFrontendAddress(frontend) + fa := parseFrontendAddress(protocol, frontend) skipFrontendCheck := false var spec *models.ServiceSpec @@ -203,17 +228,17 @@ func updateService(cmd *cobra.Command, args []string) { default: Fatalf("Invalid number of backend states (%v) for backends (%v)", backendStates, backends) } + for i, backend := range backends { - beAddr, err := net.ResolveTCPAddr("tcp", backend) + ip, port, proto, err := parseAddress(protocol, backend) if err != nil { - Fatalf("Cannot parse backend address \"%s\": %s", backend, err) + Fatalf("Cannot parse backend address %q: %s", backend, err) } - // Backend ID will be set by the daemon - be := loadbalancer.NewBackend(0, loadbalancer.TCP, cmtypes.MustAddrClusterFromIP(beAddr.IP), uint16(beAddr.Port)) + be := loadbalancer.NewBackend(0, loadbalancer.L4Type(strings.ToUpper(proto)), cmtypes.MustAddrClusterFromIP(ip), uint16(port)) - if !skipFrontendCheck && fa.Port == 0 && beAddr.Port != 0 { - Fatalf("L4 backend found (%v) with L3 frontend", beAddr) + if !skipFrontendCheck && fa.Port == 0 && port != 0 { + Fatalf("L4 backend found (%s:%d) with L3 frontend", ip, port) } ba := be.GetBackendModel() diff --git a/daemon/cmd/daemon_main.go b/daemon/cmd/daemon_main.go index 02bd28dd231fb..a638162078f05 100644 --- a/daemon/cmd/daemon_main.go +++ b/daemon/cmd/daemon_main.go @@ -619,6 +619,9 @@ func initializeFlags() { option.NodePortAccelerationNative, option.NodePortAccelerationDisabled)) option.BindEnv(Vp, option.LoadBalancerAcceleration) + flags.Bool(option.LoadBalancerProtocolDifferentiation, true, "Enable support for service protocol differentiation (TCP, UDP, SCTP)") + option.BindEnv(Vp, option.LoadBalancerProtocolDifferentiation) + flags.Uint(option.MaglevTableSize, maglev.DefaultTableSize, "Maglev per service backend table size (parameter M)") option.BindEnv(Vp, option.MaglevTableSize) diff --git a/install/kubernetes/cilium/templates/cilium-configmap.yaml 
b/install/kubernetes/cilium/templates/cilium-configmap.yaml index f6de6f42ef5b6..82a0082c56f42 100644 --- a/install/kubernetes/cilium/templates/cilium-configmap.yaml +++ b/install/kubernetes/cilium/templates/cilium-configmap.yaml @@ -724,6 +724,9 @@ data: {{- if hasKey .Values.loadBalancer "serviceTopology" }} enable-service-topology: {{ .Values.loadBalancer.serviceTopology | quote }} {{- end }} +{{- if hasKey .Values.loadBalancer "protocolDifferentiation" }} + bpf-lb-proto-diff: {{ .Values.loadBalancer.protocolDifferentiation.enabled | quote }} +{{- end }} {{- end }} {{- if hasKey .Values.maglev "tableSize" }} diff --git a/pkg/datapath/linux/config/config.go b/pkg/datapath/linux/config/config.go index 333076b2ff3cb..e432942a0e098 100644 --- a/pkg/datapath/linux/config/config.go +++ b/pkg/datapath/linux/config/config.go @@ -464,6 +464,7 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC cDefinesMap["IPV6_RSS_PREFIX_BITS"] = "128" } } + if option.Config.NodePortAcceleration != option.NodePortAccelerationDisabled { cDefinesMap["ENABLE_NODEPORT_ACCELERATION"] = "1" } @@ -699,6 +700,10 @@ func (h *HeaderfileWriter) WriteNodeConfig(w io.Writer, cfg *datapath.LocalNodeC } cDefinesMap["EPHEMERAL_MIN"] = fmt.Sprintf("%d", ephemeralMin) + if option.Config.LoadBalancerProtocolDifferentiation { + cDefinesMap["ENABLE_SERVICE_PROTOCOL_DIFFERENTIATION"] = "1" + } + // Since golang maps are unordered, we sort the keys in the map // to get a consistent written format to the writer. 
This maintains // the consistency when we try to calculate hash for a datapath after diff --git a/pkg/datapath/types/lbmap.go b/pkg/datapath/types/lbmap.go index 44ba2bbe6b417..47cd80c214957 100644 --- a/pkg/datapath/types/lbmap.go +++ b/pkg/datapath/types/lbmap.go @@ -30,9 +30,10 @@ type LBMap interface { } type UpsertServiceParams struct { - ID uint16 - IP net.IP - Port uint16 + ID uint16 + IP net.IP + Port uint16 + Protocol uint8 // PreferredBackends is a subset of ActiveBackends // Note: this is only used in clustermesh with service affinity annotation. diff --git a/pkg/k8s/service.go b/pkg/k8s/service.go index 4ec47f42b9d7d..8bd202671dbd5 100644 --- a/pkg/k8s/service.go +++ b/pkg/k8s/service.go @@ -533,13 +533,10 @@ func NewService(ips []net.IP, externalIPs, loadBalancerIPs, loadBalancerSourceRa } // UniquePorts returns a map of all unique ports configured in the service -func (s *Service) UniquePorts() map[uint16]bool { - // We are not discriminating the different L4 protocols on the same L4 - // port so we create the number of unique sets of service IP + service - // port. 
- uniqPorts := map[uint16]bool{} +func (s *Service) UniquePorts() map[string]bool { + uniqPorts := map[string]bool{} for _, p := range s.Ports { - uniqPorts[p.Port] = true + uniqPorts[p.String()] = true } return uniqPorts } diff --git a/pkg/k8s/watchers/watcher.go b/pkg/k8s/watchers/watcher.go index 0a0b3935e843c..ad54f984fb476 100644 --- a/pkg/k8s/watchers/watcher.go +++ b/pkg/k8s/watchers/watcher.go @@ -723,53 +723,60 @@ func (k *K8sWatcher) delK8sSVCs(svc k8s.ServiceID, svcInfo *k8s.Service, se *k8s logfields.K8sNamespace: svc.Namespace, }) - repPorts := svcInfo.UniquePorts() + for _, checkProtocol := range []bool{true, false} { + if !checkProtocol { + svcInfo = stripServiceProtocol(svcInfo) + } - frontends := []*loadbalancer.L3n4Addr{} + repPorts := svcInfo.UniquePorts() - for portName, svcPort := range svcInfo.Ports { - if !repPorts[svcPort.Port] { - continue - } - repPorts[svcPort.Port] = false + frontends := []*loadbalancer.L3n4Addr{} - for _, feIP := range svcInfo.FrontendIPs { - fe := loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(feIP), svcPort.Port, loadbalancer.ScopeExternal) - frontends = append(frontends, fe) - } + for portName, svcPort := range svcInfo.Ports { + if !repPorts[svcPort.String()] { + continue + } + repPorts[svcPort.String()] = false - for _, nodePortFE := range svcInfo.NodePorts[portName] { - frontends = append(frontends, &nodePortFE.L3n4Addr) - if svcInfo.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal || svcInfo.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal { - cpFE := nodePortFE.L3n4Addr.DeepCopy() - cpFE.Scope = loadbalancer.ScopeInternal - frontends = append(frontends, cpFE) + for _, feIP := range svcInfo.FrontendIPs { + fe := loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(feIP), svcPort.Port, loadbalancer.ScopeExternal) + frontends = append(frontends, fe) } - } - for _, k8sExternalIP := range svcInfo.K8sExternalIPs { - frontends = append(frontends, 
loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(k8sExternalIP), svcPort.Port, loadbalancer.ScopeExternal)) - } + for _, nodePortFE := range svcInfo.NodePorts[portName] { + frontends = append(frontends, &nodePortFE.L3n4Addr) + if svcInfo.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal || svcInfo.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal { + cpFE := nodePortFE.L3n4Addr.DeepCopy() + cpFE.Scope = loadbalancer.ScopeInternal + frontends = append(frontends, cpFE) + } + } - for _, ip := range svcInfo.LoadBalancerIPs { - addrCluster := cmtypes.MustAddrClusterFromIP(ip) - frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, addrCluster, svcPort.Port, loadbalancer.ScopeExternal)) - if svcInfo.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal || svcInfo.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal { - frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, addrCluster, svcPort.Port, loadbalancer.ScopeInternal)) + for _, k8sExternalIP := range svcInfo.K8sExternalIPs { + frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, cmtypes.MustAddrClusterFromIP(k8sExternalIP), svcPort.Port, loadbalancer.ScopeExternal)) + } + + for _, ip := range svcInfo.LoadBalancerIPs { + addrCluster := cmtypes.MustAddrClusterFromIP(ip) + frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, addrCluster, svcPort.Port, loadbalancer.ScopeExternal)) + if svcInfo.ExtTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal || svcInfo.IntTrafficPolicy == loadbalancer.SVCTrafficPolicyLocal { + frontends = append(frontends, loadbalancer.NewL3n4Addr(svcPort.Protocol, addrCluster, svcPort.Port, loadbalancer.ScopeInternal)) + } } } - } - for _, fe := range frontends { - if found, err := k.svcManager.DeleteService(*fe); err != nil { - scopedLog.WithError(err).WithField(logfields.Object, logfields.Repr(fe)). 
- Warn("Error deleting service by frontend") - } else if !found { - scopedLog.WithField(logfields.Object, logfields.Repr(fe)).Warn("service not found") - } else { - scopedLog.Debugf("# cilium lb delete-service %s %d 0", fe.AddrCluster.String(), fe.Port) + for _, fe := range frontends { + if found, err := k.svcManager.DeleteService(*fe); err != nil { + scopedLog.WithError(err).WithField(logfields.Object, logfields.Repr(fe)). + Warn("Error deleting service by frontend") + } else if !found { + scopedLog.WithField(logfields.Object, logfields.Repr(fe)).Warn("service not found") + } else { + scopedLog.Debugf("# cilium lb delete-service %s %d 0", fe.AddrCluster.String(), fe.Port) + } } } + return nil } @@ -866,10 +873,10 @@ func datapathSVCs(svc *k8s.Service, endpoints *k8s.Endpoints) (svcs []loadbalanc clusterIPPorts := map[loadbalancer.FEPortName]*loadbalancer.L4Addr{} for fePortName, fePort := range svc.Ports { - if !uniqPorts[fePort.Port] { + if !uniqPorts[fePort.String()] { continue } - uniqPorts[fePort.Port] = false + uniqPorts[fePort.String()] = false clusterIPPorts[fePortName] = fePort } @@ -930,6 +937,48 @@ func hashSVCMap(svcs []loadbalancer.SVC) map[string]loadbalancer.L3n4Addr { return m } +// stripServiceProtocol returns the result of svc.DeepCopy() with the protocol +// of every entry in Ports and NodePorts set to loadbalancer.ANY. The copy is +// modified rather than the argument. A nil svc yields nil. +func stripServiceProtocol(svc *k8s.Service) *k8s.Service { + if svc == nil { + return nil + } + + svc = svc.DeepCopy() + + for _, port := range svc.Ports { + port.Protocol = loadbalancer.ANY + } + + for _, nodePort := range svc.NodePorts { + for _, port := range nodePort { + port.Protocol = loadbalancer.ANY + } + } + + return svc +} + +// stripEndpointsProtocol returns the result of endpoints.DeepCopy() with the +// protocol of every backend port set to loadbalancer.ANY. The copy is +// modified rather than the argument. A nil endpoints yields nil. +func stripEndpointsProtocol(endpoints *k8s.Endpoints) *k8s.Endpoints { + if endpoints == nil { + return nil + } + + endpoints = endpoints.DeepCopy() + + for _, backend := range endpoints.Backends { + for _, port := range backend.Ports { + port.Protocol = loadbalancer.ANY + } + } + + return endpoints +} + func (k *K8sWatcher) addK8sSVCs(svcID k8s.ServiceID, oldSvc, svc *k8s.Service, endpoints *k8s.Endpoints) error { // Headless services do not need any datapath implementation if svc.IsHeadless { @@ -941,6 +990,12 @@ 
func (k *K8sWatcher) addK8sSVCs(svcID k8s.ServiceID, oldSvc, svc *k8s.Service, e logfields.K8sNamespace: svcID.Namespace, }) + if !option.Config.LoadBalancerProtocolDifferentiation { + oldSvc = stripServiceProtocol(oldSvc) + svc = stripServiceProtocol(svc) + endpoints = stripEndpointsProtocol(endpoints) + } + svcs := datapathSVCs(svc, endpoints) svcMap := hashSVCMap(svcs) diff --git a/pkg/loadbalancer/loadbalancer.go b/pkg/loadbalancer/loadbalancer.go index 92c4bd1260e3c..ead2ef528c1a0 100644 --- a/pkg/loadbalancer/loadbalancer.go +++ b/pkg/loadbalancer/loadbalancer.go @@ -13,6 +13,7 @@ import ( "github.com/cilium/cilium/pkg/cidr" cmtypes "github.com/cilium/cilium/pkg/clustermesh/types" "github.com/cilium/cilium/pkg/option" + "github.com/cilium/cilium/pkg/u8proto" ) // SVCType is a type of a service. @@ -229,7 +230,10 @@ func (s ServiceFlags) UInt16() uint16 { } const ( + // NONE type. NONE = L4Type("NONE") + // ANY type. + ANY = L4Type("ANY") // TCP type. TCP = L4Type("TCP") // UDP type. @@ -524,6 +528,10 @@ func IsValidBackendState(state string) bool { func NewL4Type(name string) (L4Type, error) { switch strings.ToLower(name) { + case "none": + return NONE, nil + case "any": + return ANY, nil case "tcp": return TCP, nil case "udp": @@ -535,6 +543,19 @@ func NewL4Type(name string) (L4Type, error) { } } +func NewL4TypeFromNumber(proto uint8) L4Type { + switch proto { + case 6: + return TCP + case 17: + return UDP + case 132: + return SCTP + default: + return ANY + } +} + // L4Addr is an abstraction for the backend port with a L4Type, usually tcp or udp, and // the Port number. // @@ -558,6 +579,22 @@ func NewL4Addr(protocol L4Type, number uint16) *L4Addr { return &L4Addr{Protocol: protocol, Port: number} } +// Equals returns true if both L4Addr are considered equal. 
+func (l *L4Addr) Equals(o *L4Addr) bool { + switch { + case (l == nil) != (o == nil): + return false + case (l == nil) && (o == nil): + return true + } + return l.Port == o.Port && l.Protocol == o.Protocol +} + +// String returns a string representation of an L4Addr +func (l *L4Addr) String() string { + return fmt.Sprintf("%d/%s", l.Port, l.Protocol) +} + // L3n4Addr is used to store, as an unique L3+L4 address in the KVStore. It also // includes the lookup scope for frontend addresses which is used in service // handling for externalTrafficPolicy=Local and internalTrafficPolicy=Local, @@ -659,8 +696,7 @@ func NewBackendFromBackendModel(base *models.BackendAddress) (*Backend, error) { return nil, fmt.Errorf("missing IP address") } - // FIXME: Should this be NONE ? - l4addr := NewL4Addr(NONE, base.Port) + l4addr := NewL4Addr(base.Protocol, base.Port) addrCluster, err := cmtypes.ParseAddrCluster(*base.IP) if err != nil { return nil, err @@ -693,8 +729,7 @@ func NewL3n4AddrFromBackendModel(base *models.BackendAddress) (*L3n4Addr, error) return nil, fmt.Errorf("missing IP address") } - // FIXME: Should this be NONE ? 
- l4addr := NewL4Addr(NONE, base.Port) + l4addr := NewL4Addr(base.Protocol, base.Port) addrCluster, err := cmtypes.ParseAddrCluster(*base.IP) if err != nil { return nil, err @@ -712,9 +747,10 @@ func (a *L3n4Addr) GetModel() *models.FrontendAddress { scope = models.FrontendAddressScopeInternal } return &models.FrontendAddress{ - IP: a.AddrCluster.String(), - Port: a.Port, - Scope: scope, + IP: a.AddrCluster.String(), + Protocol: a.Protocol, + Port: a.Port, + Scope: scope, } } @@ -727,6 +763,7 @@ func (b *Backend) GetBackendModel() *models.BackendAddress { stateStr, _ := b.State.String() return &models.BackendAddress{ IP: &addrClusterStr, + Protocol: b.Protocol, Port: b.Port, NodeName: b.NodeName, State: stateStr, @@ -735,17 +772,10 @@ func (b *Backend) GetBackendModel() *models.BackendAddress { } } -// String returns the L3n4Addr in the "IPv4:Port[/Scope]" format for IPv4 and -// "[IPv6]:Port[/Scope]" format for IPv6. +// String returns the L3n4Addr in the "IPv4:Port/Protocol[/Scope]" format for IPv4 and +// "[IPv6]:Port/Protocol[/Scope]" format for IPv6. func (a *L3n4Addr) String() string { - var scope string - if a.Scope == ScopeInternal { - scope = "/i" - } - if a.IsIPv6() { - return fmt.Sprintf("[%s]:%d%s", a.AddrCluster.String(), a.Port, scope) - } - return fmt.Sprintf("%s:%d%s", a.AddrCluster.String(), a.Port, scope) + return a.StringWithProtocol() } // StringWithProtocol returns the L3n4Addr in the "IPv4:Port/Protocol[/Scope]" @@ -763,8 +793,6 @@ func (a *L3n4Addr) StringWithProtocol() string { // StringID returns the L3n4Addr as string to be used for unique identification func (a *L3n4Addr) StringID() string { - // This does not include the protocol right now as the datapath does - // not include the protocol in the lookup of the service IP. return a.String() } @@ -772,14 +800,15 @@ func (a *L3n4Addr) StringID() string { // Note: the resulting string is meant to be used as a key for maps and is not // readable by a human eye when printed out. 
func (a L3n4Addr) Hash() string { - const lenProto = 0 // proto is omitted for now + const lenProto = 1 // proto is uint8 const lenScope = 1 // scope is uint8 which is an alias for byte const lenPort = 2 // port is uint16 which is 2 bytes b := make([]byte, cmtypes.AddrClusterLen+lenProto+lenScope+lenPort) ac20 := a.AddrCluster.As20() copy(b, ac20[:]) - // FIXME: add Protocol once we care about protocols + u8p, _ := u8proto.ParseProtocol(a.Protocol) + b[net.IPv6len] = byte(u8p) // scope is a uint8 which is an alias for byte so a cast is safe b[net.IPv6len+lenProto] = byte(a.Scope) // port is a uint16, so 2 bytes diff --git a/pkg/maps/lbmap/ipv4.go b/pkg/maps/lbmap/ipv4.go index 1821c2d0e5816..750a5b4c64231 100644 --- a/pkg/maps/lbmap/ipv4.go +++ b/pkg/maps/lbmap/ipv4.go @@ -250,6 +250,7 @@ func NewService4Key(ip net.IP, port uint16, proto u8proto.U8proto, scope uint8, func (k *Service4Key) String() string { kHost := k.ToHost().(*Service4Key) addr := net.JoinHostPort(kHost.Address.String(), fmt.Sprintf("%d", kHost.Port)) + addr += fmt.Sprintf("/%s", u8proto.U8proto(kHost.Proto).String()) if kHost.Scope == loadbalancer.ScopeInternal { addr += "/i" } @@ -268,6 +269,7 @@ func (k *Service4Key) SetScope(scope uint8) { k.Scope = scope } func (k *Service4Key) GetScope() uint8 { return k.Scope } func (k *Service4Key) GetAddress() net.IP { return k.Address.IP() } func (k *Service4Key) GetPort() uint16 { return k.Port } +func (k *Service4Key) GetProtocol() uint8 { return k.Proto } func (k *Service4Key) MapDelete() error { return k.Map().Delete(k.ToNetwork()) } func (k *Service4Key) RevNatValue() RevNatValue { @@ -413,8 +415,9 @@ func (b *Backend4Value) GetAddress() net.IP { return b.Address.IP() } func (b *Backend4Value) GetIPCluster() cmtypes.AddrCluster { return cmtypes.AddrClusterFrom(b.Address.Addr(), 0) } -func (b *Backend4Value) GetPort() uint16 { return b.Port } -func (b *Backend4Value) GetFlags() uint8 { return b.Flags } +func (b *Backend4Value) GetPort() uint16 { 
return b.Port } +func (b *Backend4Value) GetProtocol() uint8 { return uint8(b.Proto) } +func (b *Backend4Value) GetFlags() uint8 { return b.Flags } func (v *Backend4Value) ToNetwork() BackendValue { n := *v @@ -475,8 +478,9 @@ func (b *Backend4ValueV3) GetAddress() net.IP { return b.Address.IP() } func (b *Backend4ValueV3) GetIPCluster() cmtypes.AddrCluster { return cmtypes.AddrClusterFrom(b.Address.Addr(), uint32(b.ClusterID)) } -func (b *Backend4ValueV3) GetPort() uint16 { return b.Port } -func (b *Backend4ValueV3) GetFlags() uint8 { return b.Flags } +func (b *Backend4ValueV3) GetPort() uint16 { return b.Port } +func (b *Backend4ValueV3) GetProtocol() uint8 { return uint8(b.Proto) } +func (b *Backend4ValueV3) GetFlags() uint8 { return b.Flags } func (v *Backend4ValueV3) ToNetwork() BackendValue { n := *v diff --git a/pkg/maps/lbmap/ipv6.go b/pkg/maps/lbmap/ipv6.go index 340cf9acc36d0..55beb19a78344 100644 --- a/pkg/maps/lbmap/ipv6.go +++ b/pkg/maps/lbmap/ipv6.go @@ -147,9 +147,9 @@ func NewService6Key(ip net.IP, port uint16, proto u8proto.U8proto, scope uint8, func (k *Service6Key) String() string { kHost := k.ToHost().(*Service6Key) if kHost.Scope == loadbalancer.ScopeInternal { - return fmt.Sprintf("[%s]:%d/i (%d)", kHost.Address, kHost.Port, kHost.BackendSlot) + return fmt.Sprintf("[%s]:%d/%s/i (%d)", kHost.Address, kHost.Port, u8proto.U8proto(kHost.Proto).String(), kHost.BackendSlot) } else { - return fmt.Sprintf("[%s]:%d (%d)", kHost.Address, kHost.Port, kHost.BackendSlot) + return fmt.Sprintf("[%s]:%d/%s (%d)", kHost.Address, kHost.Port, u8proto.U8proto(kHost.Proto).String(), kHost.BackendSlot) } } @@ -164,6 +164,7 @@ func (k *Service6Key) SetScope(scope uint8) { k.Scope = scope } func (k *Service6Key) GetScope() uint8 { return k.Scope } func (k *Service6Key) GetAddress() net.IP { return k.Address.IP() } func (k *Service6Key) GetPort() uint16 { return k.Port } +func (k *Service6Key) GetProtocol() uint8 { return k.Proto } func (k *Service6Key) MapDelete() 
error { return k.Map().Delete(k.ToNetwork()) } func (k *Service6Key) RevNatValue() RevNatValue { @@ -307,8 +308,9 @@ func (b *Backend6Value) GetAddress() net.IP { return b.Address.IP() } func (b *Backend6Value) GetIPCluster() cmtypes.AddrCluster { return cmtypes.AddrClusterFrom(b.Address.Addr(), 0) } -func (b *Backend6Value) GetPort() uint16 { return b.Port } -func (b *Backend6Value) GetFlags() uint8 { return b.Flags } +func (b *Backend6Value) GetPort() uint16 { return b.Port } +func (b *Backend6Value) GetProtocol() uint8 { return uint8(b.Proto) } +func (b *Backend6Value) GetFlags() uint8 { return b.Flags } func (v *Backend6Value) ToNetwork() BackendValue { n := *v @@ -369,8 +371,9 @@ func (b *Backend6ValueV3) GetAddress() net.IP { return b.Address.IP() } func (b *Backend6ValueV3) GetIPCluster() cmtypes.AddrCluster { return cmtypes.AddrClusterFrom(b.Address.Addr(), uint32(b.ClusterID)) } -func (b *Backend6ValueV3) GetPort() uint16 { return b.Port } -func (b *Backend6ValueV3) GetFlags() uint8 { return b.Flags } +func (b *Backend6ValueV3) GetPort() uint16 { return b.Port } +func (b *Backend6ValueV3) GetProtocol() uint8 { return uint8(b.Proto) } +func (b *Backend6ValueV3) GetFlags() uint8 { return b.Flags } func (v *Backend6ValueV3) ToNetwork() BackendValue { n := *v diff --git a/pkg/maps/lbmap/lbmap.go b/pkg/maps/lbmap/lbmap.go index ba73d33c9aa34..8865c23df7ea5 100644 --- a/pkg/maps/lbmap/lbmap.go +++ b/pkg/maps/lbmap/lbmap.go @@ -6,8 +6,6 @@ package lbmap import ( "errors" "fmt" - "net" - "strconv" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" @@ -76,10 +74,10 @@ func (lbmap *LBBPFMap) upsertServiceProto(p *datapathTypes.UpsertServiceParams, backendsOk := ipv6 || !ipv6 && p.NatPolicy != loadbalancer.SVCNatPolicyNat46 if ipv6 { - svcKey = NewService6Key(p.IP, p.Port, u8proto.ANY, p.Scope, 0) + svcKey = NewService6Key(p.IP, p.Port, u8proto.U8proto(p.Protocol), p.Scope, 0) svcVal = &Service6Value{} } else { - svcKey = NewService4Key(p.IP, p.Port, u8proto.ANY, 
p.Scope, 0) + svcKey = NewService4Key(p.IP, p.Port, u8proto.U8proto(p.Protocol), p.Scope, 0) svcVal = &Service4Value{} } @@ -200,11 +198,16 @@ func deleteServiceProto(svc loadbalancer.L3n4AddrID, backendCount int, useMaglev revNATKey RevNatKey ) + u8p, err := u8proto.ParseProtocol(svc.Protocol) + if err != nil { + return err + } + if ipv6 { - svcKey = NewService6Key(svc.AddrCluster.AsNetIP(), svc.Port, u8proto.ANY, svc.Scope, 0) + svcKey = NewService6Key(svc.AddrCluster.AsNetIP(), svc.Port, u8p, svc.Scope, 0) revNATKey = NewRevNat6Key(uint16(svc.ID)) } else { - svcKey = NewService4Key(svc.AddrCluster.AsNetIP(), svc.Port, u8proto.ANY, svc.Scope, 0) + svcKey = NewService4Key(svc.AddrCluster.AsNetIP(), svc.Port, u8p, svc.Scope, 0) revNATKey = NewRevNat4Key(uint16(svc.ID)) } @@ -454,11 +457,8 @@ func (*LBBPFMap) DumpServiceMaps() ([]*loadbalancer.SVC, []error) { if svcKey.GetBackendSlot() == 0 { // Build a cache of flags stored in the value of the master key to // map it later. - // FIXME proto is being ignored everywhere in the datapath. 
- addrStr := svcKey.GetAddress().String() - portStr := strconv.Itoa(int(svcKey.GetPort())) - flagsCache[net.JoinHostPort(addrStr, portStr)] = loadbalancer.ServiceFlags(svcValue.GetFlags()) + flagsCache[fe.String()] = loadbalancer.ServiceFlags(svcValue.GetFlags()) newSVCMap.addFE(fe) return } @@ -502,13 +502,11 @@ func (*LBBPFMap) DumpServiceMaps() ([]*loadbalancer.SVC, []error) { newSVCList := make([]*loadbalancer.SVC, 0, len(newSVCMap)) for hash := range newSVCMap { svc := newSVCMap[hash] - addrStr := svc.Frontend.AddrCluster.String() - portStr := strconv.Itoa(int(svc.Frontend.Port)) - host := net.JoinHostPort(addrStr, portStr) - svc.Type = flagsCache[host].SVCType() - svc.ExtTrafficPolicy = flagsCache[host].SVCExtTrafficPolicy() - svc.IntTrafficPolicy = flagsCache[host].SVCIntTrafficPolicy() - svc.NatPolicy = flagsCache[host].SVCNatPolicy(svc.Frontend.L3n4Addr) + key := svc.Frontend.String() + svc.Type = flagsCache[key].SVCType() + svc.ExtTrafficPolicy = flagsCache[key].SVCExtTrafficPolicy() + svc.IntTrafficPolicy = flagsCache[key].SVCIntTrafficPolicy() + svc.NatPolicy = flagsCache[key].SVCNatPolicy(svc.Frontend.L3n4Addr) newSVCList = append(newSVCList, &svc) } @@ -546,7 +544,7 @@ func (*LBBPFMap) DumpBackendMaps() ([]*loadbalancer.Backend, error) { ip := backendVal.GetAddress() addrCluster := cmtypes.MustAddrClusterFromIP(ip) port := backendVal.GetPort() - proto := loadbalancer.NONE + proto := loadbalancer.NewL4TypeFromNumber(backendVal.GetProtocol()) state := loadbalancer.GetBackendStateFromFlags(backendVal.GetFlags()) lbBackend := loadbalancer.NewBackendWithState(backendID, proto, addrCluster, port, state) lbBackends = append(lbBackends, lbBackend) @@ -611,11 +609,17 @@ func getBackend(backend *loadbalancer.Backend, ipv6 bool) (Backend, error) { return lbBackend, fmt.Errorf("invalid backend ID 0") } + u8p, err := u8proto.ParseProtocol(backend.Protocol) + if err != nil { + return nil, fmt.Errorf("unable to parse protocol lbBackend (%d, %s, %d, %s, %t): %w", + 
backend.ID, backend.AddrCluster.String(), backend.Port, backend.Protocol, ipv6, err) + } + if ipv6 { - lbBackend, err = NewBackend6V3(backend.ID, backend.AddrCluster, backend.Port, u8proto.ANY, + lbBackend, err = NewBackend6V3(backend.ID, backend.AddrCluster, backend.Port, u8p, backend.State) } else { - lbBackend, err = NewBackend4V3(backend.ID, backend.AddrCluster, backend.Port, u8proto.ANY, + lbBackend, err = NewBackend4V3(backend.ID, backend.AddrCluster, backend.Port, u8p, backend.State) } if err != nil { diff --git a/pkg/maps/lbmap/types.go b/pkg/maps/lbmap/types.go index 5adfe088b2e5b..3b8adab2a95ad 100644 --- a/pkg/maps/lbmap/types.go +++ b/pkg/maps/lbmap/types.go @@ -11,7 +11,7 @@ import ( "github.com/cilium/cilium/pkg/loadbalancer" ) -// ServiceKey is the interface describing protocol independent key for services map v2. +// ServiceKey is the interface describing key for services map v2. type ServiceKey interface { bpf.MapKey @@ -42,6 +42,9 @@ type ServiceKey interface { // Get frontend port GetPort() uint16 + // Get protocol + GetProtocol() uint8 + // Returns a RevNatValue matching a ServiceKey RevNatValue() RevNatValue @@ -113,7 +116,7 @@ type BackendKey interface { GetID() loadbalancer.BackendID } -// BackendValue is the interface describing protocol independent backend value. +// BackendValue is the interface describing backend value. 
type BackendValue interface { bpf.MapValue @@ -126,6 +129,9 @@ type BackendValue interface { // Get backend port GetPort() uint16 + // Get backend protocol + GetProtocol() uint8 + // Get backend flags GetFlags() uint8 @@ -177,7 +183,8 @@ type RevNatValue interface { func svcFrontend(svcKey ServiceKey, svcValue ServiceValue) *loadbalancer.L3n4AddrID { feIP := svcKey.GetAddress() feAddrCluster := cmtypes.MustAddrClusterFromIP(feIP) - feL3n4Addr := loadbalancer.NewL3n4Addr(loadbalancer.NONE, feAddrCluster, svcKey.GetPort(), svcKey.GetScope()) + p := loadbalancer.NewL4TypeFromNumber(svcKey.GetProtocol()) + feL3n4Addr := loadbalancer.NewL3n4Addr(p, feAddrCluster, svcKey.GetPort(), svcKey.GetScope()) feL3n4AddrID := &loadbalancer.L3n4AddrID{ L3n4Addr: *feL3n4Addr, ID: loadbalancer.ID(svcValue.GetRevNat()), @@ -189,7 +196,7 @@ func svcBackend(backendID loadbalancer.BackendID, backend BackendValue) *loadbal beIP := backend.GetAddress() beAddrCluster := cmtypes.MustAddrClusterFromIP(beIP) bePort := backend.GetPort() - beProto := loadbalancer.NONE + beProto := loadbalancer.NewL4TypeFromNumber(backend.GetProtocol()) beState := loadbalancer.GetBackendStateFromFlags(backend.GetFlags()) beBackend := loadbalancer.NewBackendWithState(backendID, beProto, beAddrCluster, bePort, beState) return beBackend diff --git a/pkg/option/config.go b/pkg/option/config.go index e891a60a3da09..b506c03c0d6aa 100644 --- a/pkg/option/config.go +++ b/pkg/option/config.go @@ -285,6 +285,13 @@ const ( // Alias to NodePortAcceleration LoadBalancerAcceleration = "bpf-lb-acceleration" + // LoadBalancerExternalControlPlane switch skips connectivity to kube-apiserver + // which is relevant in lb-only mode + LoadBalancerExternalControlPlane = "bpf-lb-external-control-plane" + + // LoadBalancerProtocolDifferentiation enables support for service protocol differentiation (TCP, UDP, SCTP) + LoadBalancerProtocolDifferentiation = "bpf-lb-proto-diff" + // MaglevTableSize determines the size of the backend table per 
service MaglevTableSize = "bpf-lb-maglev-table-size" @@ -2058,6 +2065,13 @@ type DaemonConfig struct { LoadBalancerRSSv6CIDR string LoadBalancerRSSv6 net.IPNet + // LoadBalancerExternalControlPlane tells whether to not use kube-apiserver as + // its control plane in lb-only mode. + LoadBalancerExternalControlPlane bool + + // LoadBalancerProtocolDifferentiation enables support for service protocol differentiation (TCP, UDP, SCTP) + LoadBalancerProtocolDifferentiation bool + // EnablePMTUDiscovery indicates whether to send ICMP fragmentation-needed // replies to the client (when needed). EnablePMTUDiscovery bool @@ -3590,6 +3604,8 @@ func (c *DaemonConfig) Populate(vp *viper.Viper) { // To support K8s NetworkPolicy c.EnableK8sNetworkPolicy = vp.GetBool(EnableK8sNetworkPolicy) + + c.LoadBalancerProtocolDifferentiation = vp.GetBool(LoadBalancerProtocolDifferentiation) } func (c *DaemonConfig) populateDevices(vp *viper.Viper) { diff --git a/pkg/service/id_local.go b/pkg/service/id_local.go index 02a5c10b8ef11..6e9c519659b6b 100644 --- a/pkg/service/id_local.go +++ b/pkg/service/id_local.go @@ -51,6 +51,7 @@ func NewIDAllocator(nextID uint32, maxID uint32) *IDAllocator { } } +// addID assumes the lock is held func (alloc *IDAllocator) addID(svc loadbalancer.L3n4Addr, id uint32) *loadbalancer.L3n4AddrID { svcID := newID(svc, id) alloc.entitiesID[id] = svcID diff --git a/pkg/service/service.go b/pkg/service/service.go index b9458b222d4e9..b0b552bd65904 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -4,6 +4,7 @@ package service import ( + "errors" "fmt" "net" "strconv" @@ -32,6 +33,7 @@ import ( nodeTypes "github.com/cilium/cilium/pkg/node/types" "github.com/cilium/cilium/pkg/option" "github.com/cilium/cilium/pkg/service/healthserver" + "github.com/cilium/cilium/pkg/u8proto" ) const anyPort = "*" @@ -818,7 +820,7 @@ func (s *Service) UpdateBackendsState(backends []*lb.Backend) error { } be.State = updatedB.State be.Preferred = updatedB.Preferred - + 
nextService: for id, info := range s.svcByID { var p *datapathTypes.UpsertServiceParams for i, b := range info.backends { @@ -833,10 +835,17 @@ func (s *Service) UpdateBackendsState(backends []*lb.Backend) error { found := false if p, found = updateSvcs[id]; !found { + proto, err := u8proto.ParseProtocol(info.frontend.L4Addr.Protocol) + if err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to parse service protocol for frontend %+v: %w", info.frontend, err)) + continue nextService + } + p = &datapathTypes.UpsertServiceParams{ ID: uint16(id), IP: info.frontend.L3n4Addr.AddrCluster.AsNetIP(), Port: info.frontend.L3n4Addr.L4Addr.Port, + Protocol: byte(proto), PrevBackendsCount: len(info.backends), IPv6: info.frontend.IsIPv6(), Type: info.svcType, @@ -1142,8 +1151,36 @@ func (s *Service) createSVCInfoIfNotExist(p *lb.SVC) (*svcInfo, bool, bool, prevSessionAffinity := false prevLoadBalancerSourceRanges := []*cidr.CIDR{} + // when Cilium is upgraded to a version that supports service protocol differentiation, and such feature is + // enabled, we may end up in a situation where some existing services do not have the protocol set. + // + // As in such cases we want to preserve the existing services (in order to not break existing connections to + // those services), when trying to create a new one check first if an "old" service without the protocol + // already exists, by overwriting its protocol to NONE. + // If it doesn't then do a second lookup in the svcByHash map with the protocol set. + // + // Note that this logic can be removed once we stop supporting services without protocol. 
+ proto := p.Frontend.L3n4Addr.L4Addr.Protocol + p.Frontend.L3n4Addr.L4Addr.Protocol = "NONE" + + backendProtos := []lb.L4Type{} + for _, backend := range p.Backends { + backendProtos = append(backendProtos, backend.L3n4Addr.L4Addr.Protocol) + backend.L3n4Addr.L4Addr.Protocol = "NONE" + } + hash := p.Frontend.Hash() svc, found := s.svcByHash[hash] + if !found { + p.Frontend.L3n4Addr.L4Addr.Protocol = proto + for i, backend := range p.Backends { + backend.L3n4Addr.L4Addr.Protocol = backendProtos[i] + } + + hash = p.Frontend.Hash() + svc, found = s.svcByHash[hash] + } + if !found { // Allocate service ID for the new service addrID, err := AcquireID(p.Frontend.L3n4Addr, uint32(p.Frontend.ID)) @@ -1357,11 +1394,16 @@ func (s *Service) upsertServiceIntoLBMaps(svc *svcInfo, isExtLocal, isIntLocal b } } svc.svcNatPolicy = natPolicy + protocol, err := u8proto.ParseProtocol(svc.frontend.L3n4Addr.L4Addr.Protocol) + if err != nil { + return err + } p := &datapathTypes.UpsertServiceParams{ ID: uint16(svc.frontend.ID), IP: svc.frontend.L3n4Addr.AddrCluster.AsNetIP(), Port: svc.frontend.L3n4Addr.L4Addr.Port, + Protocol: uint8(protocol), PreferredBackends: preferredBackends, ActiveBackends: activeBackends, NonActiveBackends: nonActiveBackends, diff --git a/pkg/testutils/mockmaps/lbmap.go b/pkg/testutils/mockmaps/lbmap.go index e2e8feeb1b06e..077a4d98e30c5 100644 --- a/pkg/testutils/mockmaps/lbmap.go +++ b/pkg/testutils/mockmaps/lbmap.go @@ -12,6 +12,7 @@ import ( "github.com/cilium/cilium/pkg/ip" lb "github.com/cilium/cilium/pkg/loadbalancer" "github.com/cilium/cilium/pkg/lock" + "github.com/cilium/cilium/pkg/u8proto" ) type LBMockMap struct { @@ -59,7 +60,11 @@ func (m *LBMockMap) UpsertService(p *datapathTypes.UpsertServiceParams) error { } svc, found := m.ServiceByID[p.ID] if !found { - frontend := lb.NewL3n4AddrID(lb.NONE, cmtypes.MustAddrClusterFromIP(p.IP), p.Port, p.Scope, lb.ID(p.ID)) + u8p, err := u8proto.FromNumber(p.Protocol) + if err != nil { + return err + } + 
frontend := lb.NewL3n4AddrID(u8p.String(), cmtypes.MustAddrClusterFromIP(p.IP), p.Port, p.Scope, lb.ID(p.ID)) svc = &lb.SVC{Frontend: *frontend} } else { if p.PrevBackendsCount != len(svc.Backends) { @@ -137,7 +142,7 @@ func (m *LBMockMap) UpdateBackendWithState(b *lb.Backend) error { if !found { return fmt.Errorf("update failed : backend %d doesn't exist", id) } - if b.ID != be.ID || b.Port != be.Port || !b.AddrCluster.Equal(be.AddrCluster) { + if b.ID != be.ID || b.Port != be.Port || b.Protocol != be.Protocol || !b.AddrCluster.Equal(be.AddrCluster) { return fmt.Errorf("backend in the map %+v doesn't match %+v: only backend"+ "state can be updated", be.String(), b.String()) } diff --git a/pkg/u8proto/u8proto.go b/pkg/u8proto/u8proto.go index 2df035a044173..988d11e077266 100644 --- a/pkg/u8proto/u8proto.go +++ b/pkg/u8proto/u8proto.go @@ -34,6 +34,7 @@ var protoNames = map[U8proto]string{ var ProtoIDs = map[string]U8proto{ "all": 0, "any": 0, + "none": 0, "icmp": 1, "tcp": 6, "udp": 17, @@ -56,3 +57,11 @@ func ParseProtocol(proto string) (U8proto, error) { } return 0, fmt.Errorf("unknown protocol '%s'", proto) } + +func FromNumber(proto uint8) (U8proto, error) { + _, ok := protoNames[U8proto(proto)] + if !ok { + return 0, fmt.Errorf("unknown protocol %d", proto) + } + return U8proto(proto), nil +}