Skip to content

Commit

Permalink
Safe tail refresh for scanning uncommitted TsavoriteLog (#687)
Browse files Browse the repository at this point in the history
* Periodic safe tail refresh for scanning uncommitted TsavoriteLog

* nit

* fix

* use SWARE

* Improve logic and clean up names, add config for pub-sub

* Fix tsavorite

* fixes

* fix ScanUncommittedTest

* SafeTail refresh is needed only if replication is enabled

* added comments

* nit
  • Loading branch information
badrishc authored Sep 25, 2024
1 parent ec5a46a commit 63fdc08
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 100 deletions.
9 changes: 9 additions & 0 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ internal sealed class Options
[Option("aof-size-limit", Required = false, HelpText = "Maximum size of AOF (rounds down to power of 2) after which unsafe truncation will be applied. Left empty AOF will grow without bound unless a checkpoint is taken")]
public string AofSizeLimit { get; set; }

[IntRangeValidation(0, int.MaxValue)]
[Option("aof-refresh-freq", Required = false, HelpText = "AOF replication (safe tail address) refresh frequency in milliseconds. 0 = auto refresh after every enqueue.")]
public int AofReplicationRefreshFrequencyMs { get; set; }

[IntRangeValidation(0, int.MaxValue)]
[Option("subscriber-refresh-freq", Required = false, HelpText = "Subscriber (safe tail address) refresh frequency in milliseconds (for pub-sub). 0 = auto refresh after every enqueue.")]
public int SubscriberRefreshFrequencyMs { get; set; }

[IntRangeValidation(0, int.MaxValue)]
[Option("compaction-freq", Required = false, HelpText = "Background hybrid log compaction frequency in seconds. 0 = disabled (compaction performed before checkpointing instead)")]
public int CompactionFrequencySecs { get; set; }
Expand Down Expand Up @@ -588,6 +596,7 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
LuaTransactionMode = LuaTransactionMode.GetValueOrDefault(),
AofMemorySize = AofMemorySize,
AofPageSize = AofPageSize,
AofReplicationRefreshFrequencyMs = AofReplicationRefreshFrequencyMs,
CommitFrequencyMs = CommitFrequencyMs,
WaitForCommit = WaitForCommit.GetValueOrDefault(),
AofSizeLimit = AofSizeLimit,
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private void InitializeServer()
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);

if (!opts.DisablePubSub)
subscribeBroker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), true);
subscribeBroker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), opts.SubscriberRefreshFrequencyMs, true);

CreateAOF();

Expand Down
6 changes: 6 additions & 0 deletions libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@
/* Size of each AOF page in bytes(rounds down to power of 2) */
"AofPageSize" : "4m",

/* AOF replication (safe tail address) refresh frequency in milliseconds. 0 = auto refresh after every enqueue. */
"AofReplicationRefreshFrequencyMs": 10,

/* Subscriber (safe tail address) refresh frequency in milliseconds (for pub-sub). 0 = auto refresh after every enqueue. */
"SubscriberRefreshFrequencyMs": 0,

/* Write ahead logging (append-only file) commit issue frequency in milliseconds. 0 = issue an immediate commit per operation, -1 = manually issue commits using COMMITAOF command */
"CommitFrequencyMs" : 0,

Expand Down
5 changes: 3 additions & 2 deletions libs/server/PubSub/SubscribeBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ public sealed class SubscribeBroker<TKey, TValue, TKeyValueSerializer> : IDispos
/// <param name="keySerializer">Serializer for Prefix Match and serializing Key</param>
/// <param name="logDir">Directory where the log will be stored</param>
/// <param name="pageSize">Page size of log used for pub/sub</param>
/// <param name="subscriberRefreshFrequencyMs">Subscriber log refresh frequency</param>
/// <param name="startFresh">start the log from scratch, do not continue</param>
public SubscribeBroker(IKeySerializer<TKey> keySerializer, string logDir, long pageSize, bool startFresh = true)
public SubscribeBroker(IKeySerializer<TKey> keySerializer, string logDir, long pageSize, int subscriberRefreshFrequencyMs, bool startFresh = true)
{
this.keySerializer = keySerializer;
device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
device.Initialize((long)(1 << 30) * 64);
log = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSize = pageSize, MemorySize = pageSize * 4, AutoRefreshSafeTailAddress = true });
log = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSize = pageSize, MemorySize = pageSize * 4, SafeTailRefreshFrequencyMs = subscriberRefreshFrequencyMs });
if (startFresh)
log.TruncateUntil(log.CommittedUntilAddress);
}
Expand Down
12 changes: 11 additions & 1 deletion libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ public class GarnetServerOptions : ServerOptions
/// </summary>
public string AofPageSize = "4m";

/// <summary>
/// AOF replication (safe tail address) refresh frequency in milliseconds. 0 = auto refresh after every enqueue.
/// </summary>
public int AofReplicationRefreshFrequencyMs = 10;

/// <summary>
/// Subscriber (safe tail address) refresh frequency in milliseconds (for pub-sub). 0 = auto refresh after every enqueue.
/// </summary>
public int SubscriberRefreshFrequencyMs = 0;

/// <summary>
/// Write ahead logging (append-only file) commit issue frequency in milliseconds.
/// 0 = issue an immediate commit per operation
Expand Down Expand Up @@ -616,7 +626,7 @@ public void GetAofSettings(out TsavoriteLogSettings tsavoriteLogSettings)
PageSizeBits = AofPageSizeBits(),
LogDevice = GetAofDevice(),
TryRecoverLatest = false,
AutoRefreshSafeTailAddress = true,
SafeTailRefreshFrequencyMs = EnableCluster ? AofReplicationRefreshFrequencyMs : -1,
FastCommitMode = EnableFastCommit,
AutoCommit = CommitFrequencyMs == 0,
MutableFraction = 0.9,
Expand Down
Loading

0 comments on commit 63fdc08

Please sign in to comment.