Skip to content

Commit

Permalink
Align Replica Checkpoint Correctly on Attach (#571)
Browse files Browse the repository at this point in the history
* skip sending the primary's checkpoint only if its history and version match those of the remote checkpoint

* reset system state and version on store.Reset

* recover in-memory checkpoint store only when recover flag is set

* add more info in replica recovery log message

* separate checkpoint entry metadata

* wip

* add test to validate bug fix

* log message tags

* update nunit legacy methods

* release-1.0.39
  • Loading branch information
vazois authored Nov 13, 2024
1 parent d3ef7f7 commit 94ee0a2
Show file tree
Hide file tree
Showing 17 changed files with 374 additions and 182 deletions.
2 changes: 1 addition & 1 deletion .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 32) -- NOTE - these two values need to be the same
# 3) update the version in GarnetServer.csproj (~line 8)
######################################
name: 1.0.38
name: 1.0.39
trigger:
branches:
include:
Expand Down
20 changes: 10 additions & 10 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,20 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover

if (storeType is StoreType.Main or StoreType.All)
{
entry.storeVersion = storeWrapper.store.CurrentVersion;
entry.storeHlogToken = storeCheckpointToken;
entry.storeIndexToken = storeCheckpointToken;
entry.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.storePrimaryReplId = replicationManager.PrimaryReplId;
entry.metadata.storeVersion = storeWrapper.store.CurrentVersion;
entry.metadata.storeHlogToken = storeCheckpointToken;
entry.metadata.storeIndexToken = storeCheckpointToken;
entry.metadata.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.metadata.storePrimaryReplId = replicationManager.PrimaryReplId;
}

if (storeType is StoreType.Object or StoreType.All)
{
entry.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.objectStorePrimaryReplId = replicationManager.PrimaryReplId;
entry.metadata.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.metadata.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.metadata.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.metadata.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.metadata.objectStorePrimaryReplId = replicationManager.PrimaryReplId;
}

// Keep track of checkpoints for replica
Expand Down
102 changes: 42 additions & 60 deletions libs/cluster/Server/Replication/CheckpointEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,24 @@
using System.IO;
using System.Text;
using Garnet.common;
using Garnet.server;

namespace Garnet.cluster
{
sealed class CheckpointEntry
{
public long storeVersion;
public Guid storeHlogToken;
public Guid storeIndexToken;
public long storeCheckpointCoveredAofAddress;
public string storePrimaryReplId;

public long objectStoreVersion;
public Guid objectStoreHlogToken;
public Guid objectStoreIndexToken;
public long objectCheckpointCoveredAofAddress;
public string objectStorePrimaryReplId;

public CheckpointMetadata metadata;
public SingleWriterMultiReaderLock _lock;
public CheckpointEntry next;

public CheckpointEntry()
{
storeVersion = -1;
storeHlogToken = default;
storeIndexToken = default;
storeCheckpointCoveredAofAddress = long.MaxValue;
storePrimaryReplId = null;

objectStoreVersion = -1;
objectStoreHlogToken = default;
objectStoreIndexToken = default;
objectCheckpointCoveredAofAddress = long.MaxValue;
objectStorePrimaryReplId = null;

metadata = new();
next = null;
}

public long GetMinAofCoveredAddress()
=> Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64);
=> Math.Max(Math.Min(metadata.storeCheckpointCoveredAofAddress, metadata.objectCheckpointCoveredAofAddress), 64);

/// <summary>
/// Indicate addition of new reader by trying to increment reader counter
Expand Down Expand Up @@ -76,26 +55,26 @@ public bool ContainsSharedToken(CheckpointEntry entry, CheckpointFileType fileTy
{
return fileType switch
{
CheckpointFileType.STORE_HLOG => storeHlogToken.Equals(entry.storeHlogToken),
CheckpointFileType.STORE_INDEX => storeIndexToken.Equals(entry.storeIndexToken),
CheckpointFileType.OBJ_STORE_HLOG => objectStoreHlogToken.Equals(entry.objectStoreHlogToken),
CheckpointFileType.OBJ_STORE_INDEX => objectStoreIndexToken.Equals(entry.objectStoreIndexToken),
CheckpointFileType.STORE_HLOG => metadata.storeHlogToken.Equals(entry.metadata.storeHlogToken),
CheckpointFileType.STORE_INDEX => metadata.storeIndexToken.Equals(entry.metadata.storeIndexToken),
CheckpointFileType.OBJ_STORE_HLOG => metadata.objectStoreHlogToken.Equals(entry.metadata.objectStoreHlogToken),
CheckpointFileType.OBJ_STORE_INDEX => metadata.objectStoreIndexToken.Equals(entry.metadata.objectStoreIndexToken),
_ => throw new Exception($"Option {fileType} not supported")
};
}

public string GetCheckpointEntryDump()
{
string dump = $"\n" +
$"storeVersion: {storeVersion}\n" +
$"storeHlogToken: {storeHlogToken}\n" +
$"storeIndexToken: {storeIndexToken}\n" +
$"storeCheckpointCoveredAofAddress: {storeCheckpointCoveredAofAddress}\n" +
$"storeVersion: {metadata.storeVersion}\n" +
$"storeHlogToken: {metadata.storeHlogToken}\n" +
$"storeIndexToken: {metadata.storeIndexToken}\n" +
$"storeCheckpointCoveredAofAddress: {metadata.storeCheckpointCoveredAofAddress}\n" +
$"------------------------------------------------------------------------\n" +
$"objectStoreVersion:{objectStoreVersion}\n" +
$"objectStoreHlogToken:{objectStoreHlogToken}\n" +
$"objectStoreIndexToken:{objectStoreIndexToken}\n" +
$"objectCheckpointCoveredAofAddress:{objectCheckpointCoveredAofAddress}\n" +
$"objectStoreVersion:{metadata.objectStoreVersion}\n" +
$"objectStoreHlogToken:{metadata.objectStoreHlogToken}\n" +
$"objectStoreIndexToken:{metadata.objectStoreIndexToken}\n" +
$"objectCheckpointCoveredAofAddress:{metadata.objectCheckpointCoveredAofAddress}\n" +
$"------------------------------------------------------------------------\n" +
$"activeReaders:{_lock}";
return dump;
Expand All @@ -108,28 +87,28 @@ public byte[] ToByteArray()
byte[] byteBuffer = default;

//Write checkpoint entry data for main store
writer.Write(storeVersion);
byteBuffer = storeHlogToken.ToByteArray();
writer.Write(metadata.storeVersion);
byteBuffer = metadata.storeHlogToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
byteBuffer = storeIndexToken.ToByteArray();
byteBuffer = metadata.storeIndexToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
writer.Write(storeCheckpointCoveredAofAddress);
writer.Write(storePrimaryReplId == null ? 0 : 1);
if (storePrimaryReplId != null) writer.Write(storePrimaryReplId);
writer.Write(metadata.storeCheckpointCoveredAofAddress);
writer.Write(metadata.storePrimaryReplId == null ? 0 : 1);
if (metadata.storePrimaryReplId != null) writer.Write(metadata.storePrimaryReplId);

//Write checkpoint entry data for object store
writer.Write(objectStoreVersion);
byteBuffer = objectStoreHlogToken.ToByteArray();
writer.Write(metadata.objectStoreVersion);
byteBuffer = metadata.objectStoreHlogToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
byteBuffer = objectStoreIndexToken.ToByteArray();
byteBuffer = metadata.objectStoreIndexToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
writer.Write(objectCheckpointCoveredAofAddress);
writer.Write(objectStorePrimaryReplId == null ? 0 : 1);
if (objectStorePrimaryReplId != null) writer.Write(objectStorePrimaryReplId);
writer.Write(metadata.objectCheckpointCoveredAofAddress);
writer.Write(metadata.objectStorePrimaryReplId == null ? 0 : 1);
if (metadata.objectStorePrimaryReplId != null) writer.Write(metadata.objectStorePrimaryReplId);

byte[] byteArray = ms.ToArray();
writer.Dispose();
Expand All @@ -143,17 +122,20 @@ public static CheckpointEntry FromByteArray(byte[] serialized)
var reader = new BinaryReader(ms);
var cEntry = new CheckpointEntry
{
storeVersion = reader.ReadInt64(),
storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeCheckpointCoveredAofAddress = reader.ReadInt64(),
storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default,

objectStoreVersion = reader.ReadInt64(),
objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectCheckpointCoveredAofAddress = reader.ReadInt64(),
objectStorePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default
metadata = new()
{
storeVersion = reader.ReadInt64(),
storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeCheckpointCoveredAofAddress = reader.ReadInt64(),
storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default,

objectStoreVersion = reader.ReadInt64(),
objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectCheckpointCoveredAofAddress = reader.ReadInt64(),
objectStorePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default
}
};

reader.Dispose();
Expand Down
Loading

0 comments on commit 94ee0a2

Please sign in to comment.