Skip to content

Commit

Permalink
add test to validate bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Nov 12, 2024
1 parent 922e489 commit 620b387
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ public long BeginReplicaRecover(
remoteCheckpoint.metadata.objectStoreHlogToken);

storeWrapper.RecoverCheckpoint(
replicaRecover: true,
recoverMainStoreFromToken,
recoverObjectStoreFromToken,
remoteCheckpoint.metadata);
Expand Down
24 changes: 21 additions & 3 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,31 @@ internal void Recover()
/// <summary>
/// Caller will have to decide if recover is necessary, so we do not check if recover option is enabled
/// </summary>
public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recoverObjectStoreFromToken = false, CheckpointMetadata metadata = null)
public void RecoverCheckpoint(bool replicaRecover = false, bool recoverMainStoreFromToken = false, bool recoverObjectStoreFromToken = false, CheckpointMetadata metadata = null)
{
long storeVersion = -1, objectStoreVersion = -1;
try
{
storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(metadata.storeIndexToken, metadata.storeHlogToken);
if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(metadata.objectStoreIndexToken, metadata.objectStoreHlogToken);
if (replicaRecover)
{
if (metadata.storeIndexToken != default && metadata.storeHlogToken != default)
{
storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(metadata.storeIndexToken, metadata.storeHlogToken);
}

if (!serverOptions.DisableObjects)
{
if (metadata.objectStoreIndexToken != default && metadata.objectStoreHlogToken != default)
{
objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(metadata.objectStoreIndexToken, metadata.objectStoreHlogToken);
}
}
}
else
{
storeVersion = store.Recover();
if (objectStore != null) objectStoreVersion = objectStore.Recover();
}
if (storeVersion > 0 || objectStoreVersion > 0)
lastSaveTime = DateTimeOffset.UtcNow;
}
Expand Down
30 changes: 24 additions & 6 deletions test/Garnet.test.cluster/ClusterReplicationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ public void ClusterReplicateFails()
ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node "));
}

//[Test, Order(22), Timeout(testTimeout)]
[Test, Order(22), Timeout(testTimeout)]
public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW)
{
var replica_count = 1;// Per primary
Expand Down Expand Up @@ -1086,9 +1086,11 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW)

var primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger);
var replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger);
Assert.AreEqual(6, primaryVersion);
Assert.AreEqual("6", primaryVersion);
Assert.AreEqual(primaryVersion, replicaVersion);

context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, replicaNodeIndex);

// Dispose primary and delete data
context.nodes[primaryNodeIndex].Dispose(true);
// Dispose primary but do not delete data
Expand Down Expand Up @@ -1126,16 +1128,32 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW)
// Assert primary version is 1 and replica has recovered to previous checkpoint
primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger);
replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger);
Assert.AreEqual(1, primaryVersion);
Assert.AreEqual(6, replicaVersion);
Assert.AreEqual("1", primaryVersion);
Assert.AreEqual("6", replicaVersion);

// Setup cluster
Assert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger));

context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger);
context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger);
context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger);
var primaryNodeId = context.clusterTestUtils.ClusterMyId(primaryNodeIndex, logger: context.logger);

// Enable replication
context.clusterTestUtils.WaitUntilNodeIdIsKnown(replicaNodeIndex, primaryNodeId, logger: context.logger);
Assert.AreEqual("OK", context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger));

// Both nodes are at version 1 despite replica recovering to version earlier
primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger);
replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger);
Assert.AreEqual("1", primaryVersion);
Assert.AreEqual("1", replicaVersion);

context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger);
// At this point attached replica should be empty because primary did not have any data because it did not recover
foreach (var pair in context.kvPairs)
{
var resp = context.clusterTestUtils.GetKey(replicaNodeIndex, Encoding.ASCII.GetBytes(pair.Key), out _, out _, out _, out var state, logger: context.logger);
Assert.IsNull(resp);
}
}
}
}

0 comments on commit 620b387

Please sign in to comment.