Skip to content

Commit

Permalink
Merge branch 'master' into helpers-removed-v4
Browse files Browse the repository at this point in the history
  • Loading branch information
08volt committed Jun 3, 2024
2 parents 4626503 + ff39946 commit 72b74a5
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 151 deletions.
8 changes: 4 additions & 4 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type ControllerContext struct {

recorderLock sync.Mutex
// Map of namespace => record.EventRecorder.
recorders map[string]record.EventRecorder
Recorders map[string]record.EventRecorder

// NOTE: If the flag GKEClusterType is empty, then cluster will default to zonal. This field should not be used for
// controller logic and should only be used for providing additional information to the user.
Expand Down Expand Up @@ -192,7 +192,7 @@ func NewControllerContext(
PodInformer: podInformer,
NodeInformer: nodeInformer,
SvcNegInformer: informersvcneg.NewServiceNetworkEndpointGroupInformer(svcnegClient, config.Namespace, config.ResyncPeriod, utils.NewNamespaceIndexer()),
recorders: map[string]record.EventRecorder{},
Recorders: map[string]record.EventRecorder{},
healthChecks: make(map[string]func() error),
logger: logger,
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func (ctx *ControllerContext) HasSynced() bool {
func (ctx *ControllerContext) Recorder(ns string) record.EventRecorder {
ctx.recorderLock.Lock()
defer ctx.recorderLock.Unlock()
if rec, ok := ctx.recorders[ns]; ok {
if rec, ok := ctx.Recorders[ns]; ok {
return rec
}

Expand All @@ -349,7 +349,7 @@ func (ctx *ControllerContext) Recorder(ns string) record.EventRecorder {
Interface: ctx.KubeClient.CoreV1().Events(ns),
})
rec := broadcaster.NewRecorder(ctx.generateScheme(), apiv1.EventSource{Component: "loadbalancer-controller"})
ctx.recorders[ns] = rec
ctx.Recorders[ns] = rec

return rec
}
Expand Down
56 changes: 6 additions & 50 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math/rand"
"reflect"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -254,7 +253,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service, svcLo

// Ensure v2 finalizer
if err := common.EnsureServiceFinalizer(service, common.ILBFinalizerV2, l4c.ctx.KubeClient, svcLogger); err != nil {
return &loadbalancers.L4ILBSyncResult{Error: fmt.Errorf("Failed to attach finalizer to service %s/%s, err %w", service.Namespace, service.Name, err)}
return &loadbalancers.L4ILBSyncResult{Error: fmt.Errorf("failed to attach finalizer to service %s/%s, err %w", service.Namespace, service.Name, err)}
}
nodes, err := l4c.zoneGetter.ListNodes(zonegetter.CandidateNodesFilter, svcLogger)
if err != nil {
Expand Down Expand Up @@ -297,43 +296,16 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service, svcLo
syncResult.Error = err
return syncResult
}
err = updateServiceStatus(l4c.ctx, service, syncResult.Status, svcLogger)
err = updateServiceInformation(l4c.ctx, l4c.enableDualStack, service, syncResult.Status, syncResult.Annotations, svcLogger)
if err != nil {
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed",
"Error updating load balancer status: %v", err)
"Error updating Service status and GCP resource annotations: %v", err)
syncResult.Error = err
return syncResult
}
if l4c.enableDualStack {
l4c.emitEnsuredDualStackEvent(service)
if err = updateL4DualStackResourcesAnnotations(l4c.ctx, service, syncResult.Annotations, svcLogger); err != nil {
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed",
"Failed to update Dual Stack annotations for load balancer, err: %v", err)
syncResult.Error = fmt.Errorf("failed to set Dual Stack resource annotations, err: %w", err)
return syncResult
}
} else {
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeNormal, "SyncLoadBalancerSuccessful",
"Successfully ensured load balancer resources")
if err = updateL4ResourcesAnnotations(l4c.ctx, service, syncResult.Annotations, svcLogger); err != nil {
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed",
"Failed to update annotations for load balancer, err: %v", err)
syncResult.Error = fmt.Errorf("failed to set resource annotations, err: %w", err)
return syncResult
}
}
return syncResult
}

func (l4c *L4Controller) emitEnsuredDualStackEvent(service *v1.Service) {
var ipFamilies []string
for _, ipFamily := range service.Spec.IPFamilies {
ipFamilies = append(ipFamilies, string(ipFamily))
}
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeNormal, "SyncLoadBalancerSuccessful",
"Successfully ensured %v load balancer resources", strings.Join(ipFamilies, " "))
}

func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service, svcLogger klog.Logger) *loadbalancers.L4ILBSyncResult {
startTime := time.Now()
svcLogger.Info("Deleting L4 ILB service")
Expand All @@ -359,28 +331,12 @@ func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service, svc
// Reset the loadbalancer status first, before resetting annotations.
// Other controllers(like service-controller) will process the service update if annotations change, but will ignore a service status change.
// Following this order avoids a race condition when a service is changed from LoadBalancer type Internal to External.
if err := updateServiceStatus(l4c.ctx, svc, &v1.LoadBalancerStatus{}, svcLogger); err != nil {
if err := updateServiceInformation(l4c.ctx, l4c.enableDualStack, svc, &v1.LoadBalancerStatus{}, nil, svcLogger); err != nil {
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer",
"Error resetting load balancer status to empty: %v", err)
result.Error = fmt.Errorf("failed to reset ILB status, err: %w", err)
"Error cleaning Service status and GCP resource annotations: %v", err)
result.Error = fmt.Errorf("failed to clean ILB statu and GCP resource annotations, err: %w", err)
return result
}
// Also remove any ILB annotations from the service metadata
if l4c.enableDualStack {
if err := updateL4DualStackResourcesAnnotations(l4c.ctx, svc, nil, svcLogger); err != nil {
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer",
"Error resetting DualStack resource annotations for load balancer: %v", err)
result.Error = fmt.Errorf("failed to reset DualStack resource annotations, err: %w", err)
return result
}
} else {
if err := updateL4ResourcesAnnotations(l4c.ctx, svc, nil, svcLogger); err != nil {
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer",
"Error resetting resource annotations for load balancer: %v", err)
result.Error = fmt.Errorf("failed to reset resource annotations, err: %w", err)
return result
}
}

if err := common.EnsureDeleteServiceFinalizer(svc, common.ILBFinalizerV2, l4c.ctx.KubeClient, svcLogger); err != nil {
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancerFailed",
Expand Down
1 change: 1 addition & 0 deletions pkg/l4lb/l4controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ func TestProcessDualStackServiceOnUserError(t *testing.T) {
addILBService(l4c, newSvc)
addNEG(l4c, newSvc)
syncResult := l4c.processServiceCreateOrUpdate(newSvc, klog.TODO())

if syncResult.Error == nil {
t.Fatalf("Failed to generate error when syncing service %s", newSvc.Name)
}
Expand Down
64 changes: 32 additions & 32 deletions pkg/l4lb/l4lbcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package l4lb
import (
"fmt"
"reflect"
"strings"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
Expand All @@ -38,12 +39,15 @@ import (
// computeNewAnnotationsIfNeeded checks if new annotations should be added to service.
// If needed creates new service meta object.
// This function is used by External and Internal L4 LB controllers.
func computeNewAnnotationsIfNeeded(svc *v1.Service, newAnnotations map[string]string, keysToRemove []string) *metav1.ObjectMeta {
func computeNewAnnotationsIfNeeded(svc *v1.Service, newAnnotations map[string]string, enableDualStack bool) *metav1.ObjectMeta {
var keysToRemove []string
if enableDualStack {
keysToRemove = loadbalancers.L4DualStackResourceAnnotationKeys
} else {
keysToRemove = loadbalancers.L4ResourceAnnotationKeys
}
newObjectMeta := svc.ObjectMeta.DeepCopy()
newObjectMeta.Annotations = mergeAnnotations(newObjectMeta.Annotations, newAnnotations, keysToRemove)
if reflect.DeepEqual(svc.Annotations, newObjectMeta.Annotations) {
return nil
}
return newObjectMeta
}

Expand All @@ -66,46 +70,42 @@ func mergeAnnotations(existing, lbAnnotations map[string]string, keysToRemove []
return existing
}

// updateL4ResourcesAnnotations checks if new annotations should be added to service and patch service metadata if needed.
func updateL4ResourcesAnnotations(ctx *context.ControllerContext, svc *v1.Service, newL4LBAnnotations map[string]string, svcLogger klog.Logger) error {
svcLogger.V(3).Info("Updating annotations of service")
newObjectMeta := computeNewAnnotationsIfNeeded(svc, newL4LBAnnotations, loadbalancers.L4ResourceAnnotationKeys)
if newObjectMeta == nil {
svcLogger.V(3).Info("Service annotations not changed, skipping patch for service")
return nil
}
svcLogger.V(3).Info("Patching annotations of service")
return patch.PatchServiceObjectMetadata(ctx.KubeClient.CoreV1(), svc, *newObjectMeta)
}

// updateL4DualStackResourcesAnnotations checks if new annotations should be added to dual-stack service and patch service metadata if needed.
func updateL4DualStackResourcesAnnotations(ctx *context.ControllerContext, svc *v1.Service, newL4LBAnnotations map[string]string, svcLogger klog.Logger) error {
newObjectMeta := computeNewAnnotationsIfNeeded(svc, newL4LBAnnotations, loadbalancers.L4DualStackResourceAnnotationKeys)
if newObjectMeta == nil {
return nil
}
svcLogger.V(3).Info("Patching annotations of service")
return patch.PatchServiceObjectMetadata(ctx.KubeClient.CoreV1(), svc, *newObjectMeta)
}

func deleteAnnotation(ctx *context.ControllerContext, svc *v1.Service, annotationKey string, svcLogger klog.Logger) error {
newObjectMeta := svc.ObjectMeta.DeepCopy()
if _, ok := newObjectMeta.Annotations[annotationKey]; !ok {
return nil
}
svcLogger.V(3).Info("Removing annotation from service", "annotationKey", annotationKey)
delete(newObjectMeta.Annotations, annotationKey)
return patch.PatchServiceObjectMetadata(ctx.KubeClient.CoreV1(), svc, *newObjectMeta)
return patch.PatchServiceObjectMetadata(ctx.KubeClient.CoreV1(), svc, newObjectMeta)
}

// updateServiceStatus this faction checks if LoadBalancer status changed and patch service if needed.
func updateServiceStatus(ctx *context.ControllerContext, svc *v1.Service, newStatus *v1.LoadBalancerStatus, svcLogger klog.Logger) error {
svcLogger.V(2).Info("Updating service status", "newStatus", fmt.Sprintf("%+v", newStatus))
if helpers.LoadBalancerStatusEqual(&svc.Status.LoadBalancer, newStatus) {
svcLogger.V(2).Info("New and old statuses are equal, skipping patch")
func updateServiceInformation(ctx *context.ControllerContext, enableDualStack bool, svc *v1.Service, newStatus *v1.LoadBalancerStatus, newL4LBAnnotations map[string]string, svcLogger klog.Logger) error {
emitIpFamiliesStackEvent(ctx, svc)
newObjectMeta := computeNewAnnotationsIfNeeded(svc, newL4LBAnnotations, enableDualStack)

svcLogger.V(2).Info("Updating service information", "newStatus", fmt.Sprintf("%+v", newStatus), "newObjectMeta", newObjectMeta)
if helpers.LoadBalancerStatusEqual(&svc.Status.LoadBalancer, newStatus) && reflect.DeepEqual(svc.ObjectMeta.Annotations, newObjectMeta.Annotations) {
svcLogger.V(2).Info("New and old service Status and Annotations are equal, skipping patch")
return nil
}
return patch.PatchServiceLoadBalancerStatus(ctx.KubeClient.CoreV1(), svc, *newStatus)
return patch.PatchServiceLoadBalancerInformation(ctx.KubeClient.CoreV1(), svc, newStatus, newObjectMeta)
}

func emitIpFamiliesStackEvent(ctx *context.ControllerContext, service *v1.Service) {
var ipFamilies []string
for _, ipFamily := range service.Spec.IPFamilies {
ipFamilies = append(ipFamilies, string(ipFamily))
}
ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeNormal, "SyncLoadBalancerSuccessful",
"Successfully ensured %v load balancer resources", func() string {
if len(ipFamilies) == 0 {
return "L4"
}
return strings.Join(ipFamilies, " ")
}())

}

// isHealthCheckDeleted checks if given health check exists in GCE
Expand Down
139 changes: 139 additions & 0 deletions pkg/l4lb/l4lbcommon_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package l4lb

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
api_v1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/klog/v2"
)

func TestFinalizerWasRemovedUnexpectedly(t *testing.T) {
Expand Down Expand Up @@ -141,3 +148,135 @@ func TestFinalizerWasRemovedUnexpectedly(t *testing.T) {
})
}
}

type EmitEventMock struct {
eventtype string
reason string
message string
}

type MockRecorder struct {
Events *[]EmitEventMock
}

func (r MockRecorder) Event(object runtime.Object, eventtype, reason, message string) {
*r.Events = append(*r.Events, EmitEventMock{
eventtype,
reason,
message,
})
}

func (r MockRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
message := fmt.Sprintf(messageFmt, args...) // Use fmt.Sprintf to format the message
*r.Events = append(*r.Events, EmitEventMock{
eventtype,
reason,
message,
})
}

func (r MockRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
message := fmt.Sprintf(messageFmt, args...) // Format the message
*r.Events = append(*r.Events, EmitEventMock{
eventtype,
reason,
message,
})
}

func TestEmitIpFamiliesStackEvent(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
ipFamilies []api_v1.IPFamily
service *v1.Service
expectedEvent EmitEventMock
}{
{
name: "L4 Single Stack",
service: test.NewL4ILBService(false, 8080),
ipFamilies: []v1.IPFamily{},
expectedEvent: EmitEventMock{
eventtype: "Normal",
reason: "SyncLoadBalancerSuccessful",
message: "Successfully ensured L4 load balancer resources",
},
},
{
name: "L4 Single Stack IPv4",
service: test.NewL4ILBService(false, 8080),
ipFamilies: []v1.IPFamily{v1.IPv4Protocol},
expectedEvent: EmitEventMock{
eventtype: "Normal",
reason: "SyncLoadBalancerSuccessful",
message: "Successfully ensured IPv4 load balancer resources",
},
},
{
name: "L4 Dual Stack",
service: test.NewL4ILBDualStackService(8080, api_v1.ProtocolTCP, []api_v1.IPFamily{api_v1.IPv4Protocol, api_v1.IPv6Protocol}, api_v1.ServiceExternalTrafficPolicyTypeCluster),
ipFamilies: []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol},
expectedEvent: EmitEventMock{
eventtype: "Normal",
reason: "SyncLoadBalancerSuccessful",
message: "Successfully ensured IPv4 IPv6 load balancer resources",
},
},
{
name: "L4 Net Single Stack",
service: test.NewL4NetLBRBSService(8080),
ipFamilies: []v1.IPFamily{},
expectedEvent: EmitEventMock{
eventtype: "Normal",
reason: "SyncLoadBalancerSuccessful",
message: "Successfully ensured L4 load balancer resources",
},
},
{
name: "L4 Net Single Stack IPv4",
service: test.NewL4NetLBRBSService(8080),
ipFamilies: []v1.IPFamily{v1.IPv4Protocol},
expectedEvent: EmitEventMock{
eventtype: "Normal",
reason: "SyncLoadBalancerSuccessful",
message: "Successfully ensured IPv4 load balancer resources",
},
},
{
name: "L4 Net Dual Stack",
service: test.NewL4NetLBRBSService(8080),
ipFamilies: []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol},
expectedEvent: EmitEventMock{
eventtype: "Normal",
reason: "SyncLoadBalancerSuccessful",
message: "Successfully ensured IPv4 IPv6 load balancer resources",
},
},
}

for _, tc := range testCases {
tc := tc // Capture range variable for parallel tests

t.Run(tc.name, func(t *testing.T) {
t.Parallel()

eventSlice := []EmitEventMock{}
recorder := MockRecorder{Events: &eventSlice}
l4c := newServiceController(t, newFakeGCE())

test.MustCreateDualStackClusterSubnet(t, l4c.ctx.Cloud, "INTERNAL")

l4c.ctx.Recorders[tc.service.Namespace] = recorder
tc.service.Spec.IPFamilies = tc.ipFamilies
addILBService(l4c, tc.service)
addNEG(l4c, tc.service)
l4c.processServiceCreateOrUpdate(tc.service, klog.TODO())

require.Len(t, *recorder.Events, 1)
assert.Equal(t, tc.expectedEvent, (*recorder.Events)[0])
})
}

}
Loading

0 comments on commit 72b74a5

Please sign in to comment.