From 4b667cbda0731571dd25f87c08e51d96ced1e864 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 14 Nov 2024 10:11:53 -0800 Subject: [PATCH] remap ReplicationOffsetMaxLag parameter to make infinite lag default --- .../ReplicaOps/ReplicationReplicaAofSync.cs | 20 ++++++++++--------- libs/host/Configuration/Options.cs | 2 +- libs/host/defaults.conf | 2 +- libs/server/Servers/GarnetServerOptions.cs | 2 +- test/Garnet.test/TestUtils.cs | 8 +++++--- 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs index 5083dfc60b..a1d00ca4ef 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Runtime.CompilerServices; using System.Threading; using Garnet.common; using Microsoft.Extensions.Logging; @@ -10,9 +11,10 @@ namespace Garnet.cluster { internal sealed partial class ReplicationManager : IDisposable { + [MethodImpl(MethodImplOptions.AggressiveInlining)] void ThrottlePrimary() { - while (replayIterator != null && storeWrapper.appendOnlyFile.TailAddress - ReplicationOffset > storeWrapper.serverOptions.ReplicationOffsetMaxLag) + while (storeWrapper.serverOptions.ReplicationOffsetMaxLag != -1 && replayIterator != null && storeWrapper.appendOnlyFile.TailAddress - ReplicationOffset > storeWrapper.serverOptions.ReplicationOffsetMaxLag) { replicaReplayTaskCts.Token.ThrowIfCancellationRequested(); Thread.Yield(); @@ -63,8 +65,8 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre } } - // Address check - if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == -1 && ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress) + // Address check only if synchronous replication is enabled + if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == 0 && ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress) { logger?.LogInformation("Processing {recordLength} bytes; previousAddress {previousAddress}, currentAddress {currentAddress}, nextAddress {nextAddress}, current AOF tail {tail}", recordLength, previousAddress, currentAddress, nextAddress, storeWrapper.appendOnlyFile.TailAddress); logger?.LogError("Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress); @@ -74,7 +76,12 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre // Enqueue to AOF _ = clusterProvider.storeWrapper.appendOnlyFile?.UnsafeEnqueueRaw(new Span(record, recordLength), noCommit: clusterProvider.serverOptions.EnableFastCommit); - if (storeWrapper.serverOptions.ReplicationOffsetMaxLag > -1) + if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == 0) + { + // Synchronous replay + Consume(record, recordLength, currentAddress, nextAddress, isProtected: false); + } + else { // Throttle to give the opportunity to the background replay task to catch up ThrottlePrimary(); @@ -93,11 +100,6 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre System.Threading.Tasks.Task.Run(ReplicaReplayTask); } } - else - { - // Synchronous replay - Consume(record, recordLength, currentAddress, nextAddress, isProtected: false); - } } catch (Exception ex) { diff --git a/libs/host/Configuration/Options.cs b/libs/host/Configuration/Options.cs index 91a6d1e87b..6ab9f8c87f 100644 --- a/libs/host/Configuration/Options.cs +++ b/libs/host/Configuration/Options.cs @@ -343,7 +343,7 @@ internal sealed class Options public int ReplicaSyncDelayMs { get; set; } [IntRangeValidation(-1, int.MaxValue)] - [Option("replica-offset-max-lag", Required = false, HelpText = "Upper bound on the lag (i.e. throttle replicaAOF append if AOF.TailAddress - ReplicationOffset > ReplicaMaxLag) between primary and replica. -1 - Synchronous replay, >= 0 - background replay with specified lag")] + [Option("replica-offset-max-lag", Required = false, HelpText = "Throttle ClusterAppendLog when replica.AOFTailAddress - ReplicationOffset > ReplicationOffsetMaxLag. 0: Synchronous replay, >=1: background replay with specified lag, -1: infinite lag")] public int ReplicationOffsetMaxLag { get; set; } [OptionValidation] diff --git a/libs/host/defaults.conf b/libs/host/defaults.conf index 4209bb32ed..4d8c79e6dd 100644 --- a/libs/host/defaults.conf +++ b/libs/host/defaults.conf @@ -254,7 +254,7 @@ /* Whether and by how much (milliseconds) should we throttle the replica sync: 0 - disable throttling */ "ReplicaSyncDelayMs" : 5, - /* Upper bound on the lag (i.e. throttle replicaAOF append if AOF.TailAddress - ReplicationOffset > ReplicaMaxLag) between primary and replica. -1 - Synchronous replay, >= 0 - background replay with specified lag*/ + /* Throttle ClusterAppendLog when replica.AOFTailAddress - ReplicationOffset > ReplicationOffsetMaxLag. 0: Synchronous replay, >=1: background replay with specified lag, -1: infinite lag */ "ReplicationOffsetMaxLag" : -1, /* Use main-memory replication model. */ diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs index f24939d42f..3051235966 100644 --- a/libs/server/Servers/GarnetServerOptions.cs +++ b/libs/server/Servers/GarnetServerOptions.cs @@ -285,7 +285,7 @@ public class GarnetServerOptions : ServerOptions public int ReplicaSyncDelayMs = 5; /// - /// Upper bound on the lag (i.e. throttle replicaAOF append if AOF.TailAddress - ReplicationOffset > ReplicaMaxLag) between primary and replica. -1 - Synchronous replay, >= 0 - background replay with specified lag + /// Throttle ClusterAppendLog when replica.AOFTailAddress - ReplicationOffset > ReplicationOffsetMaxLag. 0: Synchronous replay, >=1: background replay with specified lag, -1: infinite lag /// public int ReplicationOffsetMaxLag = -1; diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index 329bbd97fb..a8278438da 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -202,7 +202,8 @@ public static GarnetServer CreateGarnetServer( bool enableLua = false, ILogger logger = null, IEnumerable loadModulePaths = null, - string pubSubPageSize = null) + string pubSubPageSize = null, + bool asyncReplay = false) { if (UseAzureStorage) IgnoreIfNotRunningAzureTests(); @@ -278,7 +279,8 @@ public static GarnetServer CreateGarnetServer( EnableScatterGatherGet = getSG, IndexResizeFrequencySecs = indexResizeFrequencySecs, ThreadPoolMinThreads = threadPoolMinThreads, - LoadModuleCS = loadModulePaths + LoadModuleCS = loadModulePaths, + ReplicationOffsetMaxLag = asyncReplay ? -1 : 0 }; if (!string.IsNullOrEmpty(pubSubPageSize)) @@ -539,7 +541,7 @@ public static GarnetServerOptions GetGarnetServerOptions( ClusterUsername = authUsername, ClusterPassword = authPassword, EnableLua = enableLua, - ReplicationOffsetMaxLag = asyncReplay ? 1 << 20 : -1, + ReplicationOffsetMaxLag = asyncReplay ? -1 : 0 }; if (lowMemory)