Skip to content

Commit

Permalink
Merge pull request #2604 from sawsa307/update-neg-ref
Browse files Browse the repository at this point in the history
Update NEG reference in NEG CR
  • Loading branch information
k8s-ci-robot authored Sep 18, 2024
2 parents 3b8e5fc + 581ba4d commit 83eabdf
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 43 deletions.
54 changes: 50 additions & 4 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,9 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S
var errList []error
shouldDeleteNegCR := true
deleteByZone := len(svcNegCR.Status.NetworkEndpointGroups) == 0
// Change this to a map from NEG name to sets once we allow multiple NEGs
// in a specific zone(multi-subnet cluster).
deletedNegs := make(map[negtypes.NegInfo]struct{})

for _, negRef := range svcNegCR.Status.NetworkEndpointGroups {
resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
Expand All @@ -624,14 +627,34 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S
deleteByZone = true
continue
}

shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR, &errList)
negDeleted := manager.deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR, &errList)
if negDeleted {
deletedNegs[negtypes.NegInfo{Name: resourceID.Key.Name, Zone: resourceID.Key.Zone}] = struct{}{}
}
shouldDeleteNegCR = shouldDeleteNegCR && negDeleted
}

if deleteByZone {
manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(svcNegCR), "svcNegCR", svcNegCR)
for _, zone := range zones {
shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList)
negDeleted := manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList)
if negDeleted {
deletedNegs[negtypes.NegInfo{Name: svcNegCR.Name, Zone: zone}] = struct{}{}
}
shouldDeleteNegCR = shouldDeleteNegCR && negDeleted
}
}
// Since no more NEG deletion will be happening at this point, and NEG
// CR will not be deleted, clear the reference for deleted NEGs in the
// NEG CR.
if len(deletedNegs) != 0 {
updatedCR := svcNegCR.DeepCopy()

if errs := ensureExistingNegRef(updatedCR, deletedNegs); len(errs) != 0 {
errList = append(errList, errs...)
}
if _, err := patchNegStatus(manager.svcNegClient, *svcNegCR, *updatedCR); err != nil {
errList = append(errList, err)
}
}

Expand All @@ -655,9 +678,13 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S
}

manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(svcNegCR))
if err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger); err != nil {
err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger)
if err != nil {
manager.logger.V(2).Error(err, "Error when deleting NEG CR", "svcneg", klog.KObj(svcNegCR))
errList = append(errList, err)
return
}
manager.logger.V(2).Info("Deleted NEG CR", "svcneg", klog.KObj(svcNegCR))
}()

return errList
Expand Down Expand Up @@ -685,6 +712,25 @@ func (manager *syncerManager) deleteNegOrReportErr(name, zone string, svcNegCR *
return true
}

// ensureExistingNegRef rewrites neg.Status.NetworkEndpointGroups in place so
// that it keeps only the refs whose NegInfo is not a key of deletedNegs,
// preserving their original order.
//
// A ref that negtypes.NegInfoFromNegRef cannot convert is also left out of the
// rewritten list; its conversion error is collected and returned to the
// caller. When no ref is kept, the status field ends up as a nil slice.
func ensureExistingNegRef(neg *negv1beta1.ServiceNetworkEndpointGroup, deletedNegs map[negtypes.NegInfo]struct{}) []error {
	var (
		keptRefs  []negv1beta1.NegObjectReference
		parseErrs []error
	)
	for _, ref := range neg.Status.NetworkEndpointGroups {
		info, err := negtypes.NegInfoFromNegRef(ref)
		if err != nil {
			parseErrs = append(parseErrs, err)
			continue
		}
		// Refs recorded as deleted are filtered out of the status.
		if _, deleted := deletedNegs[info]; deleted {
			continue
		}
		keptRefs = append(keptRefs, ref)
	}
	neg.Status.NetworkEndpointGroups = keptRefs
	return parseErrs
}

// ensureDeleteNetworkEndpointGroup ensures neg is deleted from zone
func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string, expectedDesc *utils.NegDescription) error {
neg, err := manager.cloud.GetNetworkEndpointGroup(name, zone, meta.VersionGA, manager.logger)
Expand Down
76 changes: 59 additions & 17 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,21 @@ import (
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/neg/metrics/metricscollector"
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
"k8s.io/ingress-gce/pkg/neg/types"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
negfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/utils/common"
namer_util "k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/zonegetter"
Expand Down Expand Up @@ -1189,7 +1192,10 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
expectNegGC bool
expectCrGC bool
expectErr bool
gcError error
negGCError error
negGCErrorZone []string
negCrGCError error
expectedNegCount int

// expectGenNamedNegGC indicates that the Neg GC only occurs if using a generated name
// expectNegGC will take precedence over this value
Expand Down Expand Up @@ -1250,6 +1256,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
expectNegGC: false,
expectCrGC: true,
negDesc: wrongDesc.String(),
expectedNegCount: 2,
},
{desc: "neg config not in svcPortMap, empty neg list, neg has empty description",
negsExist: true,
Expand All @@ -1258,20 +1265,23 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
expectGenNamedNegGC: true,
expectCrGC: true,
negDesc: "",
expectedNegCount: 2,
},
{desc: "neg config in svcPortMap, marked for deletion",
negsExist: true,
markedForDeletion: true,
desiredConfig: true,
expectNegGC: false,
expectCrGC: false,
expectedNegCount: 2,
},
{desc: "neg config in svcPortMap",
negsExist: true,
markedForDeletion: false,
desiredConfig: true,
expectNegGC: false,
expectCrGC: false,
expectedNegCount: 2,
},
{desc: "negs don't exist, config not in svcPortMap",
negsExist: false,
Expand All @@ -1288,16 +1298,38 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
markedForDeletion: true,
expectCrGC: true,
expectErr: false,
gcError: &googleapi.Error{Code: http.StatusBadRequest},
negGCError: &googleapi.Error{Code: http.StatusBadRequest},
negGCErrorZone: []string{negtypes.TestZone1, negtypes.TestZone2},
negDesc: matchingDesc.String(),
},
{desc: "error during neg gc, config not in svcPortMap",
{desc: "error on all NEG deletions during neg gc, config not in svcPortMap, NEG CR should still have all NEG ref",
negsExist: true,
markedForDeletion: true,
expectCrGC: true,
expectErr: true,
gcError: fmt.Errorf("gc-error"),
negGCError: fmt.Errorf("neg-gc-error"),
negGCErrorZone: []string{negtypes.TestZone1, negtypes.TestZone2},
negDesc: matchingDesc.String(),
expectedNegCount: 2,
},
{desc: "error on one NEG deletion during neg gc, config not in svcPortMap, NEG CR should not have the deleted NEG ref",
negsExist: true,
markedForDeletion: true,
expectCrGC: true,
expectErr: true,
negGCError: fmt.Errorf("neg-gc-error"),
negGCErrorZone: []string{negtypes.TestZone1},
negDesc: matchingDesc.String(),
expectedNegCount: 1,
},
{desc: "error when deleting NEG CR during neg gc, config not in svcPortMap, NEG CR should not have any stale NEG ref",
negsExist: true,
markedForDeletion: false, // Make sure deletion timestamp is not set so we will trigger error when delete NEG CR
expectErr: true,
expectCrGC: false, // NEG CR deletion should fail due to error from deletion API call
negCrGCError: fmt.Errorf("neg-cr-gc-error"),
negDesc: matchingDesc.String(),
expectedNegCount: 0,
},
}

Expand All @@ -1310,6 +1342,13 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
manager, testCloud := NewTestSyncerManager(kubeClient)
svcNegClient := manager.svcNegClient

if tc.negCrGCError != nil {
svcNegClientFake := svcNegClient.(*negfake.Clientset)
svcNegClientFake.PrependReactor("delete", "servicenetworkendpointgroups", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &negv1beta1.ServiceNetworkEndpointGroup{}, tc.negCrGCError
})
}

manager.serviceLister.Add(svc)
fakeNegCloud := manager.cloud

Expand Down Expand Up @@ -1349,7 +1388,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
}

if _, err := manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(cr.Namespace).Create(context2.TODO(), &cr, metav1.CreateOptions{}); err != nil {
t.Fatalf("failed to create neg cr")
t.Fatalf("failed to create neg cr: %v", err)
}

crs := getNegCRs(t, svcNegClient, testServiceNamespace)
Expand All @@ -1370,19 +1409,20 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
}
}

if tc.gcError != nil {
if tc.negGCError != nil {
mockCloud := testCloud.Compute().(*cloud.MockGCE)
mockNEG := mockCloud.NetworkEndpointGroups().(*cloud.MockNetworkEndpointGroups)

for _, zone := range []string{negtypes.TestZone1, negtypes.TestZone2} {
mockNEG.DeleteError[*meta.ZonalKey(negName, zone)] = tc.gcError
for _, zone := range tc.negGCErrorZone {
mockNEG.DeleteError[*meta.ZonalKey(negName, zone)] = tc.negGCError
}
}

err := manager.GC()
if !tc.expectErr && err != nil {
t.Fatalf("failed to GC: %v", err)
} else if tc.expectErr && err == nil {
}
if tc.expectErr && err == nil {
t.Errorf("expected GC to error")
}

Expand All @@ -1391,21 +1431,23 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
t.Errorf("failed getting negs from cloud: %s", err)
}

numExistingNegs, negsDeleted := checkForNegDeletions(negs, negName)
numExistingNegs := checkForNegDeletions(negs, negName)

expectNegGC := tc.expectNegGC || (tc.expectGenNamedNegGC && !customName)
if tc.negsExist && expectNegGC && !negsDeleted {
if tc.negsExist && expectNegGC && numExistingNegs != 0 {
t.Errorf("expected negs to be GCed, but found %d", numExistingNegs)
} else if tc.negsExist && !expectNegGC && numExistingNegs != 2 {
t.Errorf("expected two negs in the cloud, but found %d", numExistingNegs)
}
if tc.negsExist && !expectNegGC && numExistingNegs != tc.expectedNegCount {
t.Errorf("expected %d negs in the cloud, but found %d", tc.expectedNegCount, numExistingNegs)
}

crs = getNegCRs(t, svcNegClient, testServiceNamespace)
crDeleted := checkForNegCRDeletion(crs, negName)

if tc.expectCrGC && !crDeleted {
t.Errorf("expected neg %s to be deleted", negName)
} else if !tc.expectCrGC && crDeleted && !tc.markedForDeletion {
}
if !tc.expectCrGC && crDeleted && !tc.markedForDeletion {
t.Errorf("expected neg %s to not be deleted", negName)
}
}
Expand Down Expand Up @@ -1559,16 +1601,16 @@ func getNegObjectRefs(t *testing.T, cloud negtypes.NetworkEndpointGroupCloud, zo
return negRefs
}

// checkForNegDeletions checks that negs does not have a neg with the provided negName. If none exists, returns true, otherwise returns false the number of negs found with the name
func checkForNegDeletions(negs map[*meta.Key]*composite.NetworkEndpointGroup, negName string) (int, bool) {
// checkForNegDeletions gets the count of neg objects in negs that have the provided negName.
func checkForNegDeletions(negs map[*meta.Key]*composite.NetworkEndpointGroup, negName string) int {
foundNegs := 0
for _, neg := range negs {
if neg.Name == negName {
foundNegs += 1
}
}

return foundNegs, foundNegs == 0
return foundNegs
}

// checkForNegCRDeletion verifies that either no cr with name `negName` exists or a cr with name `negName` has its deletion timestamp set
Expand Down
45 changes: 43 additions & 2 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,12 @@ func (s *transactionSyncer) logEndpoints(endpointMap map[string]negtypes.Network
s.logger.V(3).Info("Endpoints for NEG", "description", desc, "endpointMap", endpointMap)
}

// updateInitStatus queries the k8s api server for the current NEG CR and updates the Initialized condition and neg objects as appropriate.
// If neg client is nil, will return immediately
// updateInitStatus takes in the NEG refs based on the existing node zones,
// then queries the k8s api server for the current NEG CR and updates the
// Initialized condition and neg objects as appropriate.
// Before patching the NEG CR, it also includes NEG refs for NEGs that are no
// longer needed and changes their state to INACTIVE.
// If neg client is nil, will return immediately.
func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectReference, errList []error) {
if s.svcNegClient == nil {
return
Expand All @@ -784,6 +788,8 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe
}

neg := origNeg.DeepCopy()
inactiveNegObjRefs := getInactiveNegRefs(origNeg.Status.NetworkEndpointGroups, negObjRefs, s.logger)
negObjRefs = append(negObjRefs, inactiveNegObjRefs...)
neg.Status.NetworkEndpointGroups = negObjRefs

initializedCondition := getInitializedCondition(utilerrors.NewAggregate(errList))
Expand Down Expand Up @@ -917,6 +923,41 @@ func ensureCondition(neg *negv1beta1.ServiceNetworkEndpointGroup, expectedCondit
return expectedCondition
}

// getInactiveNegRefs returns copies of the refs in oldNegRefs that have no
// counterpart in currentNegRefs, each with its State set to
// negv1beta1.InactiveState. The result is nil when there are none.
//
// Refs are matched by the negtypes.NegInfo that negtypes.NegInfoFromNegRef
// derives from them. A ref for which that conversion fails is logged and
// skipped: such a current ref is not counted as active, and such an old ref
// is never reported as inactive. Neither input slice is modified.
func getInactiveNegRefs(oldNegRefs []negv1beta1.NegObjectReference, currentNegRefs []negv1beta1.NegObjectReference, logger klog.Logger) []negv1beta1.NegObjectReference {
	// Index the current refs so each old ref can be checked with one lookup.
	current := make(map[negtypes.NegInfo]struct{})
	for _, ref := range currentNegRefs {
		info, err := negtypes.NegInfoFromNegRef(ref)
		if err != nil {
			logger.Error(err, "Failed to extract name and zone information of a neg from the current snapshot", "negId", ref.Id, "negSelfLink", ref.SelfLink)
			continue
		}
		current[info] = struct{}{}
	}

	var inactive []negv1beta1.NegObjectReference
	for _, ref := range oldNegRefs {
		info, err := negtypes.NegInfoFromNegRef(ref)
		if err != nil {
			logger.Error(err, "Failed to extract name and zone information of a neg from the previous snapshot, skipping validating if it is an Inactive NEG", "negId", ref.Id, "negSelfLink", ref.SelfLink)
			continue
		}
		if _, stillActive := current[info]; stillActive {
			continue
		}

		// An old ref missing from the current list is treated as no longer
		// needed. Work on a copy so the caller's slice element keeps its
		// original State.
		stale := ref.DeepCopy()
		stale.State = negv1beta1.InactiveState
		inactive = append(inactive, *stale)
	}
	return inactive
}

// getSyncedCondition returns the expected synced condition based on given error
func getSyncedCondition(err error) negv1beta1.Condition {
if err != nil {
Expand Down
Loading

0 comments on commit 83eabdf

Please sign in to comment.