Skip to content

Commit

Permalink
remap ReplicationOffsetMaxLag parameter to make infinite lag default
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Nov 14, 2024
1 parent 4ea6b48 commit 4b667cb
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using Garnet.common;
using Microsoft.Extensions.Logging;
Expand All @@ -10,9 +11,10 @@ namespace Garnet.cluster
{
internal sealed partial class ReplicationManager : IDisposable
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void ThrottlePrimary()
{
while (replayIterator != null && storeWrapper.appendOnlyFile.TailAddress - ReplicationOffset > storeWrapper.serverOptions.ReplicationOffsetMaxLag)
while (storeWrapper.serverOptions.ReplicationOffsetMaxLag != -1 && replayIterator != null && storeWrapper.appendOnlyFile.TailAddress - ReplicationOffset > storeWrapper.serverOptions.ReplicationOffsetMaxLag)
{
replicaReplayTaskCts.Token.ThrowIfCancellationRequested();
Thread.Yield();
Expand Down Expand Up @@ -63,8 +65,8 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
}
}

// Address check
if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == -1 && ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress)
// Address check only if synchronous replication is enabled
if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == 0 && ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress)
{
logger?.LogInformation("Processing {recordLength} bytes; previousAddress {previousAddress}, currentAddress {currentAddress}, nextAddress {nextAddress}, current AOF tail {tail}", recordLength, previousAddress, currentAddress, nextAddress, storeWrapper.appendOnlyFile.TailAddress);
logger?.LogError("Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress);
Expand All @@ -74,7 +76,12 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
// Enqueue to AOF
_ = clusterProvider.storeWrapper.appendOnlyFile?.UnsafeEnqueueRaw(new Span<byte>(record, recordLength), noCommit: clusterProvider.serverOptions.EnableFastCommit);

if (storeWrapper.serverOptions.ReplicationOffsetMaxLag > -1)
if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == 0)
{
// Synchronous replay
Consume(record, recordLength, currentAddress, nextAddress, isProtected: false);
}
else
{
// Throttle to give the opportunity to the background replay task to catch up
ThrottlePrimary();
Expand All @@ -93,11 +100,6 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
System.Threading.Tasks.Task.Run(ReplicaReplayTask);
}
}
else
{
// Synchronous replay
Consume(record, recordLength, currentAddress, nextAddress, isProtected: false);
}
}
catch (Exception ex)
{
Expand Down
2 changes: 1 addition & 1 deletion libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ internal sealed class Options
public int ReplicaSyncDelayMs { get; set; }

[IntRangeValidation(-1, int.MaxValue)]
[Option("replica-offset-max-lag", Required = false, HelpText = "Upper bound on the lag (i.e. throttle replicaAOF append if AOF.TailAddress - ReplicationOffset > ReplicaMaxLag) between primary and replica. -1 - Synchronous replay, >= 0 - background replay with specified lag")]
[Option("replica-offset-max-lag", Required = false, HelpText = "Throttle ClusterAppendLog when replica.AOFTailAddress - ReplicationOffset > ReplicationOffsetMaxLag. 0: Synchronous replay, >=1: background replay with specified lag, -1: infinite lag")]
public int ReplicationOffsetMaxLag { get; set; }

[OptionValidation]
Expand Down
2 changes: 1 addition & 1 deletion libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@
/* Whether and by how much (milliseconds) should we throttle the replica sync: 0 - disable throttling */
"ReplicaSyncDelayMs" : 5,

/* Upper bound on the lag (i.e. throttle replicaAOF append if AOF.TailAddress - ReplicationOffset > ReplicaMaxLag) between primary and replica. -1 - Synchronous replay, >= 0 - background replay with specified lag*/
/* Throttle ClusterAppendLog when replica.AOFTailAddress - ReplicationOffset > ReplicationOffsetMaxLag. 0: Synchronous replay, >=1: background replay with specified lag, -1: infinite lag */
"ReplicationOffsetMaxLag" : -1,

/* Use main-memory replication model. */
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public class GarnetServerOptions : ServerOptions
public int ReplicaSyncDelayMs = 5;

/// <summary>
/// Upper bound on the lag (i.e. throttle replicaAOF append if AOF.TailAddress - ReplicationOffset > ReplicaMaxLag) between primary and replica. -1 - Synchronous replay, >= 0 - background replay with specified lag
/// Throttle ClusterAppendLog when replica.AOFTailAddress - ReplicationOffset > ReplicationOffsetMaxLag. 0: Synchronous replay, >=1: background replay with specified lag, -1: infinite lag
/// </summary>
public int ReplicationOffsetMaxLag = -1;

Expand Down
8 changes: 5 additions & 3 deletions test/Garnet.test/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ public static GarnetServer CreateGarnetServer(
bool enableLua = false,
ILogger logger = null,
IEnumerable<string> loadModulePaths = null,
string pubSubPageSize = null)
string pubSubPageSize = null,
bool asyncReplay = false)
{
if (UseAzureStorage)
IgnoreIfNotRunningAzureTests();
Expand Down Expand Up @@ -278,7 +279,8 @@ public static GarnetServer CreateGarnetServer(
EnableScatterGatherGet = getSG,
IndexResizeFrequencySecs = indexResizeFrequencySecs,
ThreadPoolMinThreads = threadPoolMinThreads,
LoadModuleCS = loadModulePaths
LoadModuleCS = loadModulePaths,
ReplicationOffsetMaxLag = asyncReplay ? -1 : 0
};

if (!string.IsNullOrEmpty(pubSubPageSize))
Expand Down Expand Up @@ -539,7 +541,7 @@ public static GarnetServerOptions GetGarnetServerOptions(
ClusterUsername = authUsername,
ClusterPassword = authPassword,
EnableLua = enableLua,
ReplicationOffsetMaxLag = asyncReplay ? 1 << 20 : -1,
ReplicationOffsetMaxLag = asyncReplay ? -1 : 0
};

if (lowMemory)
Expand Down

0 comments on commit 4b667cb

Please sign in to comment.