From ac55cad7b922677a0b41c2d26cdb82071b6367fa Mon Sep 17 00:00:00 2001 From: Lucian Tosa Date: Fri, 20 Jun 2025 11:10:30 +0200 Subject: [PATCH 1/8] Export GetReplicaSets --- controllers/om/automation_config_test.go | 2 +- controllers/om/deployment.go | 16 +++++----- controllers/om/deployment_test.go | 40 ++++++++++++------------ controllers/om/depshardedcluster_test.go | 14 ++++----- controllers/om/mockedomclient.go | 2 +- 5 files changed, 37 insertions(+), 37 deletions(-) diff --git a/controllers/om/automation_config_test.go b/controllers/om/automation_config_test.go index 69300956e..fdc0d8e18 100644 --- a/controllers/om/automation_config_test.go +++ b/controllers/om/automation_config_test.go @@ -1059,7 +1059,7 @@ func TestApplyInto(t *testing.T) { } func changeTypes(deployment Deployment) error { - rs := deployment.getReplicaSets() + rs := deployment.GetReplicaSets() deployment.setReplicaSets(rs) return nil } diff --git a/controllers/om/deployment.go b/controllers/om/deployment.go index 254816648..ad51580c1 100644 --- a/controllers/om/deployment.go +++ b/controllers/om/deployment.go @@ -280,13 +280,13 @@ func (d Deployment) AddMonitoringAndBackup(log *zap.SugaredLogger, tls bool, caF d.addBackup(log) } -// DEPRECATED: this shouldn't be used as it may panic because of different underlying type; use getReplicaSets instead +// DEPRECATED: this shouldn't be used as it may panic because of different underlying type; use GetReplicaSets instead func (d Deployment) ReplicaSets() []ReplicaSet { return d["replicaSets"].([]ReplicaSet) } func (d Deployment) GetReplicaSetByName(name string) ReplicaSet { - for _, rs := range d.getReplicaSets() { + for _, rs := range d.GetReplicaSets() { if rs.Name() == name { return rs } @@ -395,7 +395,7 @@ func (d Deployment) RemoveReplicaSetByName(name string, log *zap.SugaredLogger) return xerrors.New("ReplicaSet does not exist") } - currentRs := d.getReplicaSets() + currentRs := d.GetReplicaSets() toKeep := make([]ReplicaSet, len(currentRs)-1) i := 0 for _, el := range currentRs { @@ -685,7 +685,7 @@ func (d Deployment) ProcessesCopy() []Process { // ReplicaSetsCopy returns the COPY of replicasets in the deployment. func (d Deployment) ReplicaSetsCopy() []ReplicaSet { - return d.deepCopy().getReplicaSets() + return d.deepCopy().GetReplicaSets() } // ShardedClustersCopy returns the COPY of sharded clusters in the deployment. 
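Note on the deprecation above: the exported GetReplicaSets exists because the underlying value of d["replicaSets"] is not always []ReplicaSet. A deployment built by the operator holds the typed slice, while one read back from Ops Manager and unmarshalled from JSON presumably holds []interface{}, so the bare type assertion in the deprecated ReplicaSets() panics. The accessor's type switch is partially visible in a later hunk of this file's diff. A minimal, self-contained sketch of the same pattern, using stand-in types and assuming Deployment is a map[string]interface{}, as the d["replicaSets"] indexing suggests:

package main

import "fmt"

// Stand-ins for the om package types touched by this patch.
type replicaSet map[string]interface{}
type deployment map[string]interface{}

// getReplicaSets mirrors the exported accessor: it accepts both the strongly
// typed slice set by the operator and the []interface{} produced by JSON
// unmarshalling, instead of panicking on a single type assertion.
func (d deployment) getReplicaSets() []replicaSet {
	switch v := d["replicaSets"].(type) {
	case []replicaSet:
		return v
	case []interface{}:
		out := make([]replicaSet, 0, len(v))
		for _, item := range v {
			if m, ok := item.(map[string]interface{}); ok {
				out = append(out, replicaSet(m))
			}
		}
		return out
	default:
		return nil
	}
}

func main() {
	// Simulates a deployment freshly unmarshalled from JSON.
	d := deployment{"replicaSets": []interface{}{map[string]interface{}{"_id": "rs0"}}}
	fmt.Println(len(d.getReplicaSets())) // 1

	// The deprecated accessor's single assertion would panic on the same data:
	// _ = d["replicaSets"].([]replicaSet)
}

The full set of cases in the real GetReplicaSets is not shown in this diff; the []interface{} branch above is an assumption based on the "different underlying type" wording of the deprecation comment.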
@@ -958,7 +958,7 @@ func (d Deployment) getProcessByName(name string) *Process { } func (d Deployment) getReplicaSetByName(name string) *ReplicaSet { - for _, r := range d.getReplicaSets() { + for _, r := range d.GetReplicaSets() { if r.Name() == name { return &r } @@ -977,7 +977,7 @@ func (d Deployment) getShardedClusterByName(name string) *ShardedCluster { return nil } -func (d Deployment) getReplicaSets() []ReplicaSet { +func (d Deployment) GetReplicaSets() []ReplicaSet { switch v := d["replicaSets"].(type) { case []ReplicaSet: return v @@ -997,7 +997,7 @@ func (d Deployment) setReplicaSets(replicaSets []ReplicaSet) { } func (d Deployment) addReplicaSet(rs ReplicaSet) { - d.setReplicaSets(append(d.getReplicaSets(), rs)) + d.setReplicaSets(append(d.GetReplicaSets(), rs)) } func (d Deployment) getShardedClusters() []ShardedCluster { @@ -1052,7 +1052,7 @@ func (d Deployment) findReplicaSetsRemovedFromShardedCluster(clusterName string) clusterReplicaSets := shardedCluster.getAllReplicaSets() var ans []string - for _, v := range d.getReplicaSets() { + for _, v := range d.GetReplicaSets() { if !stringutil.Contains(clusterReplicaSets, v.Name()) && isShardOfShardedCluster(clusterName, v.Name()) { ans = append(ans, v.Name()) } diff --git a/controllers/om/deployment_test.go b/controllers/om/deployment_test.go index a222cadd6..8f0808dae 100644 --- a/controllers/om/deployment_test.go +++ b/controllers/om/deployment_test.go @@ -56,9 +56,9 @@ func TestMergeReplicaSet(t *testing.T) { expectedRs := buildRsByProcesses("fooRs", createReplicaSetProcesses("fooRs")) assert.Len(t, d.getProcesses(), 3) - assert.Len(t, d.getReplicaSets(), 1) - assert.Len(t, d.getReplicaSets()[0].Members(), 3) - assert.Equal(t, d.getReplicaSets()[0], expectedRs.Rs) + assert.Len(t, d.GetReplicaSets(), 1) + assert.Len(t, d.GetReplicaSets()[0].Members(), 3) + assert.Equal(t, d.GetReplicaSets()[0], expectedRs.Rs) // Now the deployment "gets updated" from external - new node is added and one is removed - this should be fixed // by merge @@ -66,15 +66,15 @@ func TestMergeReplicaSet(t *testing.T) { d.getProcesses()[0]["processType"] = ProcessTypeMongos // this will be overriden d.getProcesses()[1].EnsureNetConfig()["MaxIncomingConnections"] = 20 // this will be left as-is - d.getReplicaSets()[0]["protocolVersion"] = 10 // this field will be overriden by Operator - d.getReplicaSets()[0].setMembers(d.getReplicaSets()[0].Members()[0:2]) // "removing" the last node in replicaset - d.getReplicaSets()[0].addMember(newProcess, "", automationconfig.MemberOptions{}) // "adding" some new node - d.getReplicaSets()[0].Members()[0]["arbiterOnly"] = true // changing data for first node + d.GetReplicaSets()[0]["protocolVersion"] = 10 // this field will be overriden by Operator + d.GetReplicaSets()[0].setMembers(d.GetReplicaSets()[0].Members()[0:2]) // "removing" the last node in replicaset + d.GetReplicaSets()[0].addMember(newProcess, "", automationconfig.MemberOptions{}) // "adding" some new node + d.GetReplicaSets()[0].Members()[0]["arbiterOnly"] = true // changing data for first node mergeReplicaSet(d, "fooRs", createReplicaSetProcesses("fooRs")) assert.Len(t, d.getProcesses(), 3) - assert.Len(t, d.getReplicaSets(), 1) + assert.Len(t, d.GetReplicaSets(), 1) expectedRs = buildRsByProcesses("fooRs", createReplicaSetProcesses("fooRs")) expectedRs.Rs.Members()[0]["arbiterOnly"] = true @@ -89,14 +89,14 @@ func TestMergeReplica_ScaleDown(t *testing.T) { mergeReplicaSet(d, "someRs", createReplicaSetProcesses("someRs")) assert.Len(t, d.getProcesses(), 3) 
- assert.Len(t, d.getReplicaSets()[0].Members(), 3) + assert.Len(t, d.GetReplicaSets()[0].Members(), 3) // "scale down" scaledDownRsProcesses := createReplicaSetProcesses("someRs")[0:2] mergeReplicaSet(d, "someRs", scaledDownRsProcesses) assert.Len(t, d.getProcesses(), 2) - assert.Len(t, d.getReplicaSets()[0].Members(), 2) + assert.Len(t, d.GetReplicaSets()[0].Members(), 2) // checking that the last member was removed rsProcesses := buildRsByProcesses("someRs", createReplicaSetProcesses("someRs")).Processes @@ -123,7 +123,7 @@ func TestMergeReplicaSet_MergeFirstProcess(t *testing.T) { mergeReplicaSet(d, "fooRs", createReplicaSetProcessesCount(5, "fooRs")) assert.Len(t, d.getProcesses(), 8) - assert.Len(t, d.getReplicaSets(), 2) + assert.Len(t, d.GetReplicaSets(), 2) expectedRs := buildRsByProcesses("fooRs", createReplicaSetProcessesCount(5, "fooRs")) @@ -177,8 +177,8 @@ func TestMergeDeployment_BigReplicaset(t *testing.T) { checkNumberOfVotingMembers(t, rs, 7, 8) // Now OM user "has changed" votes for some of the members - this must stay the same after merge - omDeployment.getReplicaSets()[0].Members()[2].setVotes(0).setPriority(0) - omDeployment.getReplicaSets()[0].Members()[4].setVotes(0).setPriority(0) + omDeployment.GetReplicaSets()[0].Members()[2].setVotes(0).setPriority(0) + omDeployment.GetReplicaSets()[0].Members()[4].setVotes(0).setPriority(0) omDeployment.MergeReplicaSet(rs, nil, nil, zap.S()) checkNumberOfVotingMembers(t, rs, 5, 8) @@ -199,10 +199,10 @@ func TestMergeDeployment_BigReplicaset(t *testing.T) { omDeployment.MergeReplicaSet(rsToMerge, nil, nil, zap.S()) checkNumberOfVotingMembers(t, rs, 7, 11) - assert.Equal(t, 0, omDeployment.getReplicaSets()[0].Members()[2].Votes()) - assert.Equal(t, 0, omDeployment.getReplicaSets()[0].Members()[4].Votes()) - assert.Equal(t, float32(0), omDeployment.getReplicaSets()[0].Members()[2].Priority()) - assert.Equal(t, float32(0), omDeployment.getReplicaSets()[0].Members()[4].Priority()) + assert.Equal(t, 0, omDeployment.GetReplicaSets()[0].Members()[2].Votes()) + assert.Equal(t, 0, omDeployment.GetReplicaSets()[0].Members()[4].Votes()) + assert.Equal(t, float32(0), omDeployment.GetReplicaSets()[0].Members()[2].Priority()) + assert.Equal(t, float32(0), omDeployment.GetReplicaSets()[0].Members()[4].Priority()) } func TestGetAllProcessNames_MergedReplicaSetsAndShardedClusters(t *testing.T) { @@ -360,7 +360,7 @@ func TestGetNumberOfExcessProcesses_ShardedClusterScaleDown(t *testing.T) { _, err := d.MergeShardedCluster(mergeOpts) assert.NoError(t, err) assert.Len(t, d.getShardedClusterByName("sc001").shards(), 3) - assert.Len(t, d.getReplicaSets(), 4) + assert.Len(t, d.GetReplicaSets(), 4) assert.Equal(t, 0, d.GetNumberOfExcessProcesses("sc001")) // Now we are "scaling down" the sharded cluster - so junk replica sets will appear - this is still ok @@ -377,7 +377,7 @@ func TestGetNumberOfExcessProcesses_ShardedClusterScaleDown(t *testing.T) { _, err = d.MergeShardedCluster(mergeOpts) assert.NoError(t, err) assert.Len(t, d.getShardedClusterByName("sc001").shards(), 2) - assert.Len(t, d.getReplicaSets(), 4) + assert.Len(t, d.GetReplicaSets(), 4) assert.Equal(t, 0, d.GetNumberOfExcessProcesses("sc001")) } @@ -586,7 +586,7 @@ func checkShardedClusterCheckExtraReplicaSets(t *testing.T, d Deployment, expect // checking that no previous replica sets are left. 
For this we take the name of first shard and remove the last digit firstShardName := expectedReplicaSets[0].Rs.Name() i := 0 - for _, r := range d.getReplicaSets() { + for _, r := range d.GetReplicaSets() { if strings.HasPrefix(r.Name(), firstShardName[0:len(firstShardName)-1]) { i++ } diff --git a/controllers/om/depshardedcluster_test.go b/controllers/om/depshardedcluster_test.go index e93a90412..e9f025dd0 100644 --- a/controllers/om/depshardedcluster_test.go +++ b/controllers/om/depshardedcluster_test.go @@ -31,9 +31,9 @@ func TestMergeShardedCluster_New(t *testing.T) { assert.NoError(t, err) require.Len(t, d.getProcesses(), 15) - require.Len(t, d.getReplicaSets(), 4) + require.Len(t, d.GetReplicaSets(), 4) for i := 0; i < 4; i++ { - require.Len(t, d.getReplicaSets()[i].Members(), 3) + require.Len(t, d.GetReplicaSets()[i].Members(), 3) } checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster")) checkReplicaSet(t, d, createConfigSrvRs("configSrv", true)) @@ -130,9 +130,9 @@ func TestMergeShardedCluster_ReplicaSetsModified(t *testing.T) { expectedShards[0].Rs["writeConcernMajorityJournalDefault"] = true require.Len(t, d.getProcesses(), 15) - require.Len(t, d.getReplicaSets(), 4) + require.Len(t, d.GetReplicaSets(), 4) for i := 0; i < 4; i++ { - require.Len(t, d.getReplicaSets()[i].Members(), 3) + require.Len(t, d.GetReplicaSets()[i].Members(), 3) } checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster")) checkReplicaSet(t, d, createConfigSrvRs("configSrv", true)) @@ -166,7 +166,7 @@ func TestMergeShardedCluster_ShardedClusterModified(t *testing.T) { mergeReplicaSet(d, "fakeShard", createReplicaSetProcesses("fakeShard")) - require.Len(t, d.getReplicaSets(), 5) + require.Len(t, d.GetReplicaSets(), 5) // Final check - we create the expected configuration, add there correct OM changes and check for equality with merge // result @@ -188,9 +188,9 @@ func TestMergeShardedCluster_ShardedClusterModified(t *testing.T) { // Note, that fake replicaset and it's processes haven't disappeared as we passed 'false' to 'MergeShardedCluster' // which results in "draining" for redundant shards but not physical removal of replica sets require.Len(t, d.getProcesses(), 18) - require.Len(t, d.getReplicaSets(), 5) + require.Len(t, d.GetReplicaSets(), 5) for i := 0; i < 4; i++ { - require.Len(t, d.getReplicaSets()[i].Members(), 3) + require.Len(t, d.GetReplicaSets()[i].Members(), 3) } checkMongoSProcesses(t, d.getProcesses(), createMongosProcesses(3, "pretty", "cluster")) checkReplicaSet(t, d, createConfigSrvRs("configSrv", true)) diff --git a/controllers/om/mockedomclient.go b/controllers/om/mockedomclient.go index d5b90eef9..51c3f844d 100644 --- a/controllers/om/mockedomclient.go +++ b/controllers/om/mockedomclient.go @@ -734,7 +734,7 @@ func (oc *MockedOmConnection) CheckResourcesAndBackupDeleted(t *testing.T, resou // This can be improved for some more complicated scenarios when we have different resources in parallel - so far // just checking if deployment assert.Empty(t, oc.deployment.getProcesses()) - assert.Empty(t, oc.deployment.getReplicaSets()) + assert.Empty(t, oc.deployment.GetReplicaSets()) assert.Empty(t, oc.deployment.getShardedClusters()) assert.Empty(t, oc.deployment.getMonitoringVersions()) assert.Empty(t, oc.deployment.getBackupVersions()) From 8ae690cf09e689415f7e2cd41b3be8e5b36b4579 Mon Sep 17 00:00:00 2001 From: Lucian Tosa Date: Mon, 30 Jun 2025 17:48:26 +0200 Subject: [PATCH 2/8] Save rs member ids in annotation --- 
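Note on this patch: the Ops Manager deployment is normally the source of truth for replica-set member ids (the per-member id that ReplicaSet.MemberIds() collects in this patch). When a MongoDBMultiCluster resource is moved to a new project or a new Ops Manager instance, that deployment starts out empty, so the ids would be assigned afresh and no longer match what the running members already use; the release-note entry in patch 5 describes the resulting failed state. The change below records the last-achieved ids in the mongodb.com/v1.lastAchievedRsMemberIds annotation after each successful reconcile and falls back to that annotation when Ops Manager returns no replica sets. A rough, self-contained sketch of the save/restore round trip; the resource and member names are illustrative, borrowed from the unit test added later in this series:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Shape returned by GetReplicaSetMemberIds:
	// replica-set name -> member name -> member id.
	saved := map[string]map[string]int{
		"multi-replica-set": {
			"multi-replica-set-0-0": 0,
			"multi-replica-set-1-0": 1,
			"multi-replica-set-2-0": 2,
		},
	}

	// Save path: marshalled by saveLastAchievedSpec and stored under
	// util.LastAchievedRsMemberIds ("mongodb.com/v1.lastAchievedRsMemberIds").
	annotation, err := json.Marshal(saved)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(annotation))

	// Restore path, mirroring getReplicaSetProcessIdsFromAnnotation: unmarshal
	// the annotation and index it by the resource name.
	restored := map[string]map[string]int{}
	if err := json.Unmarshal(annotation, &restored); err != nil {
		panic(err)
	}
	fmt.Println(restored["multi-replica-set"])
}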
controllers/om/mockedomclient.go | 16 ++++++++++ controllers/om/omclient.go | 19 +++++++++++ controllers/om/replicaset.go | 8 +++++ .../mongodbmultireplicaset_controller.go | 32 +++++++++++++++++-- pkg/util/constants.go | 5 ++- 5 files changed, 77 insertions(+), 3 deletions(-) diff --git a/controllers/om/mockedomclient.go b/controllers/om/mockedomclient.go index 51c3f844d..60978fc4e 100644 --- a/controllers/om/mockedomclient.go +++ b/controllers/om/mockedomclient.go @@ -141,6 +141,22 @@ func (oc *MockedOmConnection) ConfigureProject(project *Project) { oc.context.OrgID = project.OrgID } +func (oc *MockedOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) { + oc.addToHistory(reflect.ValueOf(oc.GetReplicaSetMemberIds)) + dep, err := oc.ReadDeployment() + if err != nil { + return nil, err + } + + finalProcessIds := make(map[string]map[string]int) + + for _, replicaSet := range dep.GetReplicaSets() { + finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds() + } + + return finalProcessIds, nil +} + var _ Connection = &MockedOmConnection{} // NewEmptyMockedOmConnection is the standard function for creating mocked connections that is usually used for testing diff --git a/controllers/om/omclient.go b/controllers/om/omclient.go index 7ec1336a1..60e19016f 100644 --- a/controllers/om/omclient.go +++ b/controllers/om/omclient.go @@ -64,6 +64,10 @@ type Connection interface { GetPreferredHostnames(agentApiKey string) ([]PreferredHostname, error) AddPreferredHostname(agentApiKey string, value string, isRegexp bool) error + // GetReplicaSetMemberIds returns a map with the replicaset name as the key. + // The value is another map where the key is the replicaset member name and the value is its member id. + GetReplicaSetMemberIds() (map[string]map[string]int, error) + backup.GroupConfigReader backup.GroupConfigUpdater @@ -273,6 +277,21 @@ func (oc *HTTPOmConnection) GetAgentAuthMode() (string, error) { return ac.Auth.AutoAuthMechanism, nil } +func (oc *HTTPOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) { + dep, err := oc.ReadDeployment() + if err != nil { + return nil, err + } + + finalProcessIds := make(map[string]map[string]int) + + for _, replicaSet := range dep.GetReplicaSets() { + finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds() + } + + return finalProcessIds, nil +} + var _ Connection = &HTTPOmConnection{} // NewOpsManagerConnection stores OpsManger api endpoint and authentication credentials. 
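Why the helper returns a full name-to-id map rather than anything positional: once individual clusters are scaled at different times, member ids stop following pod ordinals, because a newly added member simply receives the next unused id. The E2E test added later in this series documents the layout it expects after the first cluster is scaled up last; a small sketch with those values (illustrative, taken from that test's comment):

package main

import "fmt"

func main() {
	// Expected ids for replica set "multi-replica-set" after the first cluster
	// is scaled from 2 to 3 members last: non-sequential per cluster, which is
	// why the operator persists the actual mapping instead of recomputing it.
	memberIds := map[string]int{
		"multi-replica-set-0-0": 0,
		"multi-replica-set-0-1": 1,
		"multi-replica-set-0-2": 5, // added last, receives the next free id
		"multi-replica-set-1-0": 2,
		"multi-replica-set-2-0": 3,
		"multi-replica-set-2-1": 4,
	}
	fmt.Println(memberIds["multi-replica-set-0-2"])
}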
diff --git a/controllers/om/replicaset.go b/controllers/om/replicaset.go index 7c6ea8ed5..181899a4f 100644 --- a/controllers/om/replicaset.go +++ b/controllers/om/replicaset.go @@ -146,6 +146,14 @@ func (r ReplicaSet) String() string { return fmt.Sprintf("\"%s\" (members: %v)", r.Name(), r.Members()) } +func (r ReplicaSet) MemberIds() map[string]int { + memberIds := make(map[string]int) + for _, rsMember := range r.Members() { + memberIds[rsMember.Name()] = rsMember.Id() + } + return memberIds +} + // ***************************************** Private methods *********************************************************** func initDefaultRs(set ReplicaSet, name string, protocolVersion string) { diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go index caebe414c..a7e959a12 100644 --- a/controllers/operator/mongodbmultireplicaset_controller.go +++ b/controllers/operator/mongodbmultireplicaset_controller.go @@ -198,8 +198,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request return r.updateStatus(ctx, &mrs, status, log) } + // Save replicasets member ids in annotation + finalMemberIds, err := conn.GetReplicaSetMemberIds() + if err != nil { + return r.updateStatus(ctx, &mrs, workflow.Failed(err), log) + } + mrs.Status.FeatureCompatibilityVersion = mrs.CalculateFeatureCompatibilityVersion() - if err := r.saveLastAchievedSpec(ctx, mrs); err != nil { + if err := r.saveLastAchievedSpec(ctx, mrs, finalMemberIds); err != nil { return r.updateStatus(ctx, &mrs, workflow.Failed(xerrors.Errorf("Failed to set annotation: %w", err)), log) } @@ -624,7 +630,7 @@ func getMembersForClusterSpecItemThisReconciliation(mrs *mdbmultiv1.MongoDBMulti } // saveLastAchievedSpec updates the MongoDBMultiCluster resource with the spec that was just achieved. 
-func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster) error { +func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Context, mrs mdbmultiv1.MongoDBMultiCluster, rsMemberIds map[string]map[string]int) error { clusterSpecs, err := mrs.GetClusterSpecItems() if err != nil { return err @@ -654,6 +660,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Conte annotationsToAdd[mdbmultiv1.LastClusterNumMapping] = string(clusterNumBytes) } + rsMemberIdsBytes, err := json.Marshal(rsMemberIds) + if err != nil { + return err + } + if string(rsMemberIdsBytes) != "null" { + annotationsToAdd[util.LastAchievedRsMemberIds] = string(rsMemberIdsBytes) + } + return annotations.SetAnnotations(ctx, &mrs, annotationsToAdd, r.client) } @@ -696,6 +710,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte } processIds := getReplicaSetProcessIdsFromReplicaSets(mrs.Name, existingDeployment) + // If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in annotation + if len(processIds) == 0 { + processIds = getReplicaSetProcessIdsFromAnnotation(mrs) + } log.Debugf("Existing process Ids: %+v", processIds) certificateFileName := "" @@ -791,6 +809,16 @@ func getReplicaSetProcessIdsFromReplicaSets(replicaSetName string, deployment om return processIds } +func getReplicaSetProcessIdsFromAnnotation(mrs mdbmultiv1.MongoDBMultiCluster) map[string]int { + processIds := make(map[string]map[string]int) + if processIdsStr, ok := mrs.Annotations[util.LastAchievedRsMemberIds]; ok { + if err := json.Unmarshal([]byte(processIdsStr), &processIds); err != nil { + return map[string]int{} + } + } + return processIds[mrs.Name] +} + func getSRVService(mrs *mdbmultiv1.MongoDBMultiCluster) corev1.Service { additionalConfig := mrs.Spec.GetAdditionalMongodConfig() port := additionalConfig.GetPortOrDefault() diff --git a/pkg/util/constants.go b/pkg/util/constants.go index b2082f6ee..0a16395d8 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -280,7 +280,10 @@ const ( // TODO: remove this from here and move it to the certs package // This currently creates an import cycle InternalCertAnnotationKey = "internalCertHash" - LastAchievedSpec = "mongodb.com/v1.lastSuccessfulConfiguration" + + // Annotation keys used by the operator + LastAchievedSpec = "mongodb.com/v1.lastSuccessfulConfiguration" + LastAchievedRsMemberIds = "mongodb.com/v1.lastAchievedRsMemberIds" // SecretVolumeName is the name of the volume resource. SecretVolumeName = "secret-certs" From c7f1de2c1eb351ef8fb2277133a19174a9c7f405 Mon Sep 17 00:00:00 2001 From: Lucian Tosa Date: Mon, 30 Jun 2025 18:45:04 +0200 Subject: [PATCH 3/8] Unit test --- .../mongodbmultireplicaset_controller_test.go | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/controllers/operator/mongodbmultireplicaset_controller_test.go b/controllers/operator/mongodbmultireplicaset_controller_test.go index 730a402a4..1b79676fa 100644 --- a/controllers/operator/mongodbmultireplicaset_controller_test.go +++ b/controllers/operator/mongodbmultireplicaset_controller_test.go @@ -945,6 +945,90 @@ func TestScaling(t *testing.T) { assertMemberNameAndId(t, members, fmt.Sprintf("%s-2-0", mrs.Name), 2) }) + t.Run("Added members reuse member Ids when annotation is set", func(t *testing.T) { + mrs := mdbmulti.DefaultMultiReplicaSetBuilder(). + SetClusterSpecList(clusters). 
+ Build() + + mrs.Spec.ClusterSpecList[0].Members = 1 + mrs.Spec.ClusterSpecList[1].Members = 1 + mrs.Spec.ClusterSpecList[2].Members = 1 + reconciler, client, _, omConnectionFactory := defaultMultiReplicaSetReconciler(ctx, nil, "", "", mrs) + checkMultiReconcileSuccessful(ctx, t, reconciler, mrs, client, false) + + assert.Len(t, omConnectionFactory.GetConnection().(*om.MockedOmConnection).GetProcesses(), 3) + + dep, err := omConnectionFactory.GetConnection().ReadDeployment() + assert.NoError(t, err) + + replicaSets := dep.GetReplicaSets() + + assert.Len(t, replicaSets, 1) + members := replicaSets[0].Members() + assert.Len(t, members, 3) + + assertMemberNameAndId(t, members, fmt.Sprintf("%s-0-0", mrs.Name), 0) + assertMemberNameAndId(t, members, fmt.Sprintf("%s-1-0", mrs.Name), 1) + assertMemberNameAndId(t, members, fmt.Sprintf("%s-2-0", mrs.Name), 2) + + assert.Equal(t, members[0].Id(), 0) + assert.Equal(t, members[1].Id(), 1) + assert.Equal(t, members[2].Id(), 2) + + rsMemberIds := map[string]map[string]int{ + mrs.GetName(): { + fmt.Sprintf("%s-0-0", mrs.Name): 0, + fmt.Sprintf("%s-1-0", mrs.Name): 1, + fmt.Sprintf("%s-2-0", mrs.Name): 2, + }, + } + + rsMemberIdsBytes, _ := json.Marshal(rsMemberIds) + + // Assert that the member ids are saved in the annotation + assert.Equal(t, mrs.GetAnnotations()[util.LastAchievedRsMemberIds], string(rsMemberIdsBytes)) + + // Scaling up this cluster means we get non-sequential member Ids + mrs.Spec.ClusterSpecList[0].Members = 2 + + checkMultiReconcileSuccessful(ctx, t, reconciler, mrs, client, false) + + dep, err = omConnectionFactory.GetConnection().ReadDeployment() + assert.NoError(t, err) + + replicaSets = dep.GetReplicaSets() + + assert.Len(t, replicaSets, 1) + members = replicaSets[0].Members() + assert.Len(t, members, 4) + + assertMemberNameAndId(t, members, fmt.Sprintf("%s-0-0", mrs.Name), 0) + assertMemberNameAndId(t, members, fmt.Sprintf("%s-0-1", mrs.Name), 3) + assertMemberNameAndId(t, members, fmt.Sprintf("%s-1-0", mrs.Name), 1) + assertMemberNameAndId(t, members, fmt.Sprintf("%s-2-0", mrs.Name), 2) + + // Assert that the member ids are updated in the annotation + rsMemberIds[mrs.GetName()][fmt.Sprintf("%s-0-1", mrs.Name)] = 3 + rsMemberIdsBytes, _ = json.Marshal(rsMemberIds) + assert.Equal(t, mrs.GetAnnotations()[util.LastAchievedRsMemberIds], string(rsMemberIdsBytes)) + + // We simulate a changing the project by recreating the omConnection. The resource will keep the annotation. + // This part would have failed before 1.2.0. 
+ reconciler, client, _, omConnectionFactory = defaultMultiReplicaSetReconciler(ctx, nil, "", "", mrs) + checkMultiReconcileSuccessful(ctx, t, reconciler, mrs, client, false) + + dep, err = omConnectionFactory.GetConnection().ReadDeployment() + assert.NoError(t, err) + + replicaSets = dep.GetReplicaSets() + members = replicaSets[0].Members() + + assertMemberNameAndId(t, members, fmt.Sprintf("%s-0-0", mrs.Name), 0) + assertMemberNameAndId(t, members, fmt.Sprintf("%s-0-1", mrs.Name), 3) + assertMemberNameAndId(t, members, fmt.Sprintf("%s-1-0", mrs.Name), 1) + assertMemberNameAndId(t, members, fmt.Sprintf("%s-2-0", mrs.Name), 2) + }) + t.Run("Cluster can be added", func(t *testing.T) { mrs := mdbmulti.DefaultMultiReplicaSetBuilder().SetClusterSpecList(clusters).Build() mrs.Spec.ClusterSpecList = mrs.Spec.ClusterSpecList[:len(mrs.Spec.ClusterSpecList)-1] From 1da9a932e967acb28340a57e90e56a277c4c0750 Mon Sep 17 00:00:00 2001 From: Lucian Tosa Date: Tue, 1 Jul 2025 12:57:20 +0200 Subject: [PATCH 4/8] E2E test --- .../multi_cluster_scale_up_cluster.py | 107 +++++++++++++++--- 1 file changed, 89 insertions(+), 18 deletions(-) diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py b/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py index f289ea19d..14a621b22 100644 --- a/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py +++ b/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py @@ -1,7 +1,9 @@ +from operator import truediv from typing import List import kubernetes import pytest +from kubetester import create_or_update_configmap, read_configmap, try_load, wait_until, random_k8s_name from kubetester.automation_config_tester import AutomationConfigTester from kubetester.certs_mongodb_multi import create_multi_cluster_mongodb_tls_certs from kubetester.kubetester import fixture as yaml_fixture @@ -16,6 +18,27 @@ RESOURCE_NAME = "multi-replica-set" BUNDLE_SECRET_NAME = f"prefix-{RESOURCE_NAME}-cert" +@pytest.fixture(scope="module") +def project_name_prefix(namespace: str) -> str: + return random_k8s_name(f"{namespace}-project-") + +@pytest.fixture(scope="module") +def new_project_configmap( + namespace: str, + project_name_prefix: str +) -> str: + cm = read_configmap(namespace=namespace, name="my-project") + project_name = f"{project_name_prefix}-new-project" + return create_or_update_configmap( + namespace=namespace, + name=project_name, + data={ + "baseUrl": cm["baseUrl"], + "projectName": project_name, + "orgId": cm["orgId"], + }, + ) + @pytest.fixture(scope="module") def mongodb_multi_unmarshalled( @@ -28,7 +51,7 @@ def mongodb_multi_unmarshalled( resource = MongoDBMulti.from_yaml(yaml_fixture("mongodb-multi.yaml"), RESOURCE_NAME, namespace) resource.set_version(custom_mdb_version) # ensure certs are created for the members during scale up - resource["spec"]["clusterSpecList"] = cluster_spec_list(member_cluster_names, [2, 1, 2]) + resource["spec"]["clusterSpecList"] = cluster_spec_list(member_cluster_names, [3, 1, 2]) resource["spec"]["security"] = { "certsSecretPrefix": "prefix", "tls": { @@ -58,9 +81,14 @@ def server_certs( @pytest.fixture(scope="module") def mongodb_multi(mongodb_multi_unmarshalled: MongoDBMulti, server_certs: str) -> MongoDBMulti: + if try_load(mongodb_multi_unmarshalled): + return mongodb_multi_unmarshalled + # remove the last element, we are only starting with 2 clusters we will scale up the 3rd one later. 
mongodb_multi_unmarshalled["spec"]["clusterSpecList"].pop() - return mongodb_multi_unmarshalled.update() + # remove one member from the first cluster to start with 2 members + mongodb_multi_unmarshalled["spec"]["clusterSpecList"][0]["members"] = 2 + return mongodb_multi_unmarshalled @pytest.mark.e2e_multi_cluster_scale_up_cluster @@ -70,6 +98,7 @@ def test_deploy_operator(multi_cluster_operator: Operator): @pytest.mark.e2e_multi_cluster_scale_up_cluster def test_create_mongodb_multi(mongodb_multi: MongoDBMulti): + mongodb_multi.update() mongodb_multi.assert_reaches_phase(Phase.Running, timeout=600) @@ -97,7 +126,6 @@ def test_ops_manager_has_been_updated_correctly_before_scaling(): @pytest.mark.e2e_multi_cluster_scale_up_cluster def test_scale_mongodb_multi(mongodb_multi: MongoDBMulti, member_cluster_clients: List[MultiClusterClient]): - mongodb_multi.load() mongodb_multi["spec"]["clusterSpecList"].append( {"members": 2, "clusterName": member_cluster_clients[2].cluster_name} ) @@ -111,22 +139,8 @@ def test_statefulsets_have_been_scaled_up_correctly( mongodb_multi: MongoDBMulti, member_cluster_clients: List[MultiClusterClient], ): - statefulsets = mongodb_multi.read_statefulsets(member_cluster_clients) - - assert len(statefulsets) == 3 - - cluster_one_client = member_cluster_clients[0] - cluster_one_sts = statefulsets[cluster_one_client.cluster_name] - assert cluster_one_sts.status.ready_replicas == 2 - - cluster_two_client = member_cluster_clients[1] - cluster_two_sts = statefulsets[cluster_two_client.cluster_name] - assert cluster_two_sts.status.ready_replicas == 1 - - cluster_three_client = member_cluster_clients[2] - cluster_three_sts = statefulsets[cluster_three_client.cluster_name] - assert cluster_three_sts.status.ready_replicas == 2 + wait_until(sts_are_ready(mongodb_multi, member_cluster_clients, [2, 1, 2]), timeout=60) @pytest.mark.e2e_multi_cluster_scale_up_cluster def test_ops_manager_has_been_updated_correctly_after_scaling(): @@ -139,3 +153,60 @@ def test_ops_manager_has_been_updated_correctly_after_scaling(): def test_replica_set_is_reachable(mongodb_multi: MongoDBMulti, ca_path: str): tester = mongodb_multi.tester() tester.assert_connectivity(opts=[with_tls(use_tls=True, ca_path=ca_path)]) + + +# From here on, the tests are for verifying that we can change the project of the MongoDBMulti resource even with +# non-sequential member ids in the replicaset. + + +@pytest.mark.e2e_multi_cluster_scale_up_cluster +def test_scale_up_first_cluster( + mongodb_multi: MongoDBMulti, + member_cluster_clients: List[MultiClusterClient], +): + # Scale up the first cluster to 3 members. This will lead to non-sequential member ids in the replicaset. 
+ # multi-replica-set-0-0 : 0 + # multi-replica-set-0-1 : 1 + # multi-replica-set-0-2 : 5 + # multi-replica-set-1-0 : 2 + # multi-replica-set-2-0 : 3 + # multi-replica-set-2-1 : 4 + + mongodb_multi["spec"]["clusterSpecList"][0]["members"] = 3 + mongodb_multi.update() + + + wait_until(sts_are_ready(mongodb_multi, member_cluster_clients, [3,1,2]), timeout=600) + mongodb_multi.assert_reaches_phase(Phase.Running, timeout=600) + + +@pytest.mark.e2e_multi_cluster_scale_up_cluster +def test_change_project(mongodb_multi: MongoDBMulti, new_project_configmap: str): + oldRsMembers = mongodb_multi.get_automation_config_tester().get_replica_set_members(mongodb_multi.name) + + mongodb_multi["spec"]["opsManager"]["configMapRef"]["name"] = new_project_configmap + mongodb_multi.update() + + mongodb_multi.assert_abandons_phase(phase=Phase.Running, timeout=300) + mongodb_multi.assert_reaches_phase(phase=Phase.Running, timeout=600) + + newRsMembers = mongodb_multi.get_automation_config_tester().get_replica_set_members(mongodb_multi.name) + + # Assert that the replica set member ids have not changed after changing the project. + assert oldRsMembers == newRsMembers + + +def sts_are_ready(mdb: MongoDBMulti, member_cluster_clients: List[MultiClusterClient], members: List[int]): + def fn(): + statefulsets = mdb.read_statefulsets(member_cluster_clients) + + assert len(statefulsets) == len(members) + + for i, mcc in enumerate(member_cluster_clients): + cluster_sts = statefulsets[mcc.cluster_name] + if cluster_sts.status.ready_replicas != members[i]: + return False + + return True + + return fn From fd447849365b7562083d122935e593a9dc1afbdf Mon Sep 17 00:00:00 2001 From: Lucian Tosa Date: Tue, 1 Jul 2025 13:13:19 +0200 Subject: [PATCH 5/8] Release notes --- RELEASE_NOTES.md | 3 +++ .../multi_cluster_scale_up_cluster.py | 19 ++++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1dc4431ad..7cf4979e6 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -25,6 +25,9 @@ * [Manage Database Users using OIDC](https://www.mongodb.com/docs/kubernetes/upcoming/manage-users/) # TODO * [Authentication and Authorization with OIDC/OAuth 2.0](https://www.mongodb.com/docs/manual/core/oidc/security-oidc/) +## Bug Fixes +* Fixed an issue where moving a **MongoDBMultiCluster** resource to a new project (or a new OM instance) would leave the deployment in a failed state. 
+ # MCK 1.1.0 Release Notes diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py b/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py index 14a621b22..5f420b109 100644 --- a/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py +++ b/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py @@ -3,7 +3,13 @@ import kubernetes import pytest -from kubetester import create_or_update_configmap, read_configmap, try_load, wait_until, random_k8s_name +from kubetester import ( + create_or_update_configmap, + random_k8s_name, + read_configmap, + try_load, + wait_until, +) from kubetester.automation_config_tester import AutomationConfigTester from kubetester.certs_mongodb_multi import create_multi_cluster_mongodb_tls_certs from kubetester.kubetester import fixture as yaml_fixture @@ -18,15 +24,14 @@ RESOURCE_NAME = "multi-replica-set" BUNDLE_SECRET_NAME = f"prefix-{RESOURCE_NAME}-cert" + @pytest.fixture(scope="module") def project_name_prefix(namespace: str) -> str: return random_k8s_name(f"{namespace}-project-") + @pytest.fixture(scope="module") -def new_project_configmap( - namespace: str, - project_name_prefix: str -) -> str: +def new_project_configmap(namespace: str, project_name_prefix: str) -> str: cm = read_configmap(namespace=namespace, name="my-project") project_name = f"{project_name_prefix}-new-project" return create_or_update_configmap( @@ -142,6 +147,7 @@ def test_statefulsets_have_been_scaled_up_correctly( wait_until(sts_are_ready(mongodb_multi, member_cluster_clients, [2, 1, 2]), timeout=60) + @pytest.mark.e2e_multi_cluster_scale_up_cluster def test_ops_manager_has_been_updated_correctly_after_scaling(): ac = AutomationConfigTester() @@ -175,8 +181,7 @@ def test_scale_up_first_cluster( mongodb_multi["spec"]["clusterSpecList"][0]["members"] = 3 mongodb_multi.update() - - wait_until(sts_are_ready(mongodb_multi, member_cluster_clients, [3,1,2]), timeout=600) + wait_until(sts_are_ready(mongodb_multi, member_cluster_clients, [3, 1, 2]), timeout=600) mongodb_multi.assert_reaches_phase(Phase.Running, timeout=600) From 77b8a186efb6dc30af713b6f406da8272a6bc3af Mon Sep 17 00:00:00 2001 From: Lucian Tosa Date: Wed, 2 Jul 2025 10:20:51 +0200 Subject: [PATCH 6/8] PR feedback --- controllers/om/mockedomclient.go | 16 ---- controllers/om/omclient.go | 34 ++++---- .../mongodbmultireplicaset_controller.go | 17 ++-- .../kubetester/mongodb_multi.py | 16 ++++ .../multi_cluster_scale_up_cluster.py | 80 +++++++------------ 5 files changed, 68 insertions(+), 95 deletions(-) diff --git a/controllers/om/mockedomclient.go b/controllers/om/mockedomclient.go index 60978fc4e..51c3f844d 100644 --- a/controllers/om/mockedomclient.go +++ b/controllers/om/mockedomclient.go @@ -141,22 +141,6 @@ func (oc *MockedOmConnection) ConfigureProject(project *Project) { oc.context.OrgID = project.OrgID } -func (oc *MockedOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) { - oc.addToHistory(reflect.ValueOf(oc.GetReplicaSetMemberIds)) - dep, err := oc.ReadDeployment() - if err != nil { - return nil, err - } - - finalProcessIds := make(map[string]map[string]int) - - for _, replicaSet := range dep.GetReplicaSets() { - finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds() - } - - return finalProcessIds, nil -} - var _ Connection = &MockedOmConnection{} // NewEmptyMockedOmConnection is the standard function for creating mocked connections that is usually used for testing 
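Design note on this part of the PR-feedback patch: rather than keeping GetReplicaSetMemberIds as a Connection method implemented twice (mocked client above, HTTP client below), the next hunk turns it into a package-level helper, om.GetReplicaSetMemberIds(conn Connection), built only on ReadDeployment. The mock therefore exercises exactly the same code path as production, and the already wide Connection interface does not grow. A hedged sketch of the same idea taken one step further, accepting only the behaviour the helper needs; the narrow interface is hypothetical, not part of this PR, and it assumes ReadDeployment returns (Deployment, error) as the calls in these patches suggest:

// Hypothetical variant inside the om package; Deployment, GetReplicaSets,
// Name and MemberIds are the existing types and methods used in this PR.
type deploymentReader interface {
	ReadDeployment() (Deployment, error)
}

func replicaSetMemberIds(r deploymentReader) (map[string]map[string]int, error) {
	dep, err := r.ReadDeployment()
	if err != nil {
		return nil, err
	}
	ids := make(map[string]map[string]int, len(dep.GetReplicaSets()))
	for _, rs := range dep.GetReplicaSets() {
		ids[rs.Name()] = rs.MemberIds()
	}
	return ids, nil
}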
diff --git a/controllers/om/omclient.go b/controllers/om/omclient.go index 60e19016f..cbc0c2875 100644 --- a/controllers/om/omclient.go +++ b/controllers/om/omclient.go @@ -64,10 +64,6 @@ type Connection interface { GetPreferredHostnames(agentApiKey string) ([]PreferredHostname, error) AddPreferredHostname(agentApiKey string, value string, isRegexp bool) error - // GetReplicaSetMemberIds returns a map with the replicaset name as the key. - // The value is another map where the key is the replicaset member name and the value is its member id. - GetReplicaSetMemberIds() (map[string]map[string]int, error) - backup.GroupConfigReader backup.GroupConfigUpdater @@ -277,21 +273,6 @@ func (oc *HTTPOmConnection) GetAgentAuthMode() (string, error) { return ac.Auth.AutoAuthMechanism, nil } -func (oc *HTTPOmConnection) GetReplicaSetMemberIds() (map[string]map[string]int, error) { - dep, err := oc.ReadDeployment() - if err != nil { - return nil, err - } - - finalProcessIds := make(map[string]map[string]int) - - for _, replicaSet := range dep.GetReplicaSets() { - finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds() - } - - return finalProcessIds, nil -} - var _ Connection = &HTTPOmConnection{} // NewOpsManagerConnection stores OpsManger api endpoint and authentication credentials. @@ -997,6 +978,21 @@ func (oc *HTTPOmConnection) AddPreferredHostname(agentApiKey string, value strin return nil } +func GetReplicaSetMemberIds(conn Connection) (map[string]map[string]int, error) { + dep, err := conn.ReadDeployment() + if err != nil { + return nil, err + } + + finalProcessIds := make(map[string]map[string]int) + + for _, replicaSet := range dep.GetReplicaSets() { + finalProcessIds[replicaSet.Name()] = replicaSet.MemberIds() + } + + return finalProcessIds, nil +} + //********************************** Private methods ******************************************************************* func (oc *HTTPOmConnection) get(path string) ([]byte, error) { diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go index 8b412f761..9ad2f5d12 100644 --- a/controllers/operator/mongodbmultireplicaset_controller.go +++ b/controllers/operator/mongodbmultireplicaset_controller.go @@ -201,8 +201,7 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request return r.updateStatus(ctx, &mrs, status, log) } - // Save replicasets member ids in annotation - finalMemberIds, err := conn.GetReplicaSetMemberIds() + finalMemberIds, err := om.GetReplicaSetMemberIds(conn) if err != nil { return r.updateStatus(ctx, &mrs, workflow.Failed(err), log) } @@ -663,12 +662,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) saveLastAchievedSpec(ctx context.Conte annotationsToAdd[mdbmultiv1.LastClusterNumMapping] = string(clusterNumBytes) } - rsMemberIdsBytes, err := json.Marshal(rsMemberIds) - if err != nil { - return err - } - if string(rsMemberIdsBytes) != "null" { - annotationsToAdd[util.LastAchievedRsMemberIds] = string(rsMemberIdsBytes) + if len(rsMemberIds) > 0 { + rsMemberIdsBytes, err := json.Marshal(rsMemberIds) + if err != nil { + return err + } + if len(rsMemberIdsBytes) > 0 { + annotationsToAdd[util.LastAchievedRsMemberIds] = string(rsMemberIdsBytes) + } } return annotations.SetAnnotations(ctx, &mrs, annotationsToAdd, r.client) diff --git a/docker/mongodb-kubernetes-tests/kubetester/mongodb_multi.py b/docker/mongodb-kubernetes-tests/kubetester/mongodb_multi.py index b35316c32..ab5f867d3 100644 --- 
a/docker/mongodb-kubernetes-tests/kubetester/mongodb_multi.py +++ b/docker/mongodb-kubernetes-tests/kubetester/mongodb_multi.py @@ -3,6 +3,7 @@ from typing import Dict, List, Optional from kubernetes import client +from kubetester import wait_until from kubetester.mongodb import MongoDB from kubetester.mongotester import MongoTester, MultiReplicaSetTester from kubetester.multicluster_client import MultiClusterClient @@ -77,6 +78,21 @@ def service_names(self) -> List[str]: service_names.append(f"{self.name}-{i}-{j}-svc") return service_names + def assert_statefulsets_are_ready(self, clients: List[MultiClusterClient], timeout: int = 600): + def fn(): + statefulsets = self.read_statefulsets(clients) + + assert len(statefulsets) == len(self["spec"]["clusterSpecList"]) + + for i, mcc in enumerate(clients): + cluster_sts = statefulsets[mcc.cluster_name] + if cluster_sts.status.ready_replicas != self.get_item_spec(mcc.cluster_name)["members"]: + return False + + return True + + wait_until(fn, timeout=timeout, interval=10, message="Waiting for all statefulsets to be ready") + def tester( self, ca_path: Optional[str] = None, diff --git a/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py b/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py index 5f420b109..3acc73dff 100644 --- a/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py +++ b/docker/mongodb-kubernetes-tests/tests/multicluster/multi_cluster_scale_up_cluster.py @@ -12,6 +12,7 @@ ) from kubetester.automation_config_tester import AutomationConfigTester from kubetester.certs_mongodb_multi import create_multi_cluster_mongodb_tls_certs +from kubetester.kubetester import KubernetesTester from kubetester.kubetester import fixture as yaml_fixture from kubetester.kubetester import skip_if_local from kubetester.mongodb_multi import MongoDBMulti @@ -84,7 +85,7 @@ def server_certs( ) -@pytest.fixture(scope="module") +@pytest.fixture(scope="function") def mongodb_multi(mongodb_multi_unmarshalled: MongoDBMulti, server_certs: str) -> MongoDBMulti: if try_load(mongodb_multi_unmarshalled): return mongodb_multi_unmarshalled @@ -113,14 +114,7 @@ def test_statefulsets_have_been_created_correctly( member_cluster_clients: List[MultiClusterClient], ): # read all statefulsets except the last one - statefulsets = mongodb_multi.read_statefulsets(member_cluster_clients[:-1]) - cluster_one_client = member_cluster_clients[0] - cluster_one_sts = statefulsets[cluster_one_client.cluster_name] - assert cluster_one_sts.status.ready_replicas == 2 - - cluster_two_client = member_cluster_clients[1] - cluster_two_sts = statefulsets[cluster_two_client.cluster_name] - assert cluster_two_sts.status.ready_replicas == 1 + mongodb_multi.assert_statefulsets_are_ready(member_cluster_clients[:-1]) @pytest.mark.e2e_multi_cluster_scale_up_cluster @@ -144,8 +138,7 @@ def test_statefulsets_have_been_scaled_up_correctly( mongodb_multi: MongoDBMulti, member_cluster_clients: List[MultiClusterClient], ): - - wait_until(sts_are_ready(mongodb_multi, member_cluster_clients, [2, 1, 2]), timeout=60) + mongodb_multi.assert_statefulsets_are_ready(member_cluster_clients, timeout=60) @pytest.mark.e2e_multi_cluster_scale_up_cluster @@ -166,52 +159,35 @@ def test_replica_set_is_reachable(mongodb_multi: MongoDBMulti, ca_path: str): @pytest.mark.e2e_multi_cluster_scale_up_cluster -def test_scale_up_first_cluster( - mongodb_multi: MongoDBMulti, - member_cluster_clients: List[MultiClusterClient], -): - # Scale 
up the first cluster to 3 members. This will lead to non-sequential member ids in the replicaset. - # multi-replica-set-0-0 : 0 - # multi-replica-set-0-1 : 1 - # multi-replica-set-0-2 : 5 - # multi-replica-set-1-0 : 2 - # multi-replica-set-2-0 : 3 - # multi-replica-set-2-1 : 4 - - mongodb_multi["spec"]["clusterSpecList"][0]["members"] = 3 - mongodb_multi.update() - - wait_until(sts_are_ready(mongodb_multi, member_cluster_clients, [3, 1, 2]), timeout=600) - mongodb_multi.assert_reaches_phase(Phase.Running, timeout=600) - - -@pytest.mark.e2e_multi_cluster_scale_up_cluster -def test_change_project(mongodb_multi: MongoDBMulti, new_project_configmap: str): - oldRsMembers = mongodb_multi.get_automation_config_tester().get_replica_set_members(mongodb_multi.name) - - mongodb_multi["spec"]["opsManager"]["configMapRef"]["name"] = new_project_configmap - mongodb_multi.update() - - mongodb_multi.assert_abandons_phase(phase=Phase.Running, timeout=300) - mongodb_multi.assert_reaches_phase(phase=Phase.Running, timeout=600) +class TestNonSequentialMemberIdsInReplicaSet(KubernetesTester): - newRsMembers = mongodb_multi.get_automation_config_tester().get_replica_set_members(mongodb_multi.name) + def test_scale_up_first_cluster( + self, mongodb_multi: MongoDBMulti, member_cluster_clients: List[MultiClusterClient] + ): + # Scale up the first cluster to 3 members. This will lead to non-sequential member ids in the replicaset. + # multi-replica-set-0-0 : 0 + # multi-replica-set-0-1 : 1 + # multi-replica-set-0-2 : 5 + # multi-replica-set-1-0 : 2 + # multi-replica-set-2-0 : 3 + # multi-replica-set-2-1 : 4 - # Assert that the replica set member ids have not changed after changing the project. - assert oldRsMembers == newRsMembers + mongodb_multi["spec"]["clusterSpecList"][0]["members"] = 3 + mongodb_multi.update() + mongodb_multi.assert_statefulsets_are_ready(member_cluster_clients) + mongodb_multi.assert_reaches_phase(Phase.Running, timeout=600) -def sts_are_ready(mdb: MongoDBMulti, member_cluster_clients: List[MultiClusterClient], members: List[int]): - def fn(): - statefulsets = mdb.read_statefulsets(member_cluster_clients) + def test_change_project(self, mongodb_multi: MongoDBMulti, new_project_configmap: str): + oldRsMembers = mongodb_multi.get_automation_config_tester().get_replica_set_members(mongodb_multi.name) - assert len(statefulsets) == len(members) + mongodb_multi["spec"]["opsManager"]["configMapRef"]["name"] = new_project_configmap + mongodb_multi.update() - for i, mcc in enumerate(member_cluster_clients): - cluster_sts = statefulsets[mcc.cluster_name] - if cluster_sts.status.ready_replicas != members[i]: - return False + mongodb_multi.assert_abandons_phase(phase=Phase.Running, timeout=300) + mongodb_multi.assert_reaches_phase(phase=Phase.Running, timeout=600) - return True + newRsMembers = mongodb_multi.get_automation_config_tester().get_replica_set_members(mongodb_multi.name) - return fn + # Assert that the replica set member ids have not changed after changing the project. 
+ assert oldRsMembers == newRsMembers From 3c90c6e400ae0309f51f365606a20c7e96e2aeb7 Mon Sep 17 00:00:00 2001 From: Lucian Tosa Date: Wed, 2 Jul 2025 16:39:23 +0200 Subject: [PATCH 7/8] PR feedback 2 --- .../operator/mongodbmultireplicaset_controller.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go index 9ad2f5d12..aa24cdde0 100644 --- a/controllers/operator/mongodbmultireplicaset_controller.go +++ b/controllers/operator/mongodbmultireplicaset_controller.go @@ -716,7 +716,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte processIds := getReplicaSetProcessIdsFromReplicaSets(mrs.Name, existingDeployment) // If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in annotation if len(processIds) == 0 { - processIds = getReplicaSetProcessIdsFromAnnotation(mrs) + processIds, err = getReplicaSetProcessIdsFromAnnotation(mrs) + if err != nil { + return xerrors.Errorf("failed to get member ids from annotation: %w", err) + } } log.Debugf("Existing process Ids: %+v", processIds) @@ -813,14 +816,15 @@ func getReplicaSetProcessIdsFromReplicaSets(replicaSetName string, deployment om return processIds } -func getReplicaSetProcessIdsFromAnnotation(mrs mdbmultiv1.MongoDBMultiCluster) map[string]int { - processIds := make(map[string]map[string]int) +func getReplicaSetProcessIdsFromAnnotation(mrs mdbmultiv1.MongoDBMultiCluster) (map[string]int, error) { if processIdsStr, ok := mrs.Annotations[util.LastAchievedRsMemberIds]; ok { + processIds := make(map[string]map[string]int) if err := json.Unmarshal([]byte(processIdsStr), &processIds); err != nil { - return map[string]int{} + return map[string]int{}, err } + return processIds[mrs.Name], nil } - return processIds[mrs.Name] + return make(map[string]int), nil } func getSRVService(mrs *mdbmultiv1.MongoDBMultiCluster) corev1.Service { From caf3152bc3486d6efd3f5e191592f4ddd262fd2f Mon Sep 17 00:00:00 2001 From: Lucian Tosa Date: Wed, 2 Jul 2025 18:34:31 +0200 Subject: [PATCH 8/8] Add comment --- controllers/operator/mongodbmultireplicaset_controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go index aa24cdde0..fc2ccde71 100644 --- a/controllers/operator/mongodbmultireplicaset_controller.go +++ b/controllers/operator/mongodbmultireplicaset_controller.go @@ -714,7 +714,9 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte } processIds := getReplicaSetProcessIdsFromReplicaSets(mrs.Name, existingDeployment) + // If there is no replicaset configuration saved in OM, it might be a new project, so we check the ids saved in annotation + // A project migration can happen if .spec.opsManager.configMapRef is changed, or the original configMap has been modified. if len(processIds) == 0 { processIds, err = getReplicaSetProcessIdsFromAnnotation(mrs) if err != nil {