diff --git a/.azure/pipelines/azure-pipelines-external-release.yml b/.azure/pipelines/azure-pipelines-external-release.yml index e83f0a417d..aa6e61c905 100644 --- a/.azure/pipelines/azure-pipelines-external-release.yml +++ b/.azure/pipelines/azure-pipelines-external-release.yml @@ -4,7 +4,7 @@ # 2) update \libs\host\GarnetServer.cs readonly string version (~line 32) -- NOTE - these two values need to be the same # 3) update the version in GarnetServer.csproj (~line 8) ###################################### -name: 1.0.38 +name: 1.0.39 trigger: branches: include: diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs index 65781c672f..a864a6a611 100644 --- a/libs/cluster/Server/ClusterProvider.cs +++ b/libs/cluster/Server/ClusterProvider.cs @@ -153,20 +153,20 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover if (storeType is StoreType.Main or StoreType.All) { - entry.storeVersion = storeWrapper.store.CurrentVersion; - entry.storeHlogToken = storeCheckpointToken; - entry.storeIndexToken = storeCheckpointToken; - entry.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress; - entry.storePrimaryReplId = replicationManager.PrimaryReplId; + entry.metadata.storeVersion = storeWrapper.store.CurrentVersion; + entry.metadata.storeHlogToken = storeCheckpointToken; + entry.metadata.storeIndexToken = storeCheckpointToken; + entry.metadata.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress; + entry.metadata.storePrimaryReplId = replicationManager.PrimaryReplId; } if (storeType is StoreType.Object or StoreType.All) { - entry.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion; - entry.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken; - entry.objectStoreIndexToken = serverOptions.DisableObjects ? 
default : objectStoreCheckpointToken; - entry.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress; - entry.objectStorePrimaryReplId = replicationManager.PrimaryReplId; + entry.metadata.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion; + entry.metadata.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken; + entry.metadata.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken; + entry.metadata.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress; + entry.metadata.objectStorePrimaryReplId = replicationManager.PrimaryReplId; } // Keep track of checkpoints for replica diff --git a/libs/cluster/Server/Replication/CheckpointEntry.cs b/libs/cluster/Server/Replication/CheckpointEntry.cs index a775f28444..b8a7a776ab 100644 --- a/libs/cluster/Server/Replication/CheckpointEntry.cs +++ b/libs/cluster/Server/Replication/CheckpointEntry.cs @@ -5,45 +5,24 @@ using System.IO; using System.Text; using Garnet.common; +using Garnet.server; namespace Garnet.cluster { sealed class CheckpointEntry { - public long storeVersion; - public Guid storeHlogToken; - public Guid storeIndexToken; - public long storeCheckpointCoveredAofAddress; - public string storePrimaryReplId; - - public long objectStoreVersion; - public Guid objectStoreHlogToken; - public Guid objectStoreIndexToken; - public long objectCheckpointCoveredAofAddress; - public string objectStorePrimaryReplId; - + public CheckpointMetadata metadata; public SingleWriterMultiReaderLock _lock; public CheckpointEntry next; public CheckpointEntry() { - storeVersion = -1; - storeHlogToken = default; - storeIndexToken = default; - storeCheckpointCoveredAofAddress = long.MaxValue; - storePrimaryReplId = null; - - objectStoreVersion = -1; - objectStoreHlogToken = default; - objectStoreIndexToken = default; - objectCheckpointCoveredAofAddress = long.MaxValue; - objectStorePrimaryReplId = null; - + 
metadata = new(); next = null; } public long GetMinAofCoveredAddress() - => Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64); + => Math.Max(Math.Min(metadata.storeCheckpointCoveredAofAddress, metadata.objectCheckpointCoveredAofAddress), 64); /// /// Indicate addition of new reader by trying to increment reader counter @@ -76,10 +55,10 @@ public bool ContainsSharedToken(CheckpointEntry entry, CheckpointFileType fileTy { return fileType switch { - CheckpointFileType.STORE_HLOG => storeHlogToken.Equals(entry.storeHlogToken), - CheckpointFileType.STORE_INDEX => storeIndexToken.Equals(entry.storeIndexToken), - CheckpointFileType.OBJ_STORE_HLOG => objectStoreHlogToken.Equals(entry.objectStoreHlogToken), - CheckpointFileType.OBJ_STORE_INDEX => objectStoreIndexToken.Equals(entry.objectStoreIndexToken), + CheckpointFileType.STORE_HLOG => metadata.storeHlogToken.Equals(entry.metadata.storeHlogToken), + CheckpointFileType.STORE_INDEX => metadata.storeIndexToken.Equals(entry.metadata.storeIndexToken), + CheckpointFileType.OBJ_STORE_HLOG => metadata.objectStoreHlogToken.Equals(entry.metadata.objectStoreHlogToken), + CheckpointFileType.OBJ_STORE_INDEX => metadata.objectStoreIndexToken.Equals(entry.metadata.objectStoreIndexToken), _ => throw new Exception($"Option {fileType} not supported") }; } @@ -87,15 +66,15 @@ public bool ContainsSharedToken(CheckpointEntry entry, CheckpointFileType fileTy public string GetCheckpointEntryDump() { string dump = $"\n" + - $"storeVersion: {storeVersion}\n" + - $"storeHlogToken: {storeHlogToken}\n" + - $"storeIndexToken: {storeIndexToken}\n" + - $"storeCheckpointCoveredAofAddress: {storeCheckpointCoveredAofAddress}\n" + + $"storeVersion: {metadata.storeVersion}\n" + + $"storeHlogToken: {metadata.storeHlogToken}\n" + + $"storeIndexToken: {metadata.storeIndexToken}\n" + + $"storeCheckpointCoveredAofAddress: {metadata.storeCheckpointCoveredAofAddress}\n" + 
$"------------------------------------------------------------------------\n" + - $"objectStoreVersion:{objectStoreVersion}\n" + - $"objectStoreHlogToken:{objectStoreHlogToken}\n" + - $"objectStoreIndexToken:{objectStoreIndexToken}\n" + - $"objectCheckpointCoveredAofAddress:{objectCheckpointCoveredAofAddress}\n" + + $"objectStoreVersion:{metadata.objectStoreVersion}\n" + + $"objectStoreHlogToken:{metadata.objectStoreHlogToken}\n" + + $"objectStoreIndexToken:{metadata.objectStoreIndexToken}\n" + + $"objectCheckpointCoveredAofAddress:{metadata.objectCheckpointCoveredAofAddress}\n" + $"------------------------------------------------------------------------\n" + $"activeReaders:{_lock}"; return dump; @@ -108,28 +87,28 @@ public byte[] ToByteArray() byte[] byteBuffer = default; //Write checkpoint entry data for main store - writer.Write(storeVersion); - byteBuffer = storeHlogToken.ToByteArray(); + writer.Write(metadata.storeVersion); + byteBuffer = metadata.storeHlogToken.ToByteArray(); writer.Write(byteBuffer.Length); writer.Write(byteBuffer); - byteBuffer = storeIndexToken.ToByteArray(); + byteBuffer = metadata.storeIndexToken.ToByteArray(); writer.Write(byteBuffer.Length); writer.Write(byteBuffer); - writer.Write(storeCheckpointCoveredAofAddress); - writer.Write(storePrimaryReplId == null ? 0 : 1); - if (storePrimaryReplId != null) writer.Write(storePrimaryReplId); + writer.Write(metadata.storeCheckpointCoveredAofAddress); + writer.Write(metadata.storePrimaryReplId == null ? 
0 : 1); + if (metadata.storePrimaryReplId != null) writer.Write(metadata.storePrimaryReplId); //Write checkpoint entry data for object store - writer.Write(objectStoreVersion); - byteBuffer = objectStoreHlogToken.ToByteArray(); + writer.Write(metadata.objectStoreVersion); + byteBuffer = metadata.objectStoreHlogToken.ToByteArray(); writer.Write(byteBuffer.Length); writer.Write(byteBuffer); - byteBuffer = objectStoreIndexToken.ToByteArray(); + byteBuffer = metadata.objectStoreIndexToken.ToByteArray(); writer.Write(byteBuffer.Length); writer.Write(byteBuffer); - writer.Write(objectCheckpointCoveredAofAddress); - writer.Write(objectStorePrimaryReplId == null ? 0 : 1); - if (objectStorePrimaryReplId != null) writer.Write(objectStorePrimaryReplId); + writer.Write(metadata.objectCheckpointCoveredAofAddress); + writer.Write(metadata.objectStorePrimaryReplId == null ? 0 : 1); + if (metadata.objectStorePrimaryReplId != null) writer.Write(metadata.objectStorePrimaryReplId); byte[] byteArray = ms.ToArray(); writer.Dispose(); @@ -143,17 +122,20 @@ public static CheckpointEntry FromByteArray(byte[] serialized) var reader = new BinaryReader(ms); var cEntry = new CheckpointEntry { - storeVersion = reader.ReadInt64(), - storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())), - storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())), - storeCheckpointCoveredAofAddress = reader.ReadInt64(), - storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default, - - objectStoreVersion = reader.ReadInt64(), - objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())), - objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())), - objectCheckpointCoveredAofAddress = reader.ReadInt64(), - objectStorePrimaryReplId = reader.ReadInt32() > 0 ? 
reader.ReadString() : default + metadata = new() + { + storeVersion = reader.ReadInt64(), + storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())), + storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())), + storeCheckpointCoveredAofAddress = reader.ReadInt64(), + storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default, + + objectStoreVersion = reader.ReadInt64(), + objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())), + objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())), + objectCheckpointCoveredAofAddress = reader.ReadInt64(), + objectStorePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default + } }; reader.Dispose(); diff --git a/libs/cluster/Server/Replication/CheckpointStore.cs b/libs/cluster/Server/Replication/CheckpointStore.cs index 3740134341..52a78f7285 100644 --- a/libs/cluster/Server/Replication/CheckpointStore.cs +++ b/libs/cluster/Server/Replication/CheckpointStore.cs @@ -40,7 +40,7 @@ public void Initialize() { head = tail = GetLatestCheckpointEntryFromDisk(); - if (tail.storeVersion == -1 && tail.objectStoreVersion == -1) head = tail = null; + if (tail.metadata.storeVersion == -1 && tail.metadata.objectStoreVersion == -1) head = tail = null; //This purge does not check for active readers //1. 
If primary is initializing then we will not have any active readers since not connections are established at recovery @@ -72,9 +72,9 @@ public void PurgeAllCheckpointsExceptEntry(CheckpointEntry entry = null) entry ??= GetLatestCheckpointEntryFromDisk(); if (entry == null) return; LogCheckpointEntry("Purge all except", entry); - PurgeAllCheckpointsExceptTokens(StoreType.Main, entry.storeHlogToken, entry.storeIndexToken); + PurgeAllCheckpointsExceptTokens(StoreType.Main, entry.metadata.storeHlogToken, entry.metadata.storeIndexToken); if (!clusterProvider.serverOptions.DisableObjects) - PurgeAllCheckpointsExceptTokens(StoreType.Object, entry.objectStoreHlogToken, entry.objectStoreIndexToken); + PurgeAllCheckpointsExceptTokens(StoreType.Object, entry.metadata.objectStoreHlogToken, entry.metadata.objectStoreIndexToken); } /// @@ -124,8 +124,8 @@ public void AddCheckpointEntry(CheckpointEntry entry, StoreType storeType, bool var lastEntry = tail; Debug.Assert(lastEntry != null); - entry.storeIndexToken = lastEntry.storeIndexToken; - entry.objectStoreIndexToken = lastEntry.objectStoreIndexToken; + entry.metadata.storeIndexToken = lastEntry.metadata.storeIndexToken; + entry.metadata.objectStoreIndexToken = lastEntry.metadata.objectStoreIndexToken; } //Assume we don't have multiple writers so it is safe to update the tail directly @@ -135,20 +135,20 @@ public void AddCheckpointEntry(CheckpointEntry entry, StoreType storeType, bool { if (storeType == StoreType.Main) { - entry.objectStoreVersion = tail.objectStoreVersion; - entry.objectStoreHlogToken = tail.objectStoreHlogToken; - entry.objectStoreIndexToken = tail.objectStoreIndexToken; - entry.objectCheckpointCoveredAofAddress = tail.storeCheckpointCoveredAofAddress; - entry.objectStorePrimaryReplId = tail.objectStorePrimaryReplId; + entry.metadata.objectStoreVersion = tail.metadata.objectStoreVersion; + entry.metadata.objectStoreHlogToken = tail.metadata.objectStoreHlogToken; + entry.metadata.objectStoreIndexToken = 
tail.metadata.objectStoreIndexToken; + entry.metadata.objectCheckpointCoveredAofAddress = tail.metadata.storeCheckpointCoveredAofAddress; + entry.metadata.objectStorePrimaryReplId = tail.metadata.objectStorePrimaryReplId; } if (storeType == StoreType.Object) { - entry.storeVersion = tail.storeVersion; - entry.storeHlogToken = tail.storeHlogToken; - entry.storeIndexToken = tail.storeIndexToken; - entry.storeCheckpointCoveredAofAddress = tail.objectCheckpointCoveredAofAddress; - entry.storePrimaryReplId = tail.storePrimaryReplId; + entry.metadata.storeVersion = tail.metadata.storeVersion; + entry.metadata.storeHlogToken = tail.metadata.storeHlogToken; + entry.metadata.storeIndexToken = tail.metadata.storeIndexToken; + entry.metadata.storeCheckpointCoveredAofAddress = tail.metadata.objectCheckpointCoveredAofAddress; + entry.metadata.storePrimaryReplId = tail.metadata.storePrimaryReplId; } tail.next = entry; @@ -180,18 +180,18 @@ private void DeleteOutdatedCheckpoints() LogCheckpointEntry("Deleting checkpoint entry", curr); // Below check each checkpoint token separately if it is eligible for deletion if (CanDeleteToken(curr, CheckpointFileType.STORE_HLOG)) - clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).DeleteLogCheckpoint(curr.storeHlogToken); + clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).DeleteLogCheckpoint(curr.metadata.storeHlogToken); if (CanDeleteToken(curr, CheckpointFileType.STORE_INDEX)) - clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).DeleteIndexCheckpoint(curr.storeIndexToken); + clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).DeleteIndexCheckpoint(curr.metadata.storeIndexToken); if (!clusterProvider.serverOptions.DisableObjects) { if (CanDeleteToken(curr, CheckpointFileType.OBJ_STORE_HLOG)) - clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).DeleteLogCheckpoint(curr.objectStoreHlogToken); + 
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).DeleteLogCheckpoint(curr.metadata.objectStoreHlogToken); if (CanDeleteToken(curr, CheckpointFileType.OBJ_STORE_INDEX)) - clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).DeleteIndexCheckpoint(curr.objectStoreIndexToken); + clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).DeleteIndexCheckpoint(curr.metadata.objectStoreIndexToken); } //At least one token can always be deleted thus invalidating the in-memory entry @@ -247,8 +247,11 @@ public CheckpointEntry GetLatestCheckpointEntryFromMemory() { var cEntry = new CheckpointEntry() { - storeCheckpointCoveredAofAddress = 0, - objectCheckpointCoveredAofAddress = clusterProvider.serverOptions.DisableObjects ? long.MaxValue : 0 + metadata = new() + { + storeCheckpointCoveredAofAddress = 0, + objectCheckpointCoveredAofAddress = clusterProvider.serverOptions.DisableObjects ? long.MaxValue : 0 + } }; cEntry.TryAddReader(); return cEntry; @@ -279,17 +282,20 @@ public CheckpointEntry GetLatestCheckpointEntryFromDisk() CheckpointEntry entry = new() { - storeVersion = storeHLogToken == default ? -1 : storeWrapper.store.GetLatestCheckpointVersion(), - storeHlogToken = storeHLogToken, - storeIndexToken = storeIndexToken, - storeCheckpointCoveredAofAddress = storeCheckpointCoveredAofAddress, - storePrimaryReplId = storePrimaryReplId, - - objectStoreVersion = objectStoreHLogToken == default ? -1 : storeWrapper.objectStore.GetLatestCheckpointVersion(), - objectStoreHlogToken = objectStoreHLogToken, - objectStoreIndexToken = objectStoreIndexToken, - objectCheckpointCoveredAofAddress = objectCheckpointCoveredAofAddress, - objectStorePrimaryReplId = objectStorePrimaryReplId, + metadata = new() + { + storeVersion = storeHLogToken == default ? 
-1 : storeWrapper.store.GetLatestCheckpointVersion(), + storeHlogToken = storeHLogToken, + storeIndexToken = storeIndexToken, + storeCheckpointCoveredAofAddress = storeCheckpointCoveredAofAddress, + storePrimaryReplId = storePrimaryReplId, + + objectStoreVersion = objectStoreHLogToken == default ? -1 : storeWrapper.objectStore.GetLatestCheckpointVersion(), + objectStoreHlogToken = objectStoreHLogToken, + objectStoreIndexToken = objectStoreIndexToken, + objectCheckpointCoveredAofAddress = objectCheckpointCoveredAofAddress, + objectStorePrimaryReplId = objectStorePrimaryReplId, + }, _lock = new SingleWriterMultiReaderLock() }; return entry; @@ -325,12 +331,12 @@ public void LogCheckpointEntry(string msg, CheckpointEntry entry) { logger?.LogTrace("{msg} {storeVersion} {storeHlogToken} {storeIndexToken} {objectStoreVersion} {objectStoreHlogToken} {objectStoreIndexToken}", msg, - entry.storeVersion, - entry.storeHlogToken, - entry.storeIndexToken, - entry.objectStoreVersion, - entry.objectStoreHlogToken, - entry.objectStoreIndexToken); + entry.metadata.storeVersion, + entry.metadata.storeHlogToken, + entry.metadata.storeIndexToken, + entry.metadata.objectStoreVersion, + entry.metadata.objectStoreHlogToken, + entry.metadata.objectStoreIndexToken); } } } \ No newline at end of file diff --git a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs index 8b68e48bc1..005c12df36 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs @@ -75,7 +75,7 @@ public async Task SendCheckpoint() try { logger?.LogInformation("Replica replicaId:{replicaId} requesting checkpoint replicaStoreVersion:{replicaStoreVersion} replicaObjectStoreVersion:{replicaObjectStoreVersion}", - remoteNodeId, remoteEntry.storeVersion, remoteEntry.objectStoreVersion); + remoteNodeId, remoteEntry.metadata.storeVersion, 
remoteEntry.metadata.objectStoreVersion); gcs.Connect((int)clusterProvider.clusterManager.GetClusterTimeout().TotalMilliseconds); retry: @@ -83,21 +83,20 @@ public async Task SendCheckpoint() AcquireCheckpointEntry(out localEntry, out aofSyncTaskInfo); logger?.LogInformation("Checkpoint search completed"); - var primary_replId = clusterProvider.replicationManager.PrimaryReplId; - var primary_replId2 = clusterProvider.replicationManager.PrimaryReplId2; - - // If the replica does not have a checkpoint we will have to send the local if it exists - // Else we need to compare the checkpoint versions if replica comes from the same history as this primary - var canCompareMainStoreCheckpoint = string.IsNullOrEmpty(remoteEntry.storePrimaryReplId) || remoteEntry.storePrimaryReplId.Equals(localEntry.storePrimaryReplId); - var canCompareObjectStoreCheckpoint = string.IsNullOrEmpty(remoteEntry.objectStorePrimaryReplId) || remoteEntry.objectStorePrimaryReplId.Equals(localEntry.objectStorePrimaryReplId); - - // We can skip sending the local checkpoint if it is of same history and version. Remote checkpoints with greater version will be ovewritten - var skipSendingMainStore = localEntry.storeHlogToken == default || (canCompareMainStoreCheckpoint && localEntry.storeVersion == remoteEntry.storeVersion); - var skipSendingObjectStore = clusterProvider.serverOptions.DisableObjects || localEntry.objectStoreHlogToken == default || (canCompareObjectStoreCheckpoint && localEntry.objectStoreVersion == remoteEntry.objectStoreVersion); + // Local and remote checkpoints are of same history if both of the following hold + // 1. There is a checkpoint available at remote node + // 2. 
Remote and local checkpoints contain the same PrimaryReplId + var sameMainStoreCheckpointHistory = !string.IsNullOrEmpty(remoteEntry.metadata.storePrimaryReplId) && remoteEntry.metadata.storePrimaryReplId.Equals(localEntry.metadata.storePrimaryReplId); + var sameObjectStoreCheckpointHistory = !string.IsNullOrEmpty(remoteEntry.metadata.objectStorePrimaryReplId) && remoteEntry.metadata.objectStorePrimaryReplId.Equals(localEntry.metadata.objectStorePrimaryReplId); + // We will not send the latest local checkpoint if any of the following hold + // 1. Local node does not have any checkpoints + // 2. Local checkpoint is of same version and history as the remote checkpoint + var skipLocalMainStoreCheckpoint = localEntry.metadata.storeHlogToken == default || (sameMainStoreCheckpointHistory && localEntry.metadata.storeVersion == remoteEntry.metadata.storeVersion); + var skipLocalObjectStoreCheckpoint = clusterProvider.serverOptions.DisableObjects || localEntry.metadata.objectStoreHlogToken == default || (sameObjectStoreCheckpointHistory && localEntry.metadata.objectStoreVersion == remoteEntry.metadata.objectStoreVersion); LogFileInfo hlog_size = default; long index_size = -1; - if (!skipSendingMainStore) + if (!skipLocalMainStoreCheckpoint) { // Try to acquire metadata because checkpoint might not have completed and we have to spinWait // TODO: maybe try once and then go back to acquire new checkpoint or limit retries to avoid getting stuck @@ -113,7 +112,7 @@ public async Task SendCheckpoint() LogFileInfo obj_hlog_size = default; long obj_index_size = -1; - if (!skipSendingObjectStore) + if (!skipLocalObjectStoreCheckpoint) { // Try to acquire metadata because checkpoint might not have completed and we have to spinWait // TODO: maybe try once and then go back to acquire new checkpoint or limit retries to avoid getting stuck @@ -127,74 +126,74 @@ public async Task SendCheckpoint() } } - if (!skipSendingMainStore) + if (!skipLocalMainStoreCheckpoint) { - 
logger?.LogInformation("Sending main store checkpoint {version} {storeHlogToken} {storeIndexToken} to replica", localEntry.storeVersion, localEntry.storeHlogToken, localEntry.storeIndexToken); + logger?.LogInformation("Sending main store checkpoint {version} {storeHlogToken} {storeIndexToken} to replica", localEntry.metadata.storeVersion, localEntry.metadata.storeHlogToken, localEntry.metadata.storeIndexToken); // 1. send hlog file segments if (clusterProvider.serverOptions.EnableStorageTier && hlog_size.hybridLogFileEndAddress > 64) - await SendFileSegments(gcs, localEntry.storeHlogToken, CheckpointFileType.STORE_HLOG, hlog_size.hybridLogFileStartAddress, hlog_size.hybridLogFileEndAddress); + await SendFileSegments(gcs, localEntry.metadata.storeHlogToken, CheckpointFileType.STORE_HLOG, hlog_size.hybridLogFileStartAddress, hlog_size.hybridLogFileEndAddress); // 2.Send index file segments //var index_size = storeWrapper.store.GetIndexFileSize(localEntry.storeIndexToken); - await SendFileSegments(gcs, localEntry.storeIndexToken, CheckpointFileType.STORE_INDEX, 0, index_size); + await SendFileSegments(gcs, localEntry.metadata.storeIndexToken, CheckpointFileType.STORE_INDEX, 0, index_size); // 3. Send snapshot file segments - await SendFileSegments(gcs, localEntry.storeHlogToken, CheckpointFileType.STORE_SNAPSHOT, 0, hlog_size.snapshotFileEndAddress); + await SendFileSegments(gcs, localEntry.metadata.storeHlogToken, CheckpointFileType.STORE_SNAPSHOT, 0, hlog_size.snapshotFileEndAddress); // 4. 
Send delta log segments var dlog_size = hlog_size.deltaLogTailAddress; - await SendFileSegments(gcs, localEntry.storeHlogToken, CheckpointFileType.STORE_DLOG, 0, dlog_size); + await SendFileSegments(gcs, localEntry.metadata.storeHlogToken, CheckpointFileType.STORE_DLOG, 0, dlog_size); // 5.Send index metadata - await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_INDEX, localEntry.storeIndexToken); + await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_INDEX, localEntry.metadata.storeIndexToken); // 6. Send snapshot metadata - await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_SNAPSHOT, localEntry.storeHlogToken); + await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_SNAPSHOT, localEntry.metadata.storeHlogToken); } - if (!skipSendingObjectStore) + if (!skipLocalObjectStoreCheckpoint) { - logger?.LogInformation("Sending object store checkpoint {version} {objectStoreHlogToken} {objectStoreIndexToken} to replica", localEntry.objectStoreVersion, localEntry.objectStoreHlogToken, localEntry.objectStoreIndexToken); + logger?.LogInformation("Sending object store checkpoint {version} {objectStoreHlogToken} {objectStoreIndexToken} to replica", localEntry.metadata.objectStoreVersion, localEntry.metadata.objectStoreHlogToken, localEntry.metadata.objectStoreIndexToken); // 1. 
send hlog file segments if (clusterProvider.serverOptions.EnableStorageTier && obj_hlog_size.hybridLogFileEndAddress > 24) { //send object hlog file segments - await SendFileSegments(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_HLOG, obj_hlog_size.hybridLogFileStartAddress, obj_hlog_size.hybridLogFileEndAddress); + await SendFileSegments(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_HLOG, obj_hlog_size.hybridLogFileStartAddress, obj_hlog_size.hybridLogFileEndAddress); var hlogSegmentCount = ((obj_hlog_size.hybridLogFileEndAddress - obj_hlog_size.hybridLogFileStartAddress) >> clusterProvider.serverOptions.ObjectStoreSegmentSizeBits()) + 1; - await SendObjectFiles(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_HLOG_OBJ, (int)hlogSegmentCount); + await SendObjectFiles(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_HLOG_OBJ, (int)hlogSegmentCount); } // 2. Send object store snapshot files if (obj_hlog_size.snapshotFileEndAddress > 24) { //send snapshot file segments - await SendFileSegments(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_SNAPSHOT, 0, obj_hlog_size.snapshotFileEndAddress); + await SendFileSegments(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_SNAPSHOT, 0, obj_hlog_size.snapshotFileEndAddress); //send snapshot.obj file segments var snapshotSegmentCount = (obj_hlog_size.snapshotFileEndAddress >> clusterProvider.serverOptions.ObjectStoreSegmentSizeBits()) + 1; - await SendObjectFiles(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_SNAPSHOT_OBJ, (int)snapshotSegmentCount); + await SendObjectFiles(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_SNAPSHOT_OBJ, (int)snapshotSegmentCount); } // 3. 
Send object store index file segments if (obj_index_size > 0) - await SendFileSegments(gcs, localEntry.objectStoreIndexToken, CheckpointFileType.OBJ_STORE_INDEX, 0, obj_index_size); + await SendFileSegments(gcs, localEntry.metadata.objectStoreIndexToken, CheckpointFileType.OBJ_STORE_INDEX, 0, obj_index_size); // 4. Send object store delta file segments var obj_dlog_size = obj_hlog_size.deltaLogTailAddress; if (obj_dlog_size > 0) - await SendFileSegments(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_DLOG, 0, obj_dlog_size); + await SendFileSegments(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_DLOG, 0, obj_dlog_size); // 5. Send object store index metadata - await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_INDEX, localEntry.objectStoreIndexToken); + await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_INDEX, localEntry.metadata.objectStoreIndexToken); // 6. Send object store snapshot metadata - await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_SNAPSHOT, localEntry.objectStoreHlogToken); + await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_SNAPSHOT, localEntry.metadata.objectStoreHlogToken); } - var recoverFromRemote = !skipSendingMainStore || !skipSendingObjectStore; + var recoverFromRemote = !skipLocalMainStoreCheckpoint || !skipLocalObjectStoreCheckpoint; var replayAOF = false; var RecoveredReplicationOffset = localEntry.GetMinAofCoveredAddress(); var beginAddress = RecoveredReplicationOffset; @@ -242,12 +241,12 @@ public async Task SendCheckpoint() } // Signal replica to recover from local/remote checkpoint - // Make replica replayAOF if needed and replay from provided beginAddress to ReocoveredReplication Address + // Make replica replayAOF if needed and replay from provided beginAddress to RecoveredReplication Address var resp = await gcs.ExecuteBeginReplicaRecover( - 
!skipSendingMainStore, - !skipSendingObjectStore, + !skipLocalMainStoreCheckpoint, + !skipLocalObjectStoreCheckpoint, replayAOF, - primary_replId, + clusterProvider.replicationManager.PrimaryReplId, localEntry.ToByteArray(), beginAddress, RecoveredReplicationOffset).ConfigureAwait(false); @@ -325,7 +324,7 @@ public void AcquireCheckpointEntry(out CheckpointEntry cEntry, out AofSyncTaskIn cEntry = clusterProvider.replicationManager.GetLatestCheckpointEntryFromMemory(); // Break early if main-memory-replication on and do not wait for OnDemandCheckpoint - // We do this to avoid waiting indefinetely for a checkpoint that will never be taken + // We do this to avoid waiting indefinitely for a checkpoint that will never be taken if (clusterProvider.serverOptions.MainMemoryReplication && !clusterProvider.serverOptions.OnDemandCheckpoint) { logger?.LogWarning("MainMemoryReplication: OnDemandCheckpoint is turned off, skipping valid checkpoint acquisition."); diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs index 2044de0f0a..6bb4310b65 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs @@ -46,7 +46,6 @@ public bool TryBeginReplicate(ClusterSession session, string nodeid, bool backgr try { logger?.LogTrace("CLUSTER REPLICATE {nodeid}", nodeid); - if (!clusterProvider.clusterManager.TryAddReplica(nodeid, force: force, out errorMessage, logger: logger)) return false; @@ -315,9 +314,20 @@ public long BeginReplicaRecover( { UpdateLastPrimarySyncTime(); - logger?.LogInformation("Initiating Checkpoint Recovery at replica {sIndexToken} {sHlogToken} {oIndexToken} {oHlogToken}", remoteCheckpoint.storeIndexToken, remoteCheckpoint.storeHlogToken, remoteCheckpoint.objectStoreIndexToken, remoteCheckpoint.objectStoreHlogToken); - 
storeWrapper.RecoverCheckpoint(recoverMainStoreFromToken, recoverObjectStoreFromToken, - remoteCheckpoint.storeIndexToken, remoteCheckpoint.storeHlogToken, remoteCheckpoint.objectStoreIndexToken, remoteCheckpoint.objectStoreHlogToken); + logger?.LogInformation("Replica Recover MainStore: {storeVersion}>[{sIndexToken} {sHlogToken}]" + + "\nObjectStore: {objectStoreVersion}>[{oIndexToken} {oHlogToken}]", + remoteCheckpoint.metadata.storeVersion, + remoteCheckpoint.metadata.storeIndexToken, + remoteCheckpoint.metadata.storeHlogToken, + remoteCheckpoint.metadata.objectStoreVersion, + remoteCheckpoint.metadata.objectStoreIndexToken, + remoteCheckpoint.metadata.objectStoreHlogToken); + + storeWrapper.RecoverCheckpoint( + replicaRecover: true, + recoverMainStoreFromToken, + recoverObjectStoreFromToken, + remoteCheckpoint.metadata); if (replayAOF) { @@ -335,15 +345,15 @@ public long BeginReplicaRecover( // If checkpoint for main store was send add its token here in preparation for purge later on if (recoverMainStoreFromToken) { - cEntry.storeIndexToken = remoteCheckpoint.storeIndexToken; - cEntry.storeHlogToken = remoteCheckpoint.storeHlogToken; + cEntry.metadata.storeIndexToken = remoteCheckpoint.metadata.storeIndexToken; + cEntry.metadata.storeHlogToken = remoteCheckpoint.metadata.storeHlogToken; } // If checkpoint for object store was send add its token here in preparation for purge later on if (recoverObjectStoreFromToken) { - cEntry.objectStoreIndexToken = remoteCheckpoint.objectStoreIndexToken; - cEntry.objectStoreHlogToken = remoteCheckpoint.objectStoreHlogToken; + cEntry.metadata.objectStoreIndexToken = remoteCheckpoint.metadata.objectStoreIndexToken; + cEntry.metadata.objectStoreHlogToken = remoteCheckpoint.metadata.objectStoreHlogToken; } checkpointStore.PurgeAllCheckpointsExceptEntry(cEntry); diff --git a/libs/cluster/Server/Replication/ReplicationCheckpointManagement.cs b/libs/cluster/Server/Replication/ReplicationCheckpointManagement.cs index 
9e027f1e91..99f690c512 100644 --- a/libs/cluster/Server/Replication/ReplicationCheckpointManagement.cs +++ b/libs/cluster/Server/Replication/ReplicationCheckpointManagement.cs @@ -31,8 +31,8 @@ public bool TryAcquireSettledMetadataForMainStore(CheckpointEntry entry, out Log index_size = -1; try { - hlog_size = storeWrapper.store.GetLogFileSize(entry.storeHlogToken); - index_size = storeWrapper.store.GetIndexFileSize(entry.storeIndexToken); + hlog_size = storeWrapper.store.GetLogFileSize(entry.metadata.storeHlogToken); + index_size = storeWrapper.store.GetIndexFileSize(entry.metadata.storeIndexToken); return true; } catch @@ -54,8 +54,8 @@ public bool TryAcquireSettledMetadataForObjectStore(CheckpointEntry entry, out L index_size = -1; try { - hlog_size = storeWrapper.objectStore.GetLogFileSize(entry.objectStoreHlogToken); - index_size = storeWrapper.objectStore.GetIndexFileSize(entry.objectStoreIndexToken); + hlog_size = storeWrapper.objectStore.GetLogFileSize(entry.metadata.objectStoreHlogToken); + index_size = storeWrapper.objectStore.GetIndexFileSize(entry.metadata.objectStoreIndexToken); return true; } catch diff --git a/libs/cluster/Server/Replication/ReplicationHistoryManager.cs b/libs/cluster/Server/Replication/ReplicationHistoryManager.cs index 8b30e828a9..9fca3999fe 100644 --- a/libs/cluster/Server/Replication/ReplicationHistoryManager.cs +++ b/libs/cluster/Server/Replication/ReplicationHistoryManager.cs @@ -95,17 +95,17 @@ internal sealed partial class ReplicationManager : IDisposable { ReplicationHistory currentReplicationConfig; readonly IDevice replicationConfigDevice; - readonly SectorAlignedBufferPool pool; + readonly SectorAlignedBufferPool replicationConfigDevicePool; - public void InitializeReplicationHistory() + private void InitializeReplicationHistory() { currentReplicationConfig = new ReplicationHistory(); FlushConfig(); } - public void RecoverReplicationHistory() + private void RecoverReplicationHistory() { - byte[] replConfig = 
ClusterUtils.ReadDevice(replicationConfigDevice, pool, logger); + var replConfig = ClusterUtils.ReadDevice(replicationConfigDevice, replicationConfigDevicePool, logger); currentReplicationConfig = ReplicationHistory.FromByteArray(replConfig); //TODO: handle scenario where replica crashed before became a primary and it has two replication ids //var current = storeWrapper.clusterManager.CurrentConfig; @@ -115,12 +115,12 @@ public void RecoverReplicationHistory() //} } - public void TryUpdateMyPrimaryReplId(string primary_replid) + private void TryUpdateMyPrimaryReplId(string primaryReplicationId) { while (true) { var current = currentReplicationConfig; - var newConfig = current.UpdateReplicationId(primary_replid); + var newConfig = current.UpdateReplicationId(primaryReplicationId); if (Interlocked.CompareExchange(ref currentReplicationConfig, newConfig, current) == current) break; } @@ -153,9 +153,9 @@ private void FlushConfig() { lock (this) { - logger?.LogTrace("Start FlushConfig {path}", replicationConfigDevice.FileName); - ClusterUtils.WriteInto(replicationConfigDevice, pool, 0, currentReplicationConfig.ToByteArray(), logger: logger); - logger?.LogTrace("End FlushConfig {path}", replicationConfigDevice.FileName); + logger?.LogTrace("Flushing replication history {path}", replicationConfigDevice.FileName); + ClusterUtils.WriteInto(replicationConfigDevice, replicationConfigDevicePool, 0, currentReplicationConfig.ToByteArray(), logger: logger); + logger?.LogTrace("Replication history flush completed {path}", replicationConfigDevice.FileName); } } } diff --git a/libs/cluster/Server/Replication/ReplicationManager.cs b/libs/cluster/Server/Replication/ReplicationManager.cs index 78e6844163..18aae98041 100644 --- a/libs/cluster/Server/Replication/ReplicationManager.cs +++ b/libs/cluster/Server/Replication/ReplicationManager.cs @@ -117,16 +117,22 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null var clusterDataPath = opts.CheckpointDir + 
clusterFolder; var deviceFactory = opts.GetInitializedDeviceFactory(clusterDataPath); replicationConfigDevice = deviceFactory.Get(new FileDescriptor(directoryName: "", fileName: "replication.conf")); - pool = new(1, (int)replicationConfigDevice.SectorSize); + replicationConfigDevicePool = new(1, (int)replicationConfigDevice.SectorSize); - var recoverConfig = replicationConfigDevice.GetFileSize(0) > 0; - if (!recoverConfig) + var canRecoverReplicationHistory = replicationConfigDevice.GetFileSize(0) > 0; + if (clusterProvider.serverOptions.Recover && canRecoverReplicationHistory) { - InitializeReplicationHistory(); + logger?.LogTrace("Recovering in-memory checkpoint registry"); + // If recover option is enabled and replication history information is available + // recover replication history and initialize in-memory checkpoint registry. + RecoverReplicationHistory(); } else { - RecoverReplicationHistory(); + logger?.LogTrace("Initializing new in-memory checkpoint registry"); + // If recover option is not enabled or replication history is not available + // initialize new empty replication history. + InitializeReplicationHistory(); } // After initializing replication history propagate replicationId to ReplicationLogCheckpointManager @@ -178,8 +184,8 @@ public void Dispose() { _disposed = true; - replicationConfigDevice.Dispose(); - pool.Free(); + replicationConfigDevice?.Dispose(); + replicationConfigDevicePool?.Free(); checkpointStore.WaitForReplicas(); replicaSyncSessionTaskStore.Dispose(); diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index e013ce74f0..dc557fcc67 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -29,7 +29,7 @@ namespace Garnet public class GarnetServer : IDisposable { // IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~7 and GarnetServer.csproj line ~8. 
- readonly string version = "1.0.38"; + readonly string version = "1.0.39"; internal GarnetProvider Provider; diff --git a/libs/server/Cluster/CheckpointMetadata.cs b/libs/server/Cluster/CheckpointMetadata.cs new file mode 100644 index 0000000000..3f10317117 --- /dev/null +++ b/libs/server/Cluster/CheckpointMetadata.cs @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; + +namespace Garnet.server +{ + public sealed class CheckpointMetadata + { + public long storeVersion; + public Guid storeHlogToken; + public Guid storeIndexToken; + public long storeCheckpointCoveredAofAddress; + public string storePrimaryReplId; + + public long objectStoreVersion; + public Guid objectStoreHlogToken; + public Guid objectStoreIndexToken; + public long objectCheckpointCoveredAofAddress; + public string objectStorePrimaryReplId; + + public CheckpointMetadata() + { + storeVersion = -1; + storeHlogToken = default; + storeIndexToken = default; + storeCheckpointCoveredAofAddress = long.MaxValue; + storePrimaryReplId = null; + + objectStoreVersion = -1; + objectStoreHlogToken = default; + objectStoreIndexToken = default; + objectCheckpointCoveredAofAddress = long.MaxValue; + objectStorePrimaryReplId = null; + } + } +} \ No newline at end of file diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index f01aace6f9..c778e955c1 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -237,14 +237,31 @@ internal void Recover() /// /// Caller will have to decide if recover is necessary, so we do not check if recover option is enabled /// - public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recoverObjectStoreFromToken = false, - Guid storeIndexToken = default, Guid storeHlogToken = default, Guid objectStoreIndexToken = default, Guid objectStoreHlogToken = default) + public void RecoverCheckpoint(bool replicaRecover = false, bool recoverMainStoreFromToken = false, bool 
recoverObjectStoreFromToken = false, CheckpointMetadata metadata = null) { long storeVersion = -1, objectStoreVersion = -1; try { - storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(storeIndexToken, storeHlogToken); - if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(objectStoreIndexToken, objectStoreHlogToken); + if (replicaRecover) + { + if (metadata.storeIndexToken != default && metadata.storeHlogToken != default) + { + storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(metadata.storeIndexToken, metadata.storeHlogToken); + } + + if (!serverOptions.DisableObjects) + { + if (metadata.objectStoreIndexToken != default && metadata.objectStoreHlogToken != default) + { + objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(metadata.objectStoreIndexToken, metadata.objectStoreHlogToken); + } + } + } + else + { + storeVersion = store.Recover(); + if (objectStore != null) objectStoreVersion = objectStore.Recover(); + } if (storeVersion > 0 || objectStoreVersion > 0) lastSaveTime = DateTimeOffset.UtcNow; } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs index 5286c38bba..90638842ab 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs @@ -428,7 +428,7 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context) if (errorCode != 0) { var errorMessage = new Win32Exception((int)errorCode).Message; - logger?.LogError("[DeviceLogCheckpointManager] OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage); + 
logger?.LogError("[DeviceLogManager] OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage); } semaphore.Release(); } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs index b2746edada..7502d9ad42 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs @@ -414,6 +414,10 @@ public void Reset() // Reset the hybrid log hlogBase.Reset(); + + // Reset system state + systemState = SystemState.Make(Phase.REST, 1); + lastVersion = 0; } diff --git a/main/GarnetServer/GarnetServer.csproj b/main/GarnetServer/GarnetServer.csproj index 5d93032c7b..a29f77ef49 100644 --- a/main/GarnetServer/GarnetServer.csproj +++ b/main/GarnetServer/GarnetServer.csproj @@ -5,7 +5,7 @@ true - 1.0.38 + 1.0.39 garnet-server true garnet-server diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ClusterReplicationTests.cs index 2d5570b2a4..2f5c67af74 100644 --- a/test/Garnet.test.cluster/ClusterReplicationTests.cs +++ b/test/Garnet.test.cluster/ClusterReplicationTests.cs @@ -1048,5 +1048,112 @@ public void ClusterReplicateFails() var exc = Assert.Throws(() => replicaServer.Execute("CLUSTER", ["REPLICATE", Guid.NewGuid().ToString()], flags: CommandFlags.NoRedirect)); ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node ")); } + + [Test, Order(22), CancelAfter(testTimeout)] + public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) + { + var replica_count = 1;// Per primary + var primary_count = 1; + var nodes_count = primary_count + (primary_count * replica_count); + var primaryNodeIndex = 0; + var replicaNodeIndex = 1; + ClassicAssert.IsTrue(primary_count > 0); + context.CreateInstances(nodes_count, disableObjects: false, MainMemoryReplication: true, CommitFrequencyMs: -1, OnDemandCheckpoint: 
true, enableAOF: true, useTLS: useTLS); + context.CreateConnection(useTLS: useTLS); + _ = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); + + var keyLength = 32; + var kvpairCount = keyCount; + var addCount = 5; + context.kvPairs = []; + + // Populate Primary + if (!performRMW) + context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex); + else + context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex, addCount); + + context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger); + for (var i = 0; i < 5; i++) + { + var primaryLastSaveTime = context.clusterTestUtils.LastSave(primaryNodeIndex, logger: context.logger); + var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaNodeIndex, logger: context.logger); + context.clusterTestUtils.Checkpoint(primaryNodeIndex); + context.clusterTestUtils.WaitCheckpoint(primaryNodeIndex, primaryLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(replicaNodeIndex, replicaLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger); + } + + var primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); + var replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); + ClassicAssert.AreEqual("6", primaryVersion); + ClassicAssert.AreEqual(primaryVersion, replicaVersion); + + context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, replicaNodeIndex); + + // Dispose primary and delete data + context.nodes[primaryNodeIndex].Dispose(true); + // Dispose replica but do not delete data + context.nodes[replicaNodeIndex].Dispose(false); + + // Restart primary and do not recover + context.nodes[primaryNodeIndex] = context.CreateInstance( + 
context.clusterTestUtils.GetEndPoint(primaryNodeIndex).Port, + disableObjects: true, + tryRecover: false, + enableAOF: true, + MainMemoryReplication: true, + CommitFrequencyMs: -1, + OnDemandCheckpoint: true, + timeout: timeout, + useTLS: useTLS, + cleanClusterConfig: true); + context.nodes[primaryNodeIndex].Start(); + + // Restart secondary and recover + context.nodes[replicaNodeIndex] = context.CreateInstance( + context.clusterTestUtils.GetEndPoint(replicaNodeIndex).Port, + disableObjects: true, + tryRecover: true, + enableAOF: true, + MainMemoryReplication: true, + CommitFrequencyMs: -1, + OnDemandCheckpoint: true, + timeout: timeout, + useTLS: useTLS, + cleanClusterConfig: true); + context.nodes[replicaNodeIndex].Start(); + context.CreateConnection(useTLS: useTLS); + + // Assert primary version is 1 and replica has recovered to previous checkpoint + primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); + replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); + ClassicAssert.AreEqual("1", primaryVersion); + ClassicAssert.AreEqual("6", replicaVersion); + + // Setup cluster + ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger)); + context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger); + context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger); + context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger); + var primaryNodeId = context.clusterTestUtils.ClusterMyId(primaryNodeIndex, logger: context.logger); + + // Enable replication + context.clusterTestUtils.WaitUntilNodeIdIsKnown(replicaNodeIndex, primaryNodeId, logger: context.logger); + ClassicAssert.AreEqual("OK", 
context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger)); + + // Both nodes are at version 1 despite replica recovering to version earlier + primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); + replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); + ClassicAssert.AreEqual("1", primaryVersion); + ClassicAssert.AreEqual("1", replicaVersion); + + // At this point attached replica should be empty because primary did not have any data because it did not recover + foreach (var pair in context.kvPairs) + { + var resp = context.clusterTestUtils.GetKey(replicaNodeIndex, Encoding.ASCII.GetBytes(pair.Key), out _, out _, out _, out var state, logger: context.logger); + ClassicAssert.IsNull(resp); + } + } } } \ No newline at end of file diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs index ffc419f539..2653fe5374 100644 --- a/test/Garnet.test.cluster/ClusterTestUtils.cs +++ b/test/Garnet.test.cluster/ClusterTestUtils.cs @@ -2679,6 +2679,30 @@ public string GetFailoverState(IPEndPoint endPoint, ILogger logger = null) return items; } + public string GetInfo(int nodeIndex, string section, string segment, ILogger logger = null) + => GetInfo(endpoints[nodeIndex].ToIPEndPoint(), section, segment, logger); + + public string GetInfo(IPEndPoint endPoint, string section, string segment, ILogger logger = null) + { + try + { + var server = redis.GetServer(endPoint); + var result = server.Info(section); + ClassicAssert.AreEqual(1, result.Length, "section does not exist"); + foreach (var item in result[0]) + if (item.Key.Equals(segment)) + return item.Value; + Assert.Fail($"Segment not available for {section} section"); + return ""; + } + catch (Exception ex) + { + logger?.LogError(ex, "An error has occurred; GetInfo"); + Assert.Fail(ex.Message); + 
return null; + } + } + public void WaitForReplicaAofSync(int primaryIndex, int secondaryIndex, ILogger logger = null) { long primaryReplicationOffset;