diff --git a/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs b/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs
new file mode 100644
index 0000000000..76560ac728
--- /dev/null
+++ b/test/Garnet.test.cluster/ClusterReplicationAsyncReplayTests.cs
@@ -0,0 +1,111 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using NUnit.Framework;
+
+namespace Garnet.test.cluster
+{
+    /// <summary>
+    /// Re-runs the replication scenarios defined in <see cref="ClusterReplicationTests"/>
+    /// against a fixture constructed with <c>asyncReplay: true</c>. Every test here only
+    /// forwards its NUnit-supplied arguments to the corresponding scenario method.
+    /// </summary>
+    [TestFixture, NonParallelizable]
+    public class ClusterReplicationAsyncReplayTests
+    {
+        /// <summary>Scenario fixture (TLS off, async replay on); recreated for every test.</summary>
+        ClusterReplicationTests tests;
+
+        /// <summary>Creates a fresh scenario fixture and runs its own setup.</summary>
+        [SetUp]
+        public void Setup()
+        {
+            tests = new ClusterReplicationTests(UseTLS: false, asyncReplay: true);
+            tests.Setup();
+        }
+
+        /// <summary>Runs the scenario fixture's teardown and drops the reference to it.</summary>
+        [TearDown]
+        public void TearDown()
+        {
+            tests.TearDown();
+            tests = null;
+        }
+
+        // NOTE(review): confirm ClusterSRTest forwards asyncReplay to CreateInstances;
+        // otherwise this case does not actually exercise async replay.
+        [Test, Order(1)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplaySRTest([Values] bool disableObjects) => tests.ClusterSRTest(disableObjects);
+
+        [Test, Order(2)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayNoCheckpointRestartSecondary([Values] bool performRMW, [Values] bool disableObjects)
+            => tests.ClusterSRNoCheckpointRestartSecondary(performRMW, disableObjects);
+
+        [Test, Order(3)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayPrimaryCheckpoint([Values] bool performRMW, [Values] bool disableObjects)
+            => tests.ClusterSRPrimaryCheckpoint(performRMW, disableObjects);
+
+        [Test, Order(4)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplaySRPrimaryCheckpointRetrieve([Values] bool performRMW, [Values] bool disableObjects, [Values] bool lowMemory, [Values] bool manySegments)
+            => tests.ClusterSRPrimaryCheckpointRetrieve(performRMW, disableObjects, lowMemory, manySegments);
+
+        [Test, Order(5)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayCheckpointRetrieveDisableStorageTier([Values] bool performRMW, [Values] bool disableObjects)
+            => tests.ClusterCheckpointRetrieveDisableStorageTier(performRMW, disableObjects);
+
+        [Test, Order(6)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayReplicaAfterPrimaryCheckpoint([Values] bool performRMW, [Values] bool disableObjects, [Values] bool lowMemory)
+            => tests.ClusterSRAddReplicaAfterPrimaryCheckpoint(performRMW, disableObjects, lowMemory);
+
+        [Test, Order(7)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayPrimaryRestart([Values] bool performRMW, [Values] bool disableObjects)
+            => tests.ClusterSRPrimaryRestart(performRMW, disableObjects);
+
+        [Test, Order(8)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayRedirectWrites()
+            => tests.ClusterSRRedirectWrites();
+
+        [Test, Order(9)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayReplicaOfTest([Values] bool performRMW)
+            => tests.ClusterSRReplicaOfTest(performRMW);
+
+        [Test, Order(10)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayReplicationSimpleFailover([Values] bool performRMW, [Values] bool checkpoint)
+            => tests.ClusterReplicationSimpleFailover(performRMW, checkpoint);
+
+        [Test, Order(11)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayFailoverAttachReplicas([Values] bool performRMW, [Values] bool takePrimaryCheckpoint, [Values] bool takeNewPrimaryCheckpoint, [Values] bool enableIncrementalSnapshots)
+            => tests.ClusterFailoverAttachReplicas(performRMW, takePrimaryCheckpoint, takeNewPrimaryCheckpoint, enableIncrementalSnapshots);
+
+        [Test, Order(12)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayDivergentCheckpointTest([Values] bool performRMW, [Values] bool disableObjects)
+            => tests.ClusterDivergentCheckpointTest(performRMW, disableObjects);
+
+        [Test, Order(13)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayDivergentReplicasMMTest([Values] bool performRMW, [Values] bool disableObjects, [Values] bool ckptBeforeDivergence)
+            => tests.ClusterDivergentReplicasMMTest(performRMW, disableObjects, ckptBeforeDivergence);
+
+        [Test, Order(14)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayDivergentCheckpointMMTest([Values] bool performRMW, [Values] bool disableObjects)
+            => tests.ClusterDivergentCheckpointMMTest(performRMW, disableObjects);
+
+        [Test, Order(15)]
+        [Category("REPLICATION")]
+        public void ClusterAsyncReplayDivergentCheckpointMMFastCommitTest([Values] bool disableObjects, [Values] bool mainMemoryReplication)
+            => tests.ClusterDivergentCheckpointMMFastCommitTest(disableObjects, mainMemoryReplication);
+    }
+}
diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ClusterReplicationTests.cs index fcce7aee18..ab98725973 100644 --- a/test/Garnet.test.cluster/ClusterReplicationTests.cs +++ b/test/Garnet.test.cluster/ClusterReplicationTests.cs @@ -17,7 +17,7 @@ namespace Garnet.test.cluster { [TestFixture(false), NonParallelizable] - public class ClusterReplicationTests(bool UseTLS = false) + public class ClusterReplicationTests(bool UseTLS = false, bool asyncReplay = false) { public (Action, string)[] GetUnitTests() { @@ -150,7 +150,7 @@ public void ClusterSRNoCheckpointRestartSecondary([Values] bool performRMW, [Val var primary_count = 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); @@ -216,7 +216,7 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d var primary_count = 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, disableObjects: disableObjects, enableAOF: true, useTLS: 
useTLS); + context.CreateInstances(nodes_count, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); @@ -272,7 +272,8 @@ public void ClusterSRPrimaryCheckpoint([Values] bool performRMW, [Values] bool d enableAOF: true, timeout: timeout, useTLS: useTLS, - cleanClusterConfig: false); + cleanClusterConfig: false, + asyncReplay: asyncReplay); context.nodes[1].Start(); context.CreateConnection(useTLS: useTLS); @@ -314,7 +315,7 @@ void ClusterSRPrimaryCheckpointRetrieve(bool performRMW, bool disableObjects, bo var primary_count = 1; var nodes_count = primary_count + primary_count * replica_count; ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, disableObjects: disableObjects, lowMemory: lowMemory, SegmentSize: manySegments ? "4k" : "1g", DisableStorageTier: disableStorageTier, EnableIncrementalSnapshots: incrementalSnapshots, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, disableObjects: disableObjects, lowMemory: lowMemory, SegmentSize: manySegments ? "4k" : "1g", DisableStorageTier: disableStorageTier, EnableIncrementalSnapshots: incrementalSnapshots, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); @@ -370,7 +371,8 @@ void ClusterSRPrimaryCheckpointRetrieve(bool performRMW, bool disableObjects, bo cleanClusterConfig: false, lowMemory: lowMemory, SegmentSize: manySegments ? 
"4k" : "1g", - DisableStorageTier: disableStorageTier); + DisableStorageTier: disableStorageTier, + asyncReplay: asyncReplay); context.nodes[replicaIndex].Start(); context.CreateConnection(useTLS: useTLS); @@ -389,7 +391,7 @@ public void ClusterSRAddReplicaAfterPrimaryCheckpoint([Values] bool performRMW, var primary_count = 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, lowMemory: lowMemory, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, lowMemory: lowMemory, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, new List<(int, int)>() { (0, 16383) }, true, context.logger)); @@ -448,7 +450,7 @@ public void ClusterSRPrimaryRestart([Values] bool performRMW, [Values] bool disa var primary_count = 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, new List<(int, int)>() { (0, 16383) }, true, context.logger)); @@ -492,7 +494,8 @@ public void ClusterSRPrimaryRestart([Values] bool performRMW, [Values] bool disa enableAOF: true, timeout: timeout, useTLS: useTLS, - cleanClusterConfig: false); + cleanClusterConfig: false, + asyncReplay: asyncReplay); context.nodes[0].Start(); context.CreateConnection(useTLS: useTLS); @@ -514,7 +517,7 @@ public void ClusterSRRedirectWrites() var primary_count 
= 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); @@ -539,10 +542,10 @@ public void ClusterSRRedirectWrites() public void ClusterSRReplicaOfTest([Values] bool performRMW) { var nodes_count = 2; - context.CreateInstances(nodes_count, tryRecover: true, disableObjects: true, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, tryRecover: true, disableObjects: true, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); - ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, new List<(int, int)>() { (0, 16383) }, true, context.logger)); + ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, [(0, 16383)], true, context.logger)); context.clusterTestUtils.SetConfigEpoch(0, 1, context.logger); context.clusterTestUtils.SetConfigEpoch(1, 2, context.logger); @@ -578,7 +581,7 @@ public void ClusterReplicationSimpleFailover([Values] bool performRMW, [Values] var primary_count = 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, disableObjects: true, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); @@ -648,7 +651,7 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo var primary_count = 1; var 
nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, disableObjects: true, EnableIncrementalSnapshots: enableIncrementalSnapshots, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, disableObjects: true, EnableIncrementalSnapshots: enableIncrementalSnapshots, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); @@ -905,7 +908,7 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp var primary_count = 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, disableObjects: disableObjects, MainMemoryReplication: mainMemoryReplication, CommitFrequencyMs: mainMemoryReplication ? -1 : 0, OnDemandCheckpoint: mainMemoryReplication, FastCommit: fastCommit, enableAOF: true, useTLS: useTLS); + context.CreateInstances(nodes_count, disableObjects: disableObjects, MainMemoryReplication: mainMemoryReplication, CommitFrequencyMs: mainMemoryReplication ? 
-1 : 0, OnDemandCheckpoint: mainMemoryReplication, FastCommit: fastCommit, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); context.CreateConnection(useTLS: useTLS); _ = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); @@ -916,8 +919,8 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp var keyLength = 8; var kvpairCount = 16; var addCount = 5; - context.kvPairs = new(); - context.kvPairsObj = new Dictionary>(); + context.kvPairs = []; + context.kvPairsObj = []; _ = context.clusterTestUtils.ClusterMyId(oldPrimaryIndex, context.logger); @@ -944,12 +947,12 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger); // Make this replica of no-one - _ = context.clusterTestUtils.ReplicaOf(1, logger: context.logger); + _ = context.clusterTestUtils.ReplicaOf(newPrimaryIndex, logger: context.logger); // Populate primary to diverge from replica 1 history // Use temporary dictionary to populate values lost to replica 1 - Dictionary kvPairs2 = new(); - Dictionary> kvPairsObj2 = new Dictionary>(); + Dictionary kvPairs2 = []; + Dictionary> kvPairsObj2 = []; if (disableObjects) { if (!performRMW) context.PopulatePrimary(ref kvPairs2, keyLength, kvpairCount, primaryIndex: oldPrimaryIndex); @@ -976,13 +979,13 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp context.clusterTestUtils.WaitForReplicaAofSync(oldPrimaryIndex, replicaIndex, context.logger); // Dispose primary - context.nodes[0].Dispose(false); - context.nodes[0] = null; + context.nodes[oldPrimaryIndex].Dispose(false); + context.nodes[oldPrimaryIndex] = null; // Re-assign slots to replica manually since failover option was not - _ = context.clusterTestUtils.AddDelSlotsRange(newPrimaryIndex, new List<(int, int)>() { (0, 16383) }, addslot: false, context.logger); - _ = 
context.clusterTestUtils.AddDelSlotsRange(replicaIndex, new List<(int, int)>() { (0, 16383) }, addslot: false, context.logger); - _ = context.clusterTestUtils.AddDelSlotsRange(newPrimaryIndex, new List<(int, int)>() { (0, 16383) }, addslot: true, context.logger); + _ = context.clusterTestUtils.AddDelSlotsRange(newPrimaryIndex, [(0, 16383)], addslot: false, context.logger); + _ = context.clusterTestUtils.AddDelSlotsRange(replicaIndex, [(0, 16383)], addslot: false, context.logger); + _ = context.clusterTestUtils.AddDelSlotsRange(newPrimaryIndex, [(0, 16383)], addslot: true, context.logger); context.clusterTestUtils.BumpEpoch(newPrimaryIndex, logger: context.logger); // New primary diverges to its own history by new random seed diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs index abc7ed60f6..f738e7f78d 100644 --- a/test/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/Garnet.test.cluster/ClusterTestContext.cs @@ -119,7 +119,8 @@ public void CreateInstances( AadAuthenticationSettings authenticationSettings = null, bool disablePubSub = true, int metricsSamplingFrequency = 0, - bool enableLua = false) + bool enableLua = false, + bool asyncReplay = false) { endpoints = TestUtils.GetEndPoints(shards, 7000); nodes = TestUtils.CreateGarnetCluster( @@ -151,7 +152,8 @@ public void CreateInstances( certificates: certificates, authenticationSettings: authenticationSettings, metricsSamplingFrequency: metricsSamplingFrequency, - enableLua: enableLua); + enableLua: enableLua, + asyncReplay: asyncReplay); foreach (var node in nodes) node.Start(); @@ -204,6 +206,7 @@ public GarnetServer CreateInstance( int gossipDelay = 5, bool useTLS = false, bool useAcl = false, + bool asyncReplay = false, X509CertificateCollection certificates = null, ServerCredential clusterCreds = new ServerCredential()) { @@ -232,6 +235,7 @@ public GarnetServer CreateInstance( EnableIncrementalSnapshots: EnableIncrementalSnapshots, FastCommit: 
FastCommit, useAcl: useAcl, + asyncReplay: asyncReplay, aclFile: credManager.aclFilePath, authUsername: clusterCreds.user, authPassword: clusterCreds.password, diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index 7d2f848d7f..329bbd97fb 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -363,7 +363,8 @@ public static GarnetServer[] CreateGarnetCluster( ILoggerFactory loggerFactory = null, AadAuthenticationSettings authenticationSettings = null, int metricsSamplingFrequency = 0, - bool enableLua = false) + bool enableLua = false, + bool asyncReplay = false) { if (UseAzureStorage) IgnoreIfNotRunningAzureTests(); @@ -404,7 +405,8 @@ public static GarnetServer[] CreateGarnetCluster( logger: loggerFactory?.CreateLogger("GarnetServer"), aadAuthenticationSettings: authenticationSettings, metricsSamplingFrequency: metricsSamplingFrequency, - enableLua: enableLua); + enableLua: enableLua, + asyncReplay: asyncReplay); ClassicAssert.IsNotNull(opts); int iter = 0; @@ -451,6 +453,7 @@ public static GarnetServerOptions GetGarnetServerOptions( AadAuthenticationSettings aadAuthenticationSettings = null, int metricsSamplingFrequency = 0, bool enableLua = false, + bool asyncReplay = false, ILogger logger = null) { if (UseAzureStorage) @@ -536,6 +539,7 @@ public static GarnetServerOptions GetGarnetServerOptions( ClusterUsername = authUsername, ClusterPassword = authPassword, EnableLua = enableLua, + ReplicationOffsetMaxLag = asyncReplay ? 1 << 20 : -1, }; if (lowMemory)