Skip to content

Commit

Permalink
project/gateway2: Replaces Endpoints with EndpointSlice for Creating …
Browse files Browse the repository at this point in the history
…Upstream Endpoints (#10265)

Signed-off-by: Daneyon Hansen <[email protected]>
Co-authored-by: Yuval Kohavi <[email protected]>
  • Loading branch information
danehans and yuval-k authored Nov 12, 2024
1 parent 2d5e231 commit a93178d
Show file tree
Hide file tree
Showing 13 changed files with 577 additions and 215 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1242,9 +1242,8 @@ $(TEST_ASSET_DIR)/conformance/conformance_test.go:

CONFORMANCE_SUPPORTED_FEATURES ?= -supported-features=Gateway,ReferenceGrant,HTTPRoute,HTTPRouteQueryParamMatching,HTTPRouteMethodMatching,HTTPRouteResponseHeaderModification,HTTPRoutePortRedirect,HTTPRouteHostRewrite,HTTPRouteSchemeRedirect,HTTPRoutePathRedirect,HTTPRouteHostRewrite,HTTPRoutePathRewrite,HTTPRouteRequestMirror
CONFORMANCE_SUPPORTED_PROFILES ?= -conformance-profiles=GATEWAY-HTTP
CONFORMANCE_SKIP_TESTS ?= -skip-tests=HTTPRouteServiceTypes
CONFORMANCE_REPORT_ARGS ?= -report-output=$(TEST_ASSET_DIR)/conformance/$(VERSION)-report.yaml -organization=solo.io -project=gloo-gateway -version=$(VERSION) -url=github.com/solo-io/gloo -contact=github.com/solo-io/gloo/issues/new/choose
CONFORMANCE_ARGS := -gateway-class=gloo-gateway $(CONFORMANCE_SKIP_TESTS) $(CONFORMANCE_SUPPORTED_FEATURES) $(CONFORMANCE_SUPPORTED_PROFILES) $(CONFORMANCE_REPORT_ARGS)
CONFORMANCE_ARGS := -gateway-class=gloo-gateway $(CONFORMANCE_SUPPORTED_FEATURES) $(CONFORMANCE_SUPPORTED_PROFILES) $(CONFORMANCE_REPORT_ARGS)

.PHONY: conformance ## Run the conformance test suite
conformance: $(TEST_ASSET_DIR)/conformance/conformance_test.go
Expand Down
6 changes: 6 additions & 0 deletions changelog/v1.18.0-beta34/issue_6910.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
changelog:
- type: FIX
issueLink: https://github.com/solo-io/solo-projects/issues/6910
resolvesIssue: true
description: >-
Updates the gateway2 project to use EndpointSlice instead of Endpoints for creating upstream endpoints.
2 changes: 2 additions & 0 deletions pkg/schemes/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package schemes
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
Expand All @@ -28,6 +29,7 @@ var SchemeBuilder = runtime.SchemeBuilder{
// Kubernetes Core resources
corev1.AddToScheme,
appsv1.AddToScheme,
discoveryv1.AddToScheme,

// Register the apiextensions API group
apiextensionsv1.AddToScheme,
Expand Down
11 changes: 6 additions & 5 deletions projects/gateway2/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -84,7 +85,7 @@ func NewBaseGatewayController(ctx context.Context, cfg GatewayConfig) error {
controllerBuilder.watchVirtualHostOptions,
controllerBuilder.watchUpstreams,
controllerBuilder.watchServices,
controllerBuilder.watchEndpoints,
controllerBuilder.watchEndpointSlices,
controllerBuilder.watchPods,
controllerBuilder.watchSecrets,
controllerBuilder.addIndexes,
Expand Down Expand Up @@ -372,10 +373,10 @@ func (c *controllerBuilder) watchPods(ctx context.Context) error {
Complete(reconcile.Func(c.reconciler.ReconcilePods))
}

func (c *controllerBuilder) watchEndpoints(ctx context.Context) error {
func (c *controllerBuilder) watchEndpointSlices(ctx context.Context) error {
return ctrl.NewControllerManagedBy(c.cfg.Mgr).
For(&corev1.Endpoints{}).
Complete(reconcile.Func(c.reconciler.ReconcileEndpoints))
For(&discoveryv1.EndpointSlice{}).
Complete(reconcile.Func(c.reconciler.ReconcileEndpointSlices))
}

func (c *controllerBuilder) watchSecrets(ctx context.Context) error {
Expand Down Expand Up @@ -445,7 +446,7 @@ func (r *controllerReconciler) ReconcilePods(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, nil
}

func (r *controllerReconciler) ReconcileEndpoints(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *controllerReconciler) ReconcileEndpointSlices(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// eventually reconcile only effected listeners etc
r.kick(ctx)
return ctrl.Result{}, nil
Expand Down
170 changes: 108 additions & 62 deletions projects/gateway2/krtcollections/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/krt"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
)

Expand All @@ -48,11 +49,12 @@ func (p EndpointsSettings) ResourceName() string {
}

type EndpointsInputs struct {
Upstreams krt.Collection[UpstreamWrapper]
Endpoints krt.Collection[*corev1.Endpoints]
Pods krt.Collection[LocalityPod]
EndpointsSettings krt.Singleton[EndpointsSettings]
Services krt.Collection[*corev1.Service]
Upstreams krt.Collection[UpstreamWrapper]
EndpointSlices krt.Collection[*discoveryv1.EndpointSlice]
EndpointSlicesByService krt.Index[types.NamespacedName, *discoveryv1.EndpointSlice]
Pods krt.Collection[LocalityPod]
EndpointsSettings krt.Singleton[EndpointsSettings]
Services krt.Collection[*corev1.Service]

Debugger *krt.DebugHandler
}
Expand All @@ -66,22 +68,35 @@ func NewGlooK8sEndpointInputs(
finalUpstreams krt.Collection[UpstreamWrapper],
) EndpointsInputs {
withDebug := krt.WithDebugging(dbg)
epClient := kclient.New[*corev1.Endpoints](istioClient)
kubeEndpoints := krt.WrapClient(epClient, krt.WithName("Endpoints"), withDebug)
epSliceClient := kclient.New[*discoveryv1.EndpointSlice](istioClient)
endpointSlices := krt.WrapClient(epSliceClient, krt.WithName("EndpointSlices"), withDebug)
endpointSettings := krt.NewSingleton(func(ctx krt.HandlerContext) *EndpointsSettings {
settings := krt.FetchOne(ctx, settings.AsCollection())
return &EndpointsSettings{
EnableAutoMtls: settings.Spec.GetGloo().GetIstioOptions().GetEnableAutoMtls().GetValue(),
}
}, withDebug)

// Create index on EndpointSlices by service name and endpointslice namespace
endpointSlicesByService := krt.NewIndex(endpointSlices, func(es *discoveryv1.EndpointSlice) []types.NamespacedName {
svcName, ok := es.Labels[discoveryv1.LabelServiceName]
if !ok {
return nil
}
return []types.NamespacedName{{
Namespace: es.Namespace,
Name: svcName,
}}
})

return EndpointsInputs{
Upstreams: finalUpstreams,
Endpoints: kubeEndpoints,
Pods: pods,
EndpointsSettings: endpointSettings,
Services: services,
Debugger: dbg,
Upstreams: finalUpstreams,
EndpointSlices: endpointSlices,
EndpointSlicesByService: endpointSlicesByService,
Pods: pods,
EndpointsSettings: endpointSettings,
Services: services,
Debugger: dbg,
}
}

Expand Down Expand Up @@ -169,8 +184,7 @@ func NewGlooK8sEndpoints(ctx context.Context, inputs EndpointsInputs) krt.Collec

func transformK8sEndpoints(ctx context.Context, inputs EndpointsInputs) func(kctx krt.HandlerContext, us UpstreamWrapper) *EndpointsForUpstream {
augmentedPods := inputs.Pods
kubeEndpoints := inputs.Endpoints
services := inputs.Services
svcs := inputs.Services

logger := contextutils.LoggerFrom(ctx).Desugar()

Expand All @@ -192,66 +206,95 @@ func transformK8sEndpoints(ctx context.Context, inputs EndpointsInputs) func(kct
return nil
}
spec := kubeUpstream.Kube
kubeServicePort, singlePortService := findPortForService(kctx, services, spec)
if kubeServicePort == nil {
kubeSvcPort, singlePortSvc := findPortForService(kctx, svcs, spec)
if kubeSvcPort == nil {
logger.Debug("findPortForService - not found.", zap.Uint32("port", spec.GetServicePort()), zap.String("svcName", spec.GetServiceName()), zap.String("svcNamespace", spec.GetServiceNamespace()))
return nil
}

maybeEps := krt.FetchOne(kctx, kubeEndpoints, krt.FilterObjectName(types.NamespacedName{
Namespace: spec.GetServiceNamespace(),
Name: spec.GetServiceName(),
}))
if maybeEps == nil {
warnsToLog = append(warnsToLog, fmt.Sprintf("endpoints not found for service %v", spec.GetServiceName()))
logger.Debug("endpoints not found for service")
svcNs := spec.GetServiceNamespace()
svcName := spec.GetServiceName()
// Fetch all EndpointSlices for the service
key := types.NamespacedName{
Namespace: svcNs,
Name: svcName,
}

endpointSlices := krt.Fetch(kctx, inputs.EndpointSlices, krt.FilterIndex(inputs.EndpointSlicesByService, key))
if len(endpointSlices) == 0 {
warnsToLog = append(warnsToLog, fmt.Sprintf("EndpointSlices not found for service %v/%v", svcNs, svcName))
return nil
}

if len(endpointSlices) == 0 {
warnsToLog = append(warnsToLog, fmt.Sprintf("EndpointSlices not found for service %v/%v", svcNs, svcName))
logger.Debug("EndpointSlices not found for service")
return nil
}
eps := *maybeEps

// Initialize the returned EndpointsForUpstream
settings := krt.FetchOne(kctx, inputs.EndpointsSettings.AsCollection())
enableAutoMtls := settings.EnableAutoMtls
ret := NewEndpointsForUpstream(us, logger)
for _, subset := range eps.Subsets {
port := findFirstPortInEndpointSubsets(subset, singlePortService, kubeServicePort)

// Handle deduplication of endpoint addresses
seenAddresses := make(map[string]struct{})

// Add an endpoint to the returned EndpointsForUpstream for each EndpointSlice
for _, endpointSlice := range endpointSlices {
port := findPortInEndpointSlice(endpointSlice, singlePortSvc, kubeSvcPort)
if port == 0 {
warnsToLog = append(warnsToLog, fmt.Sprintf("port not found (%v) for service %v in endpoint %v", spec.GetServicePort(), spec.GetServiceName(), subset))
warnsToLog = append(warnsToLog, fmt.Sprintf("port %v not found for service %v/%v in EndpointSlice %v",
spec.GetServicePort(), svcNs, svcName, endpointSlice.Name))
continue
}

for _, addr := range subset.Addresses {
var podName string
podNamespace := eps.Namespace
targetRef := addr.TargetRef
if targetRef != nil {
if targetRef.Kind == "Pod" {
podName = targetRef.Name
if targetRef.Namespace != "" {
podNamespace = targetRef.Namespace
for _, endpoint := range endpointSlice.Endpoints {
// Skip endpoints that are not ready
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
continue
}
// Get the addresses
for _, addr := range endpoint.Addresses {
// Deduplicate addresses
if _, exists := seenAddresses[addr]; exists {
continue
}
seenAddresses[addr] = struct{}{}

var podName string
podNamespace := endpointSlice.Namespace
targetRef := endpoint.TargetRef
if targetRef != nil {
if targetRef.Kind == "Pod" {
podName = targetRef.Name
if targetRef.Namespace != "" {
podNamespace = targetRef.Namespace
}
}
}
}

var augmentedLabels map[string]string
var l PodLocality
if podName != "" {
maybePod := krt.FetchOne(kctx, augmentedPods, krt.FilterObjectName(types.NamespacedName{
Namespace: podNamespace,
Name: podName,
}))
if maybePod != nil {
l = maybePod.Locality
augmentedLabels = maybePod.AugmentedLabels
var augmentedLabels map[string]string
var l PodLocality
if podName != "" {
maybePod := krt.FetchOne(kctx, augmentedPods, krt.FilterObjectName(types.NamespacedName{
Namespace: podNamespace,
Name: podName,
}))
if maybePod != nil {
l = maybePod.Locality
augmentedLabels = maybePod.AugmentedLabels
}
}
}
ep := CreateLBEndpoint(addr.IP, port, augmentedLabels, enableAutoMtls)
ep := CreateLBEndpoint(addr, port, augmentedLabels, enableAutoMtls)

ret.Add(l, EndpointWithMd{
LbEndpoint: ep,
EndpointMd: EndpointMetadata{
Labels: augmentedLabels,
},
})
ret.Add(l, EndpointWithMd{
LbEndpoint: ep,
EndpointMd: EndpointMetadata{
Labels: augmentedLabels,
},
})
}
}
}
logger.Debug("created endpoint", zap.Int("numAddresses", len(ret.LbEps)))
Expand Down Expand Up @@ -333,16 +376,19 @@ func findPortForService(kctx krt.HandlerContext, services krt.Collection[*corev1
return nil, false
}

func findFirstPortInEndpointSubsets(subset corev1.EndpointSubset, singlePortService bool, kubeServicePort *corev1.ServicePort) uint32 {
func findPortInEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, singlePortService bool, kubeServicePort *corev1.ServicePort) uint32 {
var port uint32
for _, p := range subset.Ports {
// if the endpoint port is not named, it implies that
for _, p := range endpointSlice.Ports {
if p.Port == nil {
continue
}
// If the endpoint port is not named, it implies that
// the kube service only has a single unnamed port as well.
switch {
case singlePortService:
port = uint32(p.Port)
case p.Name == kubeServicePort.Name:
port = uint32(p.Port)
port = uint32(*p.Port)
case p.Name != nil && *p.Name == kubeServicePort.Name:
port = uint32(*p.Port)
break
}
}
Expand Down
Loading

0 comments on commit a93178d

Please sign in to comment.