Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align Replica Checkpoint Correctly on Attach #571

Merged
merged 10 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 32) -- NOTE - these two values need to be the same
# 3) update the version in GarnetServer.csproj (~line 8)
######################################
name: 1.0.38
name: 1.0.39
trigger:
branches:
include:
Expand Down
20 changes: 10 additions & 10 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,20 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover

if (storeType is StoreType.Main or StoreType.All)
{
entry.storeVersion = storeWrapper.store.CurrentVersion;
entry.storeHlogToken = storeCheckpointToken;
entry.storeIndexToken = storeCheckpointToken;
entry.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.storePrimaryReplId = replicationManager.PrimaryReplId;
entry.metadata.storeVersion = storeWrapper.store.CurrentVersion;
entry.metadata.storeHlogToken = storeCheckpointToken;
entry.metadata.storeIndexToken = storeCheckpointToken;
entry.metadata.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.metadata.storePrimaryReplId = replicationManager.PrimaryReplId;
}

if (storeType is StoreType.Object or StoreType.All)
{
entry.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.objectStorePrimaryReplId = replicationManager.PrimaryReplId;
entry.metadata.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.metadata.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.metadata.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.metadata.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.metadata.objectStorePrimaryReplId = replicationManager.PrimaryReplId;
}

// Keep track of checkpoints for replica
Expand Down
102 changes: 42 additions & 60 deletions libs/cluster/Server/Replication/CheckpointEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,24 @@
using System.IO;
using System.Text;
using Garnet.common;
using Garnet.server;

namespace Garnet.cluster
{
sealed class CheckpointEntry
{
public long storeVersion;
public Guid storeHlogToken;
public Guid storeIndexToken;
public long storeCheckpointCoveredAofAddress;
public string storePrimaryReplId;

public long objectStoreVersion;
public Guid objectStoreHlogToken;
public Guid objectStoreIndexToken;
public long objectCheckpointCoveredAofAddress;
public string objectStorePrimaryReplId;

public CheckpointMetadata metadata;
public SingleWriterMultiReaderLock _lock;
public CheckpointEntry next;

public CheckpointEntry()
{
storeVersion = -1;
storeHlogToken = default;
storeIndexToken = default;
storeCheckpointCoveredAofAddress = long.MaxValue;
storePrimaryReplId = null;

objectStoreVersion = -1;
objectStoreHlogToken = default;
objectStoreIndexToken = default;
objectCheckpointCoveredAofAddress = long.MaxValue;
objectStorePrimaryReplId = null;

metadata = new();
next = null;
}

public long GetMinAofCoveredAddress()
=> Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64);
=> Math.Max(Math.Min(metadata.storeCheckpointCoveredAofAddress, metadata.objectCheckpointCoveredAofAddress), 64);

/// <summary>
/// Indicate addition of new reader by trying to increment reader counter
Expand Down Expand Up @@ -76,26 +55,26 @@ public bool ContainsSharedToken(CheckpointEntry entry, CheckpointFileType fileTy
{
return fileType switch
{
CheckpointFileType.STORE_HLOG => storeHlogToken.Equals(entry.storeHlogToken),
CheckpointFileType.STORE_INDEX => storeIndexToken.Equals(entry.storeIndexToken),
CheckpointFileType.OBJ_STORE_HLOG => objectStoreHlogToken.Equals(entry.objectStoreHlogToken),
CheckpointFileType.OBJ_STORE_INDEX => objectStoreIndexToken.Equals(entry.objectStoreIndexToken),
CheckpointFileType.STORE_HLOG => metadata.storeHlogToken.Equals(entry.metadata.storeHlogToken),
CheckpointFileType.STORE_INDEX => metadata.storeIndexToken.Equals(entry.metadata.storeIndexToken),
CheckpointFileType.OBJ_STORE_HLOG => metadata.objectStoreHlogToken.Equals(entry.metadata.objectStoreHlogToken),
CheckpointFileType.OBJ_STORE_INDEX => metadata.objectStoreIndexToken.Equals(entry.metadata.objectStoreIndexToken),
_ => throw new Exception($"Option {fileType} not supported")
};
}

public string GetCheckpointEntryDump()
{
string dump = $"\n" +
$"storeVersion: {storeVersion}\n" +
$"storeHlogToken: {storeHlogToken}\n" +
$"storeIndexToken: {storeIndexToken}\n" +
$"storeCheckpointCoveredAofAddress: {storeCheckpointCoveredAofAddress}\n" +
$"storeVersion: {metadata.storeVersion}\n" +
$"storeHlogToken: {metadata.storeHlogToken}\n" +
$"storeIndexToken: {metadata.storeIndexToken}\n" +
$"storeCheckpointCoveredAofAddress: {metadata.storeCheckpointCoveredAofAddress}\n" +
$"------------------------------------------------------------------------\n" +
$"objectStoreVersion:{objectStoreVersion}\n" +
$"objectStoreHlogToken:{objectStoreHlogToken}\n" +
$"objectStoreIndexToken:{objectStoreIndexToken}\n" +
$"objectCheckpointCoveredAofAddress:{objectCheckpointCoveredAofAddress}\n" +
$"objectStoreVersion:{metadata.objectStoreVersion}\n" +
$"objectStoreHlogToken:{metadata.objectStoreHlogToken}\n" +
$"objectStoreIndexToken:{metadata.objectStoreIndexToken}\n" +
$"objectCheckpointCoveredAofAddress:{metadata.objectCheckpointCoveredAofAddress}\n" +
$"------------------------------------------------------------------------\n" +
$"activeReaders:{_lock}";
return dump;
Expand All @@ -108,28 +87,28 @@ public byte[] ToByteArray()
byte[] byteBuffer = default;

//Write checkpoint entry data for main store
writer.Write(storeVersion);
byteBuffer = storeHlogToken.ToByteArray();
writer.Write(metadata.storeVersion);
byteBuffer = metadata.storeHlogToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
byteBuffer = storeIndexToken.ToByteArray();
byteBuffer = metadata.storeIndexToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
writer.Write(storeCheckpointCoveredAofAddress);
writer.Write(storePrimaryReplId == null ? 0 : 1);
if (storePrimaryReplId != null) writer.Write(storePrimaryReplId);
writer.Write(metadata.storeCheckpointCoveredAofAddress);
writer.Write(metadata.storePrimaryReplId == null ? 0 : 1);
if (metadata.storePrimaryReplId != null) writer.Write(metadata.storePrimaryReplId);

//Write checkpoint entry data for object store
writer.Write(objectStoreVersion);
byteBuffer = objectStoreHlogToken.ToByteArray();
writer.Write(metadata.objectStoreVersion);
byteBuffer = metadata.objectStoreHlogToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
byteBuffer = objectStoreIndexToken.ToByteArray();
byteBuffer = metadata.objectStoreIndexToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
writer.Write(objectCheckpointCoveredAofAddress);
writer.Write(objectStorePrimaryReplId == null ? 0 : 1);
if (objectStorePrimaryReplId != null) writer.Write(objectStorePrimaryReplId);
writer.Write(metadata.objectCheckpointCoveredAofAddress);
writer.Write(metadata.objectStorePrimaryReplId == null ? 0 : 1);
if (metadata.objectStorePrimaryReplId != null) writer.Write(metadata.objectStorePrimaryReplId);

byte[] byteArray = ms.ToArray();
writer.Dispose();
Expand All @@ -143,17 +122,20 @@ public static CheckpointEntry FromByteArray(byte[] serialized)
var reader = new BinaryReader(ms);
var cEntry = new CheckpointEntry
{
storeVersion = reader.ReadInt64(),
storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeCheckpointCoveredAofAddress = reader.ReadInt64(),
storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default,

objectStoreVersion = reader.ReadInt64(),
objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectCheckpointCoveredAofAddress = reader.ReadInt64(),
objectStorePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default
metadata = new()
{
storeVersion = reader.ReadInt64(),
storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeCheckpointCoveredAofAddress = reader.ReadInt64(),
storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default,

objectStoreVersion = reader.ReadInt64(),
objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectCheckpointCoveredAofAddress = reader.ReadInt64(),
objectStorePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default
}
};

reader.Dispose();
Expand Down
Loading