Skip to content

Commit

Permalink
feature(l4): Add zonal affinity support and related controls
Browse files Browse the repository at this point in the history
This commit implements zonal affinity capabilities for L4 Internal Load Balancers:

- Add zonal affinity flag for L4 ILB controllers
- Implement backendService zonal affinity control logic
- Add backendService minimum API version control
- Update backendService tests to include zonal affinity cases with expected NetworkPassThroughLbTrafficPolicy
- Add handler for service spec Spec.TrafficDistribution change events
- Add L4 TestZonalAffinity integration tests
  • Loading branch information
08volt committed Feb 26, 2025
1 parent f683eee commit f23b7b3
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func main() {
EnableL4NetLBNEGsDefault: flags.F.EnableL4NetLBNEGDefault,
EnableL4ILBMixedProtocol: flags.F.EnableL4ILBMixedProtocol,
EnableL4NetLBMixedProtocol: flags.F.EnableL4NetLBMixedProtocol,
EnableZonalAffinity: flags.F.EnableZonalAffinity,
}
ctx, err := ingctx.NewControllerContext(kubeClient, backendConfigClient, frontendConfigClient, firewallCRClient, svcNegClient, svcAttachmentClient, networkClient, nodeTopologyClient, eventRecorderKubeClient, cloud, namer, kubeSystemUID, ctxConfig, rootLogger)
if err != nil {
Expand Down
98 changes: 94 additions & 4 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
DefaultConnectionDrainingTimeoutSeconds = 30
defaultTrackingMode = "PER_CONNECTION"
PerSessionTrackingMode = "PER_SESSION" // the only one supported with strong session affinity
DefaultZonalAffinitySpillover = "ZONAL_AFFINITY_SPILL_CROSS_ZONE"
DefaultZonalAffinitySpilloverRatio = 0
ZonalAffinityDisabledSpillover = "ZONAL_AFFINITY_DISABLED"
)

// LocalityLBPolicyType is the type of locality lb policy the backend service should use.
Expand Down Expand Up @@ -90,6 +93,31 @@ type L4BackendServiceParams struct {
NetworkInfo *network.NetworkInfo
ConnectionTrackingPolicy *composite.BackendServiceConnectionTrackingPolicy
LocalityLbPolicy LocalityLBPolicyType
EnableZonalAffinity bool
}

var versionPrecedence = map[meta.Version]int{
meta.VersionAlpha: 2,
meta.VersionBeta: 1,
meta.VersionGA: 0,
}

// maxVersion returns the higher version based on precedence.
func maxVersion(a, b meta.Version) meta.Version {
precedenceA, okA := versionPrecedence[a]
if !okA {
precedenceA = 0 // Use VersionGA precedence if invalid.
}

precedenceB, okB := versionPrecedence[b]
if !okB {
precedenceB = 0 // Use VersionGA precedence if invalid.
}

if precedenceA > precedenceB {
return a
}
return b
}

// ensureDescription updates the BackendService Description with the expected value
Expand Down Expand Up @@ -327,6 +355,14 @@ func (p *Pool) DeleteSignedURLKey(be *composite.BackendService, keyName string,
return nil
}

// minRequiredVersion to create a backend service with the given params
func minRequiredVersion(params L4BackendServiceParams) meta.Version {
if params.EnableZonalAffinity {
return meta.VersionAlpha
}
return meta.VersionGA
}

// EnsureL4BackendService creates or updates the backend service with the given name.
func (p *Pool) EnsureL4BackendService(params L4BackendServiceParams, beLogger klog.Logger) (*composite.BackendService, utils.ResourceSyncStatus, error) {
start := time.Now()
Expand All @@ -342,25 +378,34 @@ func (p *Pool) EnsureL4BackendService(params L4BackendServiceParams, beLogger kl
if err != nil {
return nil, utils.ResourceResync, err
}

currentBS, err := composite.GetBackendService(p.cloud, key, meta.VersionGA, beLogger)
if err != nil && !utils.IsNotFoundError(err) {
return nil, utils.ResourceResync, err
}
desc, err := utils.MakeL4LBServiceDescription(params.NamespacedName.String(), "", meta.VersionGA, false, utils.ILB)

expectedVersion := minRequiredVersion(params)
expectedDesc, err := utils.MakeL4LBServiceDescription(params.NamespacedName.String(), "", expectedVersion, false, utils.ILB)
if err != nil {
beLogger.Info("EnsureL4BackendService: Failed to generate description for BackendService", "err", err)
}

expectedBS := &composite.BackendService{
Name: params.Name,
Protocol: params.Protocol,
Description: desc,
Version: expectedVersion,
Description: expectedDesc,
HealthChecks: []string{params.HealthCheckLink},
SessionAffinity: utils.TranslateAffinityType(params.SessionAffinity, beLogger),
LoadBalancingScheme: params.Scheme,
LocalityLbPolicy: string(params.LocalityLbPolicy),
}

if params.EnableZonalAffinity {
beLogger.V(2).Info("EnsureL4BackendService: using Zonal Affinity", "spillover", DefaultZonalAffinitySpillover, "spilloverRatio", DefaultZonalAffinitySpilloverRatio)
expectedBS.NetworkPassThroughLbTrafficPolicy = defaultZonalAffinityTrafficPolicy()
}

// We need this configuration only for Strong Session Affinity feature
if p.useConnectionTrackingPolicy {
beLogger.V(2).Info(fmt.Sprintf("EnsureL4BackendService: using connection tracking policy: %+v", params.ConnectionTrackingPolicy))
Expand Down Expand Up @@ -388,7 +433,7 @@ func (p *Pool) EnsureL4BackendService(params L4BackendServiceParams, beLogger kl
// We need to perform a GCE call to re-fetch the object we just created
// so that the "Fingerprint" field is filled in. This is needed to update the
// object without error. The lookup is also needed to populate the selfLink.
createdBS, err := composite.GetBackendService(p.cloud, key, meta.VersionGA, beLogger)
createdBS, err := composite.GetBackendService(p.cloud, key, expectedBS.Version, beLogger)
return createdBS, utils.ResourceUpdate, err
} else {
// TODO(FelipeYepez) remove this check once LocalityLBPolicyMaglev does not require allow lisiting
Expand All @@ -398,6 +443,17 @@ func (p *Pool) EnsureL4BackendService(params L4BackendServiceParams, beLogger kl

expectedBS.LocalityLbPolicy = string(LocalityLBPolicyMaglev)
}

// Use the version with most priority if the BackendService was already using one
currentVersion := meta.VersionGA
var currentDesc utils.L4LBResourceDescription
err = currentDesc.Unmarshal(currentBS.Description)
if err != nil {
beLogger.V(0).Error(err, "EnsureL4BackendService: error unmarshaling backend service description")
} else {
currentVersion = currentDesc.APIVersion
}
expectedBS.Version = maxVersion(currentVersion, expectedBS.Version)
}

if backendSvcEqual(expectedBS, currentBS, p.useConnectionTrackingPolicy) {
Expand All @@ -418,7 +474,7 @@ func (p *Pool) EnsureL4BackendService(params L4BackendServiceParams, beLogger kl
}
beLogger.V(2).Info("EnsureL4BackendService: updated backend service successfully")

updatedBS, err := composite.GetBackendService(p.cloud, key, meta.VersionGA, beLogger)
updatedBS, err := composite.GetBackendService(p.cloud, key, expectedBS.Version, beLogger)
return updatedBS, utils.ResourceUpdate, err
}

Expand Down Expand Up @@ -447,9 +503,34 @@ func backendSvcEqual(newBS, oldBS *composite.BackendService, compareConnectionTr
(newBS.LocalityLbPolicy == string(LocalityLBPolicyDefault) && oldBS.LocalityLbPolicy == string(LocalityLBPolicyMaglev)) ||
(newBS.LocalityLbPolicy == string(LocalityLBPolicyMaglev) && oldBS.LocalityLbPolicy == string(LocalityLBPolicyDefault)))

// If zonal affinity is set, needs to be equal
svcsEqual = svcsEqual && zonalAffinityEqual(newBS, oldBS)
return svcsEqual
}

func convertNetworkLbTrafficPolicyToZonalAffinity(trafficPolicy *composite.BackendServiceNetworkPassThroughLbTrafficPolicy) composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity {
if trafficPolicy == nil || trafficPolicy.ZonalAffinity == nil {
return composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: ZonalAffinityDisabledSpillover,
SpilloverRatio: 0,
}
}

return *trafficPolicy.ZonalAffinity
}

func zonalAffinityEqual(a, b *composite.BackendService) bool {
aZonalAffinity := convertNetworkLbTrafficPolicyToZonalAffinity(a.NetworkPassThroughLbTrafficPolicy)
bZonalAffinity := convertNetworkLbTrafficPolicyToZonalAffinity(b.NetworkPassThroughLbTrafficPolicy)

// Compare Spillover values
spilloverEqual := aZonalAffinity.Spillover == bZonalAffinity.Spillover
// Compare SpilloverRatio values
spilloverRatioEqual := aZonalAffinity.SpilloverRatio == bZonalAffinity.SpilloverRatio

return spilloverEqual && spilloverRatioEqual
}

// connectionTrackingPolicyEqual returns true if both elements are equal
// and return false if at least one parameter is different
func connectionTrackingPolicyEqual(a, b *composite.BackendServiceConnectionTrackingPolicy) bool {
Expand All @@ -460,3 +541,12 @@ func connectionTrackingPolicyEqual(a, b *composite.BackendServiceConnectionTrack
a.EnableStrongAffinity == b.EnableStrongAffinity &&
a.IdleTimeoutSec == b.IdleTimeoutSec
}

func defaultZonalAffinityTrafficPolicy() *composite.BackendServiceNetworkPassThroughLbTrafficPolicy {
return &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: DefaultZonalAffinitySpillover,
SpilloverRatio: DefaultZonalAffinitySpilloverRatio,
},
}
}
99 changes: 99 additions & 0 deletions pkg/backends/backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,105 @@ func TestBackendSvcEqual(t *testing.T) {
},
wantEqual: false,
},
{
desc: "Test existing backend service diff with zonal affinity feature enabled",
oldBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_SPILL_CROSS_ZONE",
SpilloverRatio: 0.7,
},
},
},
newBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_SPILL_CROSS_ZONE",
SpilloverRatio: 0.7,
},
},
},
wantEqual: true,
},
{
desc: "Test existing backend service diff with zonal affinity feature enabled but different ratio",
oldBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_SPILL_CROSS_ZONE",
SpilloverRatio: 0.7,
},
},
},
newBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_SPILL_CROSS_ZONE",
SpilloverRatio: 0.3,
},
},
},
wantEqual: false,
},
{
desc: "Test existing backend service diff with zonal affinity feature enabled but different spillover strategy",
oldBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_STAY_WITHIN_ZONE",
SpilloverRatio: 0.3,
},
},
},
newBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_SPILL_CROSS_ZONE",
SpilloverRatio: 0.3,
},
},
},
wantEqual: false,
},
{
desc: "Test existing backend service diff enabling zonal affinity feature",
oldBackendService: &composite.BackendService{},
newBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_SPILL_CROSS_ZONE",
SpilloverRatio: 0.3,
},
},
},
wantEqual: false,
},
{
desc: "Test existing backend service diff enabling zonal affinity feature",
oldBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_SPILL_CROSS_ZONE",
SpilloverRatio: 0.3,
},
},
},
newBackendService: &composite.BackendService{},
wantEqual: false,
},
{
desc: "Test existing backend service diff with zonal affinity disables",
oldBackendService: &composite.BackendService{
NetworkPassThroughLbTrafficPolicy: &composite.BackendServiceNetworkPassThroughLbTrafficPolicy{
ZonalAffinity: &composite.BackendServiceNetworkPassThroughLbTrafficPolicyZonalAffinity{
Spillover: "ZONAL_AFFINITY_DISABLED",
SpilloverRatio: 0,
},
},
},
newBackendService: &composite.BackendService{},
wantEqual: true,
},
} {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type ControllerContextConfig struct {
EnableIngressRegionalExternal bool
EnableWeightedL4ILB bool
EnableWeightedL4NetLB bool
EnableZonalAffinity bool
DisableL4LBFirewall bool
EnableL4NetLBNEGs bool
EnableL4NetLBNEGsDefault bool
Expand Down
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ var F = struct {
EnableWeightedL4NetLB bool
EnableDiscretePortForwarding bool
EnableMultiProjectMode bool
EnableZonalAffinity bool
ProviderConfigNameLabelKey string
EnableL4ILBMixedProtocol bool
EnableL4NetLBMixedProtocol bool
Expand Down Expand Up @@ -326,6 +327,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.StringVar(&F.NodeTopologyCRName, "node-topology-cr-name", "default", "The name of the Node Topology CR.")
flag.BoolVar(&F.EnableWeightedL4ILB, "enable-weighted-l4-ilb", false, "Enable Weighted Load balancing for L4 ILB.")
flag.BoolVar(&F.EnableWeightedL4NetLB, "enable-weighted-l4-netlb", false, "EnableWeighted Load balancing for L4 NetLB .")
flag.BoolVar(&F.EnableZonalAffinity, "enable-zonal-affinity", false, "Enable Zonal Affinity for L4 ILB.")
flag.Float32Var(&F.KubeClientQPS, "kube-client-qps", 0.0, "The QPS that the controllers' kube client should adhere to through client side throttling. If zero, client will be created with default settings.")
flag.IntVar(&F.KubeClientBurst, "kube-client-burst", 0, "The burst QPS that the controllers' kube client should adhere to through client side throttling. If zero, client will be created with default settings.")
flag.BoolVar(&F.EnableDiscretePortForwarding, "enable-discrete-port-forwarding", false, "Enable forwarding of individual ports instead of port ranges.")
Expand Down
7 changes: 7 additions & 0 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service, svcLo
EnableWeightedLB: l4c.ctx.EnableWeightedL4ILB,
DisableNodesFirewallProvisioning: l4c.ctx.DisableL4LBFirewall,
EnableMixedProtocol: l4c.ctx.EnableL4ILBMixedProtocol,
EnableZonalAffinity: l4c.ctx.EnableZonalAffinity,
}
l4 := loadbalancers.NewL4Handler(l4ilbParams, svcLogger)
syncResult := l4.EnsureInternalLoadBalancer(utils.GetNodeNames(nodes), service)
Expand Down Expand Up @@ -369,6 +370,7 @@ func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service, svc
EnableWeightedLB: l4c.ctx.EnableWeightedL4ILB,
DisableNodesFirewallProvisioning: l4c.ctx.DisableL4LBFirewall,
EnableMixedProtocol: l4c.ctx.EnableL4ILBMixedProtocol,
EnableZonalAffinity: l4c.ctx.EnableZonalAffinity,
}
l4 := loadbalancers.NewL4Handler(l4ilbParams, svcLogger)
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer for %s", key)
Expand Down Expand Up @@ -579,6 +581,11 @@ func (l4c *L4Controller) needsUpdate(oldService *v1.Service, newService *v1.Serv
oldService.Spec.HealthCheckNodePort, newService.Spec.HealthCheckNodePort)
return true
}
if oldService.Spec.TrafficDistribution != newService.Spec.TrafficDistribution {
recorder.Eventf(newService, v1.EventTypeNormal, "TrafficDistribution", "%v -> %v",
oldService.Spec.TrafficDistribution, newService.Spec.TrafficDistribution)
return true
}
if l4c.enableDualStack && !reflect.DeepEqual(oldService.Spec.IPFamilies, newService.Spec.IPFamilies) {
recorder.Eventf(newService, v1.EventTypeNormal, "IPFamilies", "%v -> %v",
oldService.Spec.IPFamilies, newService.Spec.IPFamilies)
Expand Down
Loading

0 comments on commit f23b7b3

Please sign in to comment.