diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index eaf9dbd560..672e7afbd2 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -616,6 +616,9 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S var errList []error shouldDeleteNegCR := true deleteByZone := len(svcNegCR.Status.NetworkEndpointGroups) == 0 + // Change this to a map from NEG name to sets once we allow multiple NEGs + // in a specific zone(multi-subnet cluster). + deletedNegs := make(map[negtypes.NegInfo]struct{}) for _, negRef := range svcNegCR.Status.NetworkEndpointGroups { resourceID, err := cloud.ParseResourceURL(negRef.SelfLink) @@ -624,14 +627,34 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S deleteByZone = true continue } - - shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR, &errList) + negDeleted := manager.deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR, &errList) + if negDeleted { + deletedNegs[negtypes.NegInfo{Name: resourceID.Key.Name, Zone: resourceID.Key.Zone}] = struct{}{} + } + shouldDeleteNegCR = shouldDeleteNegCR && negDeleted } if deleteByZone { manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(svcNegCR), "svcNegCR", svcNegCR) for _, zone := range zones { - shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList) + negDeleted := manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList) + if negDeleted { + deletedNegs[negtypes.NegInfo{Name: svcNegCR.Name, Zone: zone}] = struct{}{} + } + shouldDeleteNegCR = shouldDeleteNegCR && negDeleted + } + } + // Since no more NEG deletion will be happening at this point, and NEG + // CR will not be deleted, clear the reference for deleted NEGs in the + // NEG CR. + if len(deletedNegs) != 0 { + updatedCR := svcNegCR.DeepCopy() + + if errs := ensureExistingNegRef(updatedCR, deletedNegs); len(errs) != 0 { + errList = append(errList, errs...) + } + if _, err := patchNegStatus(manager.svcNegClient, *svcNegCR, *updatedCR); err != nil { + errList = append(errList, err) } } @@ -655,9 +678,13 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S } manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(svcNegCR)) - if err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger); err != nil { + err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger) + if err != nil { + manager.logger.V(2).Error(err, "Error when deleting NEG CR", "svcneg", klog.KObj(svcNegCR)) errList = append(errList, err) + return } + manager.logger.V(2).Info("Deleted NEG CR", "svcneg", klog.KObj(svcNegCR)) }() return errList @@ -685,6 +712,25 @@ func (manager *syncerManager) deleteNegOrReportErr(name, zone string, svcNegCR * return true } +// ensureExistingNegRef removes NEG refs in NEG CR for NEGs that have been +// deleted successfully. +func ensureExistingNegRef(neg *negv1beta1.ServiceNetworkEndpointGroup, deletedNegs map[negtypes.NegInfo]struct{}) []error { + var updatedNegRef []negv1beta1.NegObjectReference + var errList []error + for _, negRef := range neg.Status.NetworkEndpointGroups { + negInfo, err := negtypes.NegInfoFromNegRef(negRef) + if err != nil { + errList = append(errList, err) + continue + } + if _, exists := deletedNegs[negInfo]; !exists { + updatedNegRef = append(updatedNegRef, negRef) + } + } + neg.Status.NetworkEndpointGroups = updatedNegRef + return errList +} + // ensureDeleteNetworkEndpointGroup ensures neg is delete from zone func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string, expectedDesc *utils.NegDescription) error { neg, err := manager.cloud.GetNetworkEndpointGroup(name, zone, meta.VersionGA, manager.logger) diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index 18816d7e20..2a9c149a37 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -35,11 +35,13 @@ import ( apiv1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1" "k8s.io/ingress-gce/pkg/neg/metrics/metricscollector" @@ -47,6 +49,7 @@ import ( "k8s.io/ingress-gce/pkg/neg/types" negtypes "k8s.io/ingress-gce/pkg/neg/types" svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + negfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" "k8s.io/ingress-gce/pkg/utils/common" namer_util "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/ingress-gce/pkg/utils/zonegetter" @@ -1189,7 +1192,10 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { expectNegGC bool expectCrGC bool expectErr bool - gcError error + negGCError error + negGCErrorZone []string + negCrGCError error + expectedNegCount int // expectGenNamedNegGC indicates that the Neg GC only occurs if using a generated name // expectNegGC will take precedence over this value @@ -1250,6 +1256,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { expectNegGC: false, expectCrGC: true, negDesc: wrongDesc.String(), + expectedNegCount: 2, }, {desc: "neg config not in svcPortMap, empty neg list, neg has empty description", negsExist: true, @@ -1258,6 +1265,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { expectGenNamedNegGC: true, expectCrGC: true, negDesc: "", + expectedNegCount: 2, }, {desc: "neg config in svcPortMap, marked for deletion", negsExist: true, @@ -1265,6 +1273,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { desiredConfig: true, expectNegGC: false, expectCrGC: false, + expectedNegCount: 2, }, {desc: "neg config in svcPortMap", negsExist: true, @@ -1272,6 +1281,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { desiredConfig: true, expectNegGC: false, expectCrGC: false, + expectedNegCount: 2, }, {desc: "negs don't exist, config not in svcPortMap", negsExist: false, @@ -1288,16 +1298,38 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { markedForDeletion: true, expectCrGC: true, expectErr: false, - gcError: &googleapi.Error{Code: http.StatusBadRequest}, + negGCError: &googleapi.Error{Code: http.StatusBadRequest}, + negGCErrorZone: []string{negtypes.TestZone1, negtypes.TestZone2}, negDesc: matchingDesc.String(), }, - {desc: "error during neg gc, config not in svcPortMap", + {desc: "error on all NEG deletions during neg gc, config not in svcPortMap, NEG CR should still have all NEG ref", negsExist: true, markedForDeletion: true, expectCrGC: true, expectErr: true, - gcError: fmt.Errorf("gc-error"), + negGCError: fmt.Errorf("neg-gc-error"), + negGCErrorZone: []string{negtypes.TestZone1, negtypes.TestZone2}, negDesc: matchingDesc.String(), + expectedNegCount: 2, + }, + {desc: "error on one NEG deletion during neg gc, config not in svcPortMap, NEG CR should not have the deleted NEG ref", + negsExist: true, + markedForDeletion: true, + expectCrGC: true, + expectErr: true, + negGCError: fmt.Errorf("neg-gc-error"), + negGCErrorZone: []string{negtypes.TestZone1}, + negDesc: matchingDesc.String(), + expectedNegCount: 1, + }, + {desc: "error when deleting NEG CR during neg gc, config not in svcPortMap, NEG CR should not have any stale NEG ref", + negsExist: true, + markedForDeletion: false, // Make sure deletion timestamp is not set so we will trigger error when delete NEG CR + expectErr: true, + expectCrGC: false, // NEG CR deletion should fail due to error from deletion API call + negCrGCError: fmt.Errorf("neg-cr-gc-error"), + negDesc: matchingDesc.String(), + expectedNegCount: 0, }, } @@ -1310,6 +1342,13 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { manager, testCloud := NewTestSyncerManager(kubeClient) svcNegClient := manager.svcNegClient + if tc.negCrGCError != nil { + svcNegClientFake := svcNegClient.(*negfake.Clientset) + svcNegClientFake.PrependReactor("delete", "servicenetworkendpointgroups", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, &negv1beta1.ServiceNetworkEndpointGroup{}, tc.negCrGCError + }) + } + manager.serviceLister.Add(svc) fakeNegCloud := manager.cloud @@ -1349,7 +1388,7 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { } if _, err := manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(cr.Namespace).Create(context2.TODO(), &cr, metav1.CreateOptions{}); err != nil { - t.Fatalf("failed to create neg cr") + t.Fatalf("failed to create neg cr: %v", err) } crs := getNegCRs(t, svcNegClient, testServiceNamespace) @@ -1370,19 +1409,20 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { } } - if tc.gcError != nil { + if tc.negGCError != nil { mockCloud := testCloud.Compute().(*cloud.MockGCE) mockNEG := mockCloud.NetworkEndpointGroups().(*cloud.MockNetworkEndpointGroups) - for _, zone := range []string{negtypes.TestZone1, negtypes.TestZone2} { - mockNEG.DeleteError[*meta.ZonalKey(negName, zone)] = tc.gcError + for _, zone := range tc.negGCErrorZone { + mockNEG.DeleteError[*meta.ZonalKey(negName, zone)] = tc.negGCError } } err := manager.GC() if !tc.expectErr && err != nil { t.Fatalf("failed to GC: %v", err) - } else if tc.expectErr && err == nil { + } + if tc.expectErr && err == nil { t.Errorf("expected GC to error") } @@ -1391,13 +1431,14 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { t.Errorf("failed getting negs from cloud: %s", err) } - numExistingNegs, negsDeleted := checkForNegDeletions(negs, negName) + numExistingNegs := checkForNegDeletions(negs, negName) expectNegGC := tc.expectNegGC || (tc.expectGenNamedNegGC && !customName) - if tc.negsExist && expectNegGC && !negsDeleted { + if tc.negsExist && expectNegGC && numExistingNegs != 0 { t.Errorf("expected negs to be GCed, but found %d", numExistingNegs) - } else if tc.negsExist && !expectNegGC && numExistingNegs != 2 { - t.Errorf("expected two negs in the cloud, but found %d", numExistingNegs) + } + if tc.negsExist && !expectNegGC && numExistingNegs != tc.expectedNegCount { + t.Errorf("expected %d negs in the cloud, but found %d", tc.expectedNegCount, numExistingNegs) } crs = getNegCRs(t, svcNegClient, testServiceNamespace) @@ -1405,7 +1446,8 @@ func TestGarbageCollectionNegCrdEnabled(t *testing.T) { if tc.expectCrGC && !crDeleted { t.Errorf("expected neg %s to be deleted", negName) - } else if !tc.expectCrGC && crDeleted && !tc.markedForDeletion { + } + if !tc.expectCrGC && crDeleted && !tc.markedForDeletion { t.Errorf("expected neg %s to not be deleted", negName) } } @@ -1559,8 +1601,8 @@ func getNegObjectRefs(t *testing.T, cloud negtypes.NetworkEndpointGroupCloud, zo return negRefs } -// checkForNegDeletions checks that negs does not have a neg with the provided negName. If none exists, returns true, otherwise returns false the number of negs found with the name -func checkForNegDeletions(negs map[*meta.Key]*composite.NetworkEndpointGroup, negName string) (int, bool) { +// checkForNegDeletions gets the count of neg objects in negs that has the provided negName. +func checkForNegDeletions(negs map[*meta.Key]*composite.NetworkEndpointGroup, negName string) int { foundNegs := 0 for _, neg := range negs { if neg.Name == negName { @@ -1568,7 +1610,7 @@ func checkForNegDeletions(negs map[*meta.Key]*composite.NetworkEndpointGroup, ne } } - return foundNegs, foundNegs == 0 + return foundNegs } // checkForNegCRDeletion verifies that either no cr with name `negName` exists or a cr withe name `negName` has its deletion timestamp set diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index fecb5ab534..a37d1efd5e 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -769,8 +769,12 @@ func (s *transactionSyncer) logEndpoints(endpointMap map[string]negtypes.Network s.logger.V(3).Info("Endpoints for NEG", "description", desc, "endpointMap", endpointMap) } -// updateInitStatus queries the k8s api server for the current NEG CR and updates the Initialized condition and neg objects as appropriate. -// If neg client is nil, will return immediately +// updateInitStatus takes in the NEG refs based on the existing node zones, +// then queries the k8s api server for the current NEG CR and updates the +// Initialized condition and neg objects as appropriate. +// Before patching the NEG CR, it also includes NEG refs for NEGs are no longer +// needed and change status as INACTIVE. +// If neg client is nil, will return immediately. func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectReference, errList []error) { if s.svcNegClient == nil { return @@ -784,6 +788,8 @@ func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectRe } neg := origNeg.DeepCopy() + inactiveNegObjRefs := getInactiveNegRefs(origNeg.Status.NetworkEndpointGroups, negObjRefs, s.logger) + negObjRefs = append(negObjRefs, inactiveNegObjRefs...) neg.Status.NetworkEndpointGroups = negObjRefs initializedCondition := getInitializedCondition(utilerrors.NewAggregate(errList)) @@ -917,6 +923,41 @@ func ensureCondition(neg *negv1beta1.ServiceNetworkEndpointGroup, expectedCondit return expectedCondition } +// getInactiveNegRefs creates NEG references for NEGs in Inactive State. +// Inactive NEG are NEGs that are no longer needed. +func getInactiveNegRefs(oldNegRefs []negv1beta1.NegObjectReference, currentNegRefs []negv1beta1.NegObjectReference, logger klog.Logger) []negv1beta1.NegObjectReference { + activeNegs := make(map[negtypes.NegInfo]struct{}) + for _, negRef := range currentNegRefs { + negInfo, err := negtypes.NegInfoFromNegRef(negRef) + if err != nil { + logger.Error(err, "Failed to extract name and zone information of a neg from the current snapshot", "negId", negRef.Id, "negSelfLink", negRef.SelfLink) + continue + } + activeNegs[negInfo] = struct{}{} + } + + var inactiveNegRefs []negv1beta1.NegObjectReference + for _, origNegRef := range oldNegRefs { + negInfo, err := negtypes.NegInfoFromNegRef(origNegRef) + if err != nil { + logger.Error(err, "Failed to extract name and zone information of a neg from the previous snapshot, skipping validating if it is an Inactive NEG", "negId", origNegRef.Id, "negSelfLink", origNegRef.SelfLink) + continue + } + + // NEGs are listed based on the current node zones. If a NEG no longer + // exists in the current list, it means there are no nodes/endpoints + // in that specific zone, and we mark it as INACTIVE. + // We use SelfLink as identifier since it contains the unique NEG zone + // and name pair. + if _, exists := activeNegs[negInfo]; !exists { + inactiveNegRef := origNegRef.DeepCopy() + inactiveNegRef.State = negv1beta1.InactiveState + inactiveNegRefs = append(inactiveNegRefs, *inactiveNegRef) + } + } + return inactiveNegRefs +} + // getSyncedCondition returns the expected synced condition based on given error func getSyncedCondition(err error) negv1beta1.Condition { if err != nil { diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index cd521cf17e..509a33fc1f 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1159,7 +1159,8 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { ServiceName: "service-2", Port: "80", }.String(), - crStatusPopulated: false, + // This indicate a different syncer is owning the CR, and has already populated NEG CR Status with valid content. + crStatusPopulated: true, expectErr: true, expectNoopOnNegStatus: true, }, @@ -1172,7 +1173,8 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { ServiceName: testServiceName, Port: "81", }.String(), - crStatusPopulated: false, + // This indicate a different syncer is owning the CR, and has already populated NEG CR Status with valid content. + crStatusPopulated: true, expectErr: true, expectNoopOnNegStatus: true, }, @@ -1200,6 +1202,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { expectZones := sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4) var expectedNegRefs map[string]negv1beta1.NegObjectReference + var err error if tc.negExists { for zone := range expectZones { fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ @@ -1211,8 +1214,10 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { Description: tc.negDesc, }, zone, klog.TODO()) } - ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) - expectedNegRefs = negObjectReferences(ret) + expectedNegRefs, err = negObjectReferences(fakeCloud, negv1beta1.ActiveState, expectZones) + if err != nil { + t.Errorf("Failed to get negObjRef from NEG CR: %v", err) + } } var refs []negv1beta1.NegObjectReference if tc.crStatusPopulated { @@ -1243,16 +1248,25 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { if err != nil { t.Errorf("Failed to get NEG from neg client: %s", err) } - ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) if !tc.expectErr { - expectedNegRefs = negObjectReferences(ret) + expectedNegRefs, err = negObjectReferences(fakeCloud, negv1beta1.ActiveState, expectZones) + if err != nil { + t.Errorf("Failed to get negObjRef from NEG CR: %v", err) + } } // if error occurs, expect that neg object references are not populated if tc.expectErr && !tc.crStatusPopulated { expectedNegRefs = nil } - checkNegCR(t, negCR, creationTS, expectZones, expectedNegRefs, false, tc.expectErr) + // NEG Object References should exist if: + // 1. ensureNetworkEndpointGroups() doesn't result in errors, which + // should populate the NEG Object Reference for NEGs that have + // been successfully ensured. + // 2. NEG CR is owned by a differ syncer, and the NEG object refs + // have been populated. + expectPopulatedNegRefs := !tc.expectErr || (tc.crStatusPopulated && tc.expectNoopOnNegStatus) + checkNegCR(t, negCR, creationTS, expectZones, nil, expectPopulatedNegRefs, false, fakeCloud) if tc.expectErr && tc.expectNoopOnNegStatus { // If CR is populated, we should have initialized and synced condition var expectedConditionLen int @@ -1270,6 +1284,9 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { if tc.expectErr && !tc.expectNoopOnNegStatus { checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionFalse, true) } + if tc.expectErr && tc.expectNoopOnNegStatus { + checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionTrue, false) + } if !tc.expectErr && tc.crStatusPopulated { checkCondition(t, negCR.Status.Conditions, negv1beta1.Initialized, creationTS, corev1.ConditionTrue, false) } @@ -1285,6 +1302,7 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { // Verify the NEGs are created as expected retZones := sets.NewString() + ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) for key, neg := range ret { retZones.Insert(key.Zone) if neg.Name != testNegName { @@ -1309,6 +1327,129 @@ func TestTransactionSyncerWithNegCR(t *testing.T) { } } +// TestUpdateInitStatus iterates over different zone transition situation, and +// check if NEG Object Reference in the corresponding zone has the expected State. +func TestUpdateInitStatus(t *testing.T) { + t.Parallel() + testNetwork := cloud.ResourcePath("network", &meta.Key{Name: "test-network"}) + testSubnetwork := cloud.ResourcePath("subnetwork", &meta.Key{Name: "test-subnetwork"}) + testNegType := negtypes.VmIpPortEndpointType + + // Active zones: zone1, zone2. + // Inactive zones: zone3 + oldActiveZones := sets.NewString(negtypes.TestZone1, negtypes.TestZone2) + oldInactiveZones := sets.NewString(negtypes.TestZone3) + + testCases := []struct { + desc string + newActiveZones sets.String + newInactiveZones sets.String + }{ + { + desc: "Add a new zone zone4, an additional NEG ref should be added to NEG CR with ACTIVE status", + newActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone4), + newInactiveZones: sets.NewString(negtypes.TestZone3), + }, + { + desc: "Removed an ACTIVE zone zone2, corresponding NEG ref should still in NEG CR but with INACTIVE status", + newActiveZones: sets.NewString(negtypes.TestZone1), + newInactiveZones: sets.NewString(negtypes.TestZone2, negtypes.TestZone3), + }, + { + desc: "Add back an INACTIVE zone zone3, the NEG ref in this zone should become ACTIVE in NEG CR", + newActiveZones: sets.NewString(negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone3), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork) + _, syncer := newTestTransactionSyncer(fakeCloud, testNegType, false) + svcNegClient := syncer.svcNegClient + + // Create initial NEGs, and get their Object Ref to be used in NEG CR. + var initialNegRefs []negv1beta1.NegObjectReference + for zone := range oldActiveZones.Union(oldInactiveZones) { + err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: syncer.NegSyncerKey.GetAPIVersion(), + Name: testNegName, + NetworkEndpointType: string(syncer.NegSyncerKey.NegType), + Network: fakeCloud.NetworkURL(), + Subnetwork: fakeCloud.SubnetworkURL(), + }, zone, klog.TODO()) + if err != nil { + t.Fatalf("Failed to create NEG %s in zone %s: %v", testNegName, zone, err) + } + neg, err := fakeCloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO()) + if err != nil { + t.Fatalf("Failed to get NEG %s in zone %s: %v", testNegName, zone, err) + } + negRef := negv1beta1.NegObjectReference{ + Id: fmt.Sprint(neg.Id), + SelfLink: neg.SelfLink, + NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType), + State: negv1beta1.ActiveState, + SubnetURL: neg.Subnetwork, + } + if oldInactiveZones.Has(zone) { + negRef.State = negv1beta1.InactiveState + } + initialNegRefs = append(initialNegRefs, negRef) + } + + // Create NEG CR. + creationTS := v1.Now() + origCR := createNegCR(testNegName, creationTS, true, true, initialNegRefs) + svcNeg, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Create(context.Background(), origCR, v1.CreateOptions{}) + if err != nil { + t.Errorf("Failed to create test NEG CR: %s", err) + } + syncer.svcNegLister.Add(svcNeg) + + // Create a NEG in a new zone if zone expanded. + addedZones := tc.newActiveZones.Difference(oldActiveZones.Union(oldInactiveZones)) + if addedZones != nil { + for zone := range addedZones { + err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ + Version: syncer.NegSyncerKey.GetAPIVersion(), + Name: testNegName, + NetworkEndpointType: string(syncer.NegSyncerKey.NegType), + Network: fakeCloud.NetworkURL(), + Subnetwork: fakeCloud.SubnetworkURL(), + }, zone, klog.TODO()) + if err != nil { + t.Fatalf("Failed to create NEG %s in zone %s: %v", testNegName, zone, err) + } + } + } + + // This is the input list to updateInitStatus(). + // It should only include NEG ref in the active zones. + var activeNegList []negv1beta1.NegObjectReference + for zone := range tc.newActiveZones { + neg, err := fakeCloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO()) + if err != nil { + t.Fatalf("Failed to get NEG %s in zone %s: %v", testNegName, zone, err) + } + negRef := getNegObjectReference(neg, negv1beta1.ActiveState) + activeNegList = append(activeNegList, negRef) + } + + // Inactive NEG refs should be added if there is any. + syncer.updateInitStatus(activeNegList, nil) + + negCR, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(testServiceNamespace).Get(context.Background(), testNegName, v1.GetOptions{}) + if err != nil { + t.Errorf("Failed to create test NEG CR: %s", err) + } + + checkNegCR(t, negCR, creationTS, tc.newActiveZones, tc.newInactiveZones, true, false, fakeCloud) + }) + } +} + func TestUpdateStatus(t *testing.T) { testNetwork := cloud.ResourcePath("network", &meta.Key{Name: "test-network"}) testSubnetwork := cloud.ResourcePath("subnetwork", &meta.Key{Name: "test-subnetwork"}) @@ -1502,8 +1643,11 @@ func TestIsZoneChange(t *testing.T) { Subnetwork: fakeCloud.SubnetworkURL(), }, zone, klog.TODO()) } - ret, _ := fakeCloud.AggregatedListNetworkEndpointGroup(syncer.NegSyncerKey.GetAPIVersion(), klog.TODO()) - negRefMap := negObjectReferences(ret) + negRefMap, err := negObjectReferences(fakeCloud, negv1beta1.ActiveState, sets.NewString(origZones...)) + if err != nil { + t.Errorf("Failed to get negObjRef from NEG CR: %v", err) + } + var refs []negv1beta1.NegObjectReference for _, neg := range negRefMap { refs = append(refs, neg) @@ -2460,17 +2604,28 @@ func waitForTransactions(syncer *transactionSyncer) error { } // negObjectReferences returns objectReferences for NEG CRs from NEG Objects -func negObjectReferences(negs map[*meta.Key]*composite.NetworkEndpointGroup) map[string]negv1beta1.NegObjectReference { - +func negObjectReferences(cloud negtypes.NetworkEndpointGroupCloud, state negv1beta1.NegState, zones sets.String) (map[string]negv1beta1.NegObjectReference, error) { negObjs := make(map[string]negv1beta1.NegObjectReference) - for _, neg := range negs { - negObjs[neg.SelfLink] = negv1beta1.NegObjectReference{ - Id: fmt.Sprint(neg.Id), - SelfLink: neg.SelfLink, - NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType), + for zone := range zones { + neg, err := cloud.GetNetworkEndpointGroup(testNegName, zone, meta.VersionGA, klog.TODO()) + if err != nil { + return nil, err } + negRef := getNegObjectReference(neg, state) + negObjs[neg.SelfLink] = negRef + } + return negObjs, nil +} + +// getNegObjectReference returns objectReference for NEG CRs from NEG Object +func getNegObjectReference(neg *composite.NetworkEndpointGroup, negState negv1beta1.NegState) negv1beta1.NegObjectReference { + return negv1beta1.NegObjectReference{ + Id: fmt.Sprint(neg.Id), + SelfLink: neg.SelfLink, + NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType), + State: negState, + SubnetURL: neg.Subnetwork, } - return negObjs } // checks the NEG Description on the cloud NEG Object and verifies with expected @@ -2569,13 +2724,32 @@ func createNegCR(testNegName string, creationTS metav1.Time, populateInitialized } // checkNegCR validates the NegObjectReferences and the LastSyncTime. It will not validate the conditions fields but ensures at most 2 conditions exist -func checkNegCR(t *testing.T, negCR *negv1beta1.ServiceNetworkEndpointGroup, previousLastSyncTime metav1.Time, expectZones sets.String, expectedNegRefs map[string]negv1beta1.NegObjectReference, expectSyncTimeUpdate, expectErr bool) { +func checkNegCR(t *testing.T, negCR *negv1beta1.ServiceNetworkEndpointGroup, previousLastSyncTime metav1.Time, activeZones, inactiveZones sets.String, expectPopulatedNegRefs, expectSyncTimeUpdate bool, cloud negtypes.NetworkEndpointGroupCloud) { if expectSyncTimeUpdate && !previousLastSyncTime.Before(&negCR.Status.LastSyncTime) { t.Errorf("Expected Neg CR to have an updated LastSyncTime") } else if !expectSyncTimeUpdate && !negCR.Status.LastSyncTime.IsZero() && !previousLastSyncTime.Equal(&negCR.Status.LastSyncTime) { t.Errorf("Expected Neg CR to not have an updated LastSyncTime") } + expectedNegRefs := make(map[string]negv1beta1.NegObjectReference) + + if expectPopulatedNegRefs { + ret, err := negObjectReferences(cloud, negv1beta1.ActiveState, activeZones) + if err != nil { + t.Fatalf("Failed to get negObjRef: %v", err) + } + for k, v := range ret { + expectedNegRefs[k] = v + } + ret, err = negObjectReferences(cloud, negv1beta1.InactiveState, inactiveZones) + if err != nil { + t.Fatalf("Failed to get negObjRef: %v", err) + } + for k, v := range ret { + expectedNegRefs[k] = v + } + } + var foundNegObjs []string if len(negCR.Status.NetworkEndpointGroups) != len(expectedNegRefs) { t.Errorf("Expected Neg CR to have %d corresponding neg object references, but has %d", len(expectedNegRefs), len(negCR.Status.NetworkEndpointGroups)) diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index 50432ab0dd..d79115594e 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -227,6 +227,8 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService Id: fmt.Sprint(neg.Id), SelfLink: neg.SelfLink, NetworkEndpointType: negv1beta1.NetworkEndpointType(neg.NetworkEndpointType), + State: negv1beta1.ActiveState, + SubnetURL: neg.Subnetwork, } return negRef, nil } diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index c0b96add27..a505458a75 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -1337,11 +1337,16 @@ func TestNegObjectCrd(t *testing.T) { t.Errorf("Failed to find neg") } - var expectedNegObj negv1beta1.NegObjectReference - expectedNegObj = negv1beta1.NegObjectReference{ + var subnetURL string + if networkEndpointType != negtypes.NonGCPPrivateEndpointType { + subnetURL = testSubnetwork + } + expectedNegObj := negv1beta1.NegObjectReference{ Id: fmt.Sprint(neg.Id), SelfLink: neg.SelfLink, NetworkEndpointType: negv1beta1.NetworkEndpointType(networkEndpointType), + State: negv1beta1.ActiveState, + SubnetURL: subnetURL, } if negObj != expectedNegObj { diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index afbb92e39b..4d24d4c07c 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -21,6 +21,7 @@ import ( "reflect" "strconv" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" apiv1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -28,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress-gce/pkg/annotations" + negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/network" "k8s.io/ingress-gce/pkg/utils/namer" @@ -382,3 +384,18 @@ func NodeFilterForNetworkEndpointType(negType NetworkEndpointType) zonegetter.Fi } return zonegetter.CandidateNodesFilter } + +// NegInfo holds the identifying information regarding a NEG. +type NegInfo struct { + Name string + Zone string +} + +// NegInfoFromNegRef returns NegInfo by parsing the NEG selflink. +func NegInfoFromNegRef(negRef negv1beta1.NegObjectReference) (NegInfo, error) { + resourceID, err := cloud.ParseResourceURL(negRef.SelfLink) + if err != nil { + return NegInfo{}, fmt.Errorf("failed to parse selflink: %v", err) + } + return NegInfo{Name: resourceID.Key.Name, Zone: resourceID.Key.Zone}, nil +}