Skip to content

Commit

Permalink
add async replay tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Nov 14, 2024
1 parent 6f36a46 commit 7baae01
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 29 deletions.
101 changes: 101 additions & 0 deletions test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using NUnit.Framework;

namespace Garnet.test.cluster
{
[TestFixture, NonParallelizable]
public unsafe class ClusterReplicationAsyncReplayTests
{
ClusterReplicationTests tests;

[SetUp]
public void Setup()
{
tests = new ClusterReplicationTests(UseTLS: false, asyncReplay: true);
tests.Setup();
}

[TearDown]
public void TearDown()
{
tests.TearDown();
tests = null;
}

[Test, Order(1)]
[Category("REPLICATION")]
public void ClusterAsyncReplaySRTest([Values] bool disableObjects) => tests.ClusterSRTest(disableObjects);

[Test, Order(2)]
[Category("REPLICATION")]
public void ClusterAsyncReplayCheckpointRestartSecondary([Values] bool performRMW, [Values] bool disableObjects)
=> tests.ClusterSRNoCheckpointRestartSecondary(performRMW, disableObjects);

[Test, Order(3)]
[Category("REPLICATION")]
public void ClusterAsyncReplayPrimaryCheckpoint([Values] bool performRMW, [Values] bool disableObjects)
=> tests.ClusterSRPrimaryCheckpoint(performRMW, disableObjects);

[Test, Order(4)]
[Category("REPLICATION")]
public void ClusterAsyncReplaySRPrimaryCheckpointRetrieve([Values] bool performRMW, [Values] bool disableObjects, [Values] bool lowMemory, [Values] bool manySegments)
=> tests.ClusterSRPrimaryCheckpointRetrieve(performRMW, disableObjects, lowMemory, manySegments);

[Test, Order(5)]
[Category("REPLICATION")]
public void ClusterAsyncReplayCheckpointRetrieveDisableStorageTier([Values] bool performRMW, [Values] bool disableObjects)
=> tests.ClusterCheckpointRetrieveDisableStorageTier(performRMW, disableObjects);

[Test, Order(6)]
[Category("REPLICATION")]
public void ClusterAsyncReplayReplicaAfterPrimaryCheckpoint([Values] bool performRMW, [Values] bool disableObjects, [Values] bool lowMemory)
=> tests.ClusterSRAddReplicaAfterPrimaryCheckpoint(performRMW, disableObjects, lowMemory);

[Test, Order(7)]
[Category("REPLICATION")]
public void ClusterAsyncReplayPrimaryRestart([Values] bool performRMW, [Values] bool disableObjects)
=> tests.ClusterSRPrimaryRestart(performRMW, disableObjects);

[Test, Order(8)]
[Category("REPLICATION")]
public void ClusterAsyncReplayRedirectWrites()
=> tests.ClusterSRRedirectWrites();

[Test, Order(9)]
[Category("REPLICATION")]
public void ClusterAsyncReplayRedirectWrites([Values] bool performRMW)
=> tests.ClusterSRReplicaOfTest(performRMW);

[Test, Order(10)]
[Category("REPLICATION")]
public void ClusterAsyncReplayReplicationSimpleFailover([Values] bool performRMW, [Values] bool checkpoint)
=> tests.ClusterReplicationSimpleFailover(performRMW, checkpoint);

[Test, Order(11)]
[Category("REPLICATION")]
public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] bool takePrimaryCheckpoint, [Values] bool takeNewPrimaryCheckpoint, [Values] bool enableIncrementalSnapshots)
=> tests.ClusterFailoverAttachReplicas(performRMW, takePrimaryCheckpoint, takeNewPrimaryCheckpoint, enableIncrementalSnapshots);

[Test, Order(12)]
[Category("REPLICATION")]
public void ClusterAsyncReplayDivergentCheckpointTest([Values] bool performRMW, [Values] bool disableObjects)
=> tests.ClusterDivergentCheckpointTest(performRMW, disableObjects);

[Test, Order(13)]
[Category("REPLICATION")]
public void ClusterAsyncReplayDivergentReplicasMMTest([Values] bool performRMW, [Values] bool disableObjects, [Values] bool ckptBeforeDivergence)
=> tests.ClusterDivergentReplicasMMTest(performRMW, disableObjects, ckptBeforeDivergence);

[Test, Order(14)]
[Category("REPLICATION")]
public void ClusterAsyncReplayDivergentCheckpointMMTest([Values] bool performRMW, [Values] bool disableObjects)
=> tests.ClusterDivergentCheckpointMMTest(performRMW, disableObjects);

[Test, Order(15)]
[Category("REPLICATION")]
public void ClusterAsyncReplayDivergentCheckpointMMFastCommitTest([Values] bool disableObjects, [Values] bool mainMemoryReplication)
=> tests.ClusterDivergentCheckpointMMFastCommitTest(disableObjects, mainMemoryReplication);
}
}
53 changes: 28 additions & 25 deletions test/Garnet.test.cluster/ClusterReplicationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace Garnet.test.cluster
{
[TestFixture(false), NonParallelizable]
public class ClusterReplicationTests(bool UseTLS = false)
public class ClusterReplicationTests(bool UseTLS = false, bool asyncReplay = false)
{
public (Action, string)[] GetUnitTests()
{
Expand Down Expand Up @@ -150,7 +150,7 @@ public void ClusterSRNoCheckpointRestartSecondary([Values] bool performRMW, [Val
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);
var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);

Expand Down Expand Up @@ -216,7 +216,7 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);
var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);

Expand Down Expand Up @@ -272,7 +272,8 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d
enableAOF: true,
timeout: timeout,
useTLS: useTLS,
cleanClusterConfig: false);
cleanClusterConfig: false,
asyncReplay: asyncReplay);
context.nodes[1].Start();
context.CreateConnection(useTLS: useTLS);

Expand Down Expand Up @@ -314,7 +315,7 @@ void ClusterSRPrimaryCheckpointRetrieve(bool performRMW, bool disableObjects, bo
var primary_count = 1;
var nodes_count = primary_count + primary_count * replica_count;
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, disableObjects: disableObjects, lowMemory: lowMemory, SegmentSize: manySegments ? "4k" : "1g", DisableStorageTier: disableStorageTier, EnableIncrementalSnapshots: incrementalSnapshots, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, disableObjects: disableObjects, lowMemory: lowMemory, SegmentSize: manySegments ? "4k" : "1g", DisableStorageTier: disableStorageTier, EnableIncrementalSnapshots: incrementalSnapshots, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);
var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);

Expand Down Expand Up @@ -370,7 +371,8 @@ void ClusterSRPrimaryCheckpointRetrieve(bool performRMW, bool disableObjects, bo
cleanClusterConfig: false,
lowMemory: lowMemory,
SegmentSize: manySegments ? "4k" : "1g",
DisableStorageTier: disableStorageTier);
DisableStorageTier: disableStorageTier,
asyncReplay: asyncReplay);
context.nodes[replicaIndex].Start();
context.CreateConnection(useTLS: useTLS);

Expand All @@ -389,7 +391,7 @@ public void ClusterSRAddReplicaAfterPrimaryCheckpoint([Values] bool performRMW,
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, lowMemory: lowMemory, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, lowMemory: lowMemory, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);

ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, new List<(int, int)>() { (0, 16383) }, true, context.logger));
Expand Down Expand Up @@ -448,7 +450,7 @@ public void ClusterSRPrimaryRestart([Values] bool performRMW, [Values] bool disa
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);

ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, new List<(int, int)>() { (0, 16383) }, true, context.logger));
Expand Down Expand Up @@ -492,7 +494,8 @@ public void ClusterSRPrimaryRestart([Values] bool performRMW, [Values] bool disa
enableAOF: true,
timeout: timeout,
useTLS: useTLS,
cleanClusterConfig: false);
cleanClusterConfig: false,
asyncReplay: asyncReplay);
context.nodes[0].Start();
context.CreateConnection(useTLS: useTLS);

Expand All @@ -514,7 +517,7 @@ public void ClusterSRRedirectWrites()
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);

var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);
Expand All @@ -539,10 +542,10 @@ public void ClusterSRRedirectWrites()
public void ClusterSRReplicaOfTest([Values] bool performRMW)
{
var nodes_count = 2;
context.CreateInstances(nodes_count, tryRecover: true, disableObjects: true, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, tryRecover: true, disableObjects: true, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);

ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, new List<(int, int)>() { (0, 16383) }, true, context.logger));
ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, [(0, 16383)], true, context.logger));

context.clusterTestUtils.SetConfigEpoch(0, 1, context.logger);
context.clusterTestUtils.SetConfigEpoch(1, 2, context.logger);
Expand Down Expand Up @@ -578,7 +581,7 @@ public void ClusterReplicationSimpleFailover([Values] bool performRMW, [Values]
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);
var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);

Expand Down Expand Up @@ -648,7 +651,7 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, disableObjects: true, EnableIncrementalSnapshots: enableIncrementalSnapshots, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, disableObjects: true, EnableIncrementalSnapshots: enableIncrementalSnapshots, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);
var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);

Expand Down Expand Up @@ -905,7 +908,7 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, disableObjects: disableObjects, MainMemoryReplication: mainMemoryReplication, CommitFrequencyMs: mainMemoryReplication ? -1 : 0, OnDemandCheckpoint: mainMemoryReplication, FastCommit: fastCommit, enableAOF: true, useTLS: useTLS);
context.CreateInstances(nodes_count, disableObjects: disableObjects, MainMemoryReplication: mainMemoryReplication, CommitFrequencyMs: mainMemoryReplication ? -1 : 0, OnDemandCheckpoint: mainMemoryReplication, FastCommit: fastCommit, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
context.CreateConnection(useTLS: useTLS);
_ = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);

Expand All @@ -916,8 +919,8 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
var keyLength = 8;
var kvpairCount = 16;
var addCount = 5;
context.kvPairs = new();
context.kvPairsObj = new Dictionary<string, List<int>>();
context.kvPairs = [];
context.kvPairsObj = [];

_ = context.clusterTestUtils.ClusterMyId(oldPrimaryIndex, context.logger);

Expand All @@ -944,12 +947,12 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger);

// Make this replica of no-one
_ = context.clusterTestUtils.ReplicaOf(1, logger: context.logger);
_ = context.clusterTestUtils.ReplicaOf(newPrimaryIndex, logger: context.logger);

// Populate primary to diverge from replica 1 history
// Use temporary dictionary to populate values lost to replica 1
Dictionary<string, int> kvPairs2 = new();
Dictionary<string, List<int>> kvPairsObj2 = new Dictionary<string, List<int>>();
Dictionary<string, int> kvPairs2 = [];
Dictionary<string, List<int>> kvPairsObj2 = [];
if (disableObjects)
{
if (!performRMW) context.PopulatePrimary(ref kvPairs2, keyLength, kvpairCount, primaryIndex: oldPrimaryIndex);
Expand All @@ -976,13 +979,13 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger);

// Dispose primary
context.nodes[0].Dispose(false);
context.nodes[0] = null;
context.nodes[oldPrimaryIndex].Dispose(false);
context.nodes[oldPrimaryIndex] = null;

// Re-assign slots to replica manually since failover option was not
_ = context.clusterTestUtils.AddDelSlotsRange(newPrimaryIndex, new List<(int, int)>() { (0, 16383) }, addslot: false, context.logger);
_ = context.clusterTestUtils.AddDelSlotsRange(replicaIndex, new List<(int, int)>() { (0, 16383) }, addslot: false, context.logger);
_ = context.clusterTestUtils.AddDelSlotsRange(newPrimaryIndex, new List<(int, int)>() { (0, 16383) }, addslot: true, context.logger);
_ = context.clusterTestUtils.AddDelSlotsRange(newPrimaryIndex, [(0, 16383)], addslot: false, context.logger);
_ = context.clusterTestUtils.AddDelSlotsRange(replicaIndex, [(0, 16383)], addslot: false, context.logger);
_ = context.clusterTestUtils.AddDelSlotsRange(newPrimaryIndex, [(0, 16383)], addslot: true, context.logger);
context.clusterTestUtils.BumpEpoch(newPrimaryIndex, logger: context.logger);

// New primary diverges to its own history by new random seed
Expand Down
8 changes: 6 additions & 2 deletions test/Garnet.test.cluster/ClusterTestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public void CreateInstances(
AadAuthenticationSettings authenticationSettings = null,
bool disablePubSub = true,
int metricsSamplingFrequency = 0,
bool enableLua = false)
bool enableLua = false,
bool asyncReplay = false)
{
endpoints = TestUtils.GetEndPoints(shards, 7000);
nodes = TestUtils.CreateGarnetCluster(
Expand Down Expand Up @@ -151,7 +152,8 @@ public void CreateInstances(
certificates: certificates,
authenticationSettings: authenticationSettings,
metricsSamplingFrequency: metricsSamplingFrequency,
enableLua: enableLua);
enableLua: enableLua,
asyncReplay: asyncReplay);

foreach (var node in nodes)
node.Start();
Expand Down Expand Up @@ -204,6 +206,7 @@ public GarnetServer CreateInstance(
int gossipDelay = 5,
bool useTLS = false,
bool useAcl = false,
bool asyncReplay = false,
X509CertificateCollection certificates = null,
ServerCredential clusterCreds = new ServerCredential())
{
Expand Down Expand Up @@ -232,6 +235,7 @@ public GarnetServer CreateInstance(
EnableIncrementalSnapshots: EnableIncrementalSnapshots,
FastCommit: FastCommit,
useAcl: useAcl,
asyncReplay: asyncReplay,
aclFile: credManager.aclFilePath,
authUsername: clusterCreds.user,
authPassword: clusterCreds.password,
Expand Down
Loading

0 comments on commit 7baae01

Please sign in to comment.