From e3f60b4d4af98e91574323d60a87acda36a10811 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 7 Aug 2024 11:28:39 -0700 Subject: [PATCH 01/10] skip sending primary's checkpoint only if history and version match with that of the remote checkpoint --- .../PrimaryOps/ReplicaSyncSession.cs | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs index 8b68e48bc1..a297e0b41b 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs @@ -83,21 +83,20 @@ public async Task SendCheckpoint() AcquireCheckpointEntry(out localEntry, out aofSyncTaskInfo); logger?.LogInformation("Checkpoint search completed"); - var primary_replId = clusterProvider.replicationManager.PrimaryReplId; - var primary_replId2 = clusterProvider.replicationManager.PrimaryReplId2; - - // If the replica does not have a checkpoint we will have to send the local if it exists - // Else we need to compare the checkpoint versions if replica comes from the same history as this primary - var canCompareMainStoreCheckpoint = string.IsNullOrEmpty(remoteEntry.storePrimaryReplId) || remoteEntry.storePrimaryReplId.Equals(localEntry.storePrimaryReplId); - var canCompareObjectStoreCheckpoint = string.IsNullOrEmpty(remoteEntry.objectStorePrimaryReplId) || remoteEntry.objectStorePrimaryReplId.Equals(localEntry.objectStorePrimaryReplId); - - // We can skip sending the local checkpoint if it is of same history and version. 
Remote checkpoints with greater version will be ovewritten - var skipSendingMainStore = localEntry.storeHlogToken == default || (canCompareMainStoreCheckpoint && localEntry.storeVersion == remoteEntry.storeVersion); - var skipSendingObjectStore = clusterProvider.serverOptions.DisableObjects || localEntry.objectStoreHlogToken == default || (canCompareObjectStoreCheckpoint && localEntry.objectStoreVersion == remoteEntry.objectStoreVersion); + // Local and remote checkpoints are of same history if both of the following hold + // 1. There is a checkpoint available at remote node + // 2. Remote and local checkpoints contain the same PrimaryReplId + var sameMainStoreCheckpointHistory = !string.IsNullOrEmpty(remoteEntry.storePrimaryReplId) && remoteEntry.storePrimaryReplId.Equals(localEntry.storePrimaryReplId); + var sameObjectStoreCheckpointHistory = !string.IsNullOrEmpty(remoteEntry.objectStorePrimaryReplId) && remoteEntry.objectStorePrimaryReplId.Equals(localEntry.objectStorePrimaryReplId); + // We will not send the latest local checkpoint if any of the following hold + // 1. Local node does not have any checkpoints + // 2. 
Local checkpoint is of same version and history as the remote checkpoint + var skipLocalMainStoreCheckpoint = localEntry.storeHlogToken == default || (sameMainStoreCheckpointHistory && localEntry.storeVersion == remoteEntry.storeVersion); + var skipLocalObjectStoreCheckpoint = clusterProvider.serverOptions.DisableObjects || localEntry.objectStoreHlogToken == default || (sameObjectStoreCheckpointHistory && localEntry.objectStoreVersion == remoteEntry.objectStoreVersion); LogFileInfo hlog_size = default; long index_size = -1; - if (!skipSendingMainStore) + if (!skipLocalMainStoreCheckpoint) { // Try to acquire metadata because checkpoint might not have completed and we have to spinWait // TODO: maybe try once and then go back to acquire new checkpoint or limit retries to avoid getting stuck @@ -113,7 +112,7 @@ public async Task SendCheckpoint() LogFileInfo obj_hlog_size = default; long obj_index_size = -1; - if (!skipSendingObjectStore) + if (!skipLocalObjectStoreCheckpoint) { // Try to acquire metadata because checkpoint might not have completed and we have to spinWait // TODO: maybe try once and then go back to acquire new checkpoint or limit retries to avoid getting stuck @@ -127,7 +126,7 @@ public async Task SendCheckpoint() } } - if (!skipSendingMainStore) + if (!skipLocalMainStoreCheckpoint) { logger?.LogInformation("Sending main store checkpoint {version} {storeHlogToken} {storeIndexToken} to replica", localEntry.storeVersion, localEntry.storeHlogToken, localEntry.storeIndexToken); @@ -153,7 +152,7 @@ public async Task SendCheckpoint() await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_SNAPSHOT, localEntry.storeHlogToken); } - if (!skipSendingObjectStore) + if (!skipLocalObjectStoreCheckpoint) { logger?.LogInformation("Sending object store checkpoint {version} {objectStoreHlogToken} {objectStoreIndexToken} to replica", localEntry.objectStoreVersion, localEntry.objectStoreHlogToken, localEntry.objectStoreIndexToken); @@ -194,7 +193,7 
@@ public async Task SendCheckpoint() await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_SNAPSHOT, localEntry.objectStoreHlogToken); } - var recoverFromRemote = !skipSendingMainStore || !skipSendingObjectStore; + var recoverFromRemote = !skipLocalMainStoreCheckpoint || !skipLocalObjectStoreCheckpoint; var replayAOF = false; var RecoveredReplicationOffset = localEntry.GetMinAofCoveredAddress(); var beginAddress = RecoveredReplicationOffset; @@ -242,12 +241,12 @@ public async Task SendCheckpoint() } // Signal replica to recover from local/remote checkpoint - // Make replica replayAOF if needed and replay from provided beginAddress to ReocoveredReplication Address + // Make replica replayAOF if needed and replay from provided beginAddress to RecoveredReplication Address var resp = await gcs.ExecuteBeginReplicaRecover( - !skipSendingMainStore, - !skipSendingObjectStore, + !skipLocalMainStoreCheckpoint, + !skipLocalObjectStoreCheckpoint, replayAOF, - primary_replId, + clusterProvider.replicationManager.PrimaryReplId, localEntry.ToByteArray(), beginAddress, RecoveredReplicationOffset).ConfigureAwait(false); @@ -325,7 +324,7 @@ public void AcquireCheckpointEntry(out CheckpointEntry cEntry, out AofSyncTaskIn cEntry = clusterProvider.replicationManager.GetLatestCheckpointEntryFromMemory(); // Break early if main-memory-replication on and do not wait for OnDemandCheckpoint - // We do this to avoid waiting indefinetely for a checkpoint that will never be taken + // We do this to avoid waiting indefinitely for a checkpoint that will never be taken if (clusterProvider.serverOptions.MainMemoryReplication && !clusterProvider.serverOptions.OnDemandCheckpoint) { logger?.LogWarning("MainMemoryReplication: OnDemandCheckpoint is turned off, skipping valid checkpoint acquisition."); From 7aec8e8c65766715be659452174d1329f06bd37b Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 8 Aug 2024 11:53:43 -0700 Subject: [PATCH 02/10] reset system state 
and version on store.Reset --- libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs index b2746edada..7502d9ad42 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs @@ -414,6 +414,10 @@ public void Reset() // Reset the hybrid log hlogBase.Reset(); + + // Reset system state + systemState = SystemState.Make(Phase.REST, 1); + lastVersion = 0; } From d82f6f8a3ccb4e8e58ee3afadf32e11062a616da Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 8 Aug 2024 11:54:48 -0700 Subject: [PATCH 03/10] recover in-memory checkpoint store only when recover flag is set --- .../Replication/ReplicationHistoryManager.cs | 18 ++++++++--------- .../Server/Replication/ReplicationManager.cs | 20 ++++++++++++------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicationHistoryManager.cs b/libs/cluster/Server/Replication/ReplicationHistoryManager.cs index 8b30e828a9..9fca3999fe 100644 --- a/libs/cluster/Server/Replication/ReplicationHistoryManager.cs +++ b/libs/cluster/Server/Replication/ReplicationHistoryManager.cs @@ -95,17 +95,17 @@ internal sealed partial class ReplicationManager : IDisposable { ReplicationHistory currentReplicationConfig; readonly IDevice replicationConfigDevice; - readonly SectorAlignedBufferPool pool; + readonly SectorAlignedBufferPool replicationConfigDevicePool; - public void InitializeReplicationHistory() + private void InitializeReplicationHistory() { currentReplicationConfig = new ReplicationHistory(); FlushConfig(); } - public void RecoverReplicationHistory() + private void RecoverReplicationHistory() { - byte[] replConfig = ClusterUtils.ReadDevice(replicationConfigDevice, pool, logger); + var replConfig = 
ClusterUtils.ReadDevice(replicationConfigDevice, replicationConfigDevicePool, logger); currentReplicationConfig = ReplicationHistory.FromByteArray(replConfig); //TODO: handle scenario where replica crashed before became a primary and it has two replication ids //var current = storeWrapper.clusterManager.CurrentConfig; @@ -115,12 +115,12 @@ public void RecoverReplicationHistory() //} } - public void TryUpdateMyPrimaryReplId(string primary_replid) + private void TryUpdateMyPrimaryReplId(string primaryReplicationId) { while (true) { var current = currentReplicationConfig; - var newConfig = current.UpdateReplicationId(primary_replid); + var newConfig = current.UpdateReplicationId(primaryReplicationId); if (Interlocked.CompareExchange(ref currentReplicationConfig, newConfig, current) == current) break; } @@ -153,9 +153,9 @@ private void FlushConfig() { lock (this) { - logger?.LogTrace("Start FlushConfig {path}", replicationConfigDevice.FileName); - ClusterUtils.WriteInto(replicationConfigDevice, pool, 0, currentReplicationConfig.ToByteArray(), logger: logger); - logger?.LogTrace("End FlushConfig {path}", replicationConfigDevice.FileName); + logger?.LogTrace("Flushing replication history {path}", replicationConfigDevice.FileName); + ClusterUtils.WriteInto(replicationConfigDevice, replicationConfigDevicePool, 0, currentReplicationConfig.ToByteArray(), logger: logger); + logger?.LogTrace("Replication history flush completed {path}", replicationConfigDevice.FileName); } } } diff --git a/libs/cluster/Server/Replication/ReplicationManager.cs b/libs/cluster/Server/Replication/ReplicationManager.cs index 78e6844163..18aae98041 100644 --- a/libs/cluster/Server/Replication/ReplicationManager.cs +++ b/libs/cluster/Server/Replication/ReplicationManager.cs @@ -117,16 +117,22 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null var clusterDataPath = opts.CheckpointDir + clusterFolder; var deviceFactory = 
opts.GetInitializedDeviceFactory(clusterDataPath); replicationConfigDevice = deviceFactory.Get(new FileDescriptor(directoryName: "", fileName: "replication.conf")); - pool = new(1, (int)replicationConfigDevice.SectorSize); + replicationConfigDevicePool = new(1, (int)replicationConfigDevice.SectorSize); - var recoverConfig = replicationConfigDevice.GetFileSize(0) > 0; - if (!recoverConfig) + var canRecoverReplicationHistory = replicationConfigDevice.GetFileSize(0) > 0; + if (clusterProvider.serverOptions.Recover && canRecoverReplicationHistory) { - InitializeReplicationHistory(); + logger?.LogTrace("Recovering in-memory checkpoint registry"); + // If recover option is enabled and replication history information is available + // recover replication history and initialize in-memory checkpoint registry. + RecoverReplicationHistory(); } else { - RecoverReplicationHistory(); + logger?.LogTrace("Initializing new in-memory checkpoint registry"); + // If recover option is not enabled or replication history is not available + // initialize new empty replication history. 
+ InitializeReplicationHistory(); } // After initializing replication history propagate replicationId to ReplicationLogCheckpointManager @@ -178,8 +184,8 @@ public void Dispose() { _disposed = true; - replicationConfigDevice.Dispose(); - pool.Free(); + replicationConfigDevice?.Dispose(); + replicationConfigDevicePool?.Free(); checkpointStore.WaitForReplicas(); replicaSyncSessionTaskStore.Dispose(); From d2dc843f28cf97e3080925d4f8862e92ccc8961f Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 8 Aug 2024 13:41:46 -0700 Subject: [PATCH 04/10] add more info in replica recovery log message --- .../ReplicaOps/ReplicaReceiveCheckpoint.cs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs index 2044de0f0a..064fa78c0d 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs @@ -46,7 +46,6 @@ public bool TryBeginReplicate(ClusterSession session, string nodeid, bool backgr try { logger?.LogTrace("CLUSTER REPLICATE {nodeid}", nodeid); - if (!clusterProvider.clusterManager.TryAddReplica(nodeid, force: force, out errorMessage, logger: logger)) return false; @@ -315,9 +314,22 @@ public long BeginReplicaRecover( { UpdateLastPrimarySyncTime(); - logger?.LogInformation("Initiating Checkpoint Recovery at replica {sIndexToken} {sHlogToken} {oIndexToken} {oHlogToken}", remoteCheckpoint.storeIndexToken, remoteCheckpoint.storeHlogToken, remoteCheckpoint.objectStoreIndexToken, remoteCheckpoint.objectStoreHlogToken); - storeWrapper.RecoverCheckpoint(recoverMainStoreFromToken, recoverObjectStoreFromToken, - remoteCheckpoint.storeIndexToken, remoteCheckpoint.storeHlogToken, remoteCheckpoint.objectStoreIndexToken, remoteCheckpoint.objectStoreHlogToken); + logger?.LogInformation("Replica Recover MainStore: 
{storeVersion}>[{sIndexToken} {sHlogToken}]" + + "\nObjectStore: {objectStoreVersion}>[{oIndexToken} {oHlogToken}]", + remoteCheckpoint.storeVersion, + remoteCheckpoint.storeIndexToken, + remoteCheckpoint.storeHlogToken, + remoteCheckpoint.objectStoreVersion, + remoteCheckpoint.objectStoreIndexToken, + remoteCheckpoint.objectStoreHlogToken); + + storeWrapper.RecoverCheckpoint( + recoverMainStoreFromToken, + recoverObjectStoreFromToken, + remoteCheckpoint.storeIndexToken, + remoteCheckpoint.storeHlogToken, + remoteCheckpoint.objectStoreIndexToken, + remoteCheckpoint.objectStoreHlogToken); if (replayAOF) { From 594c3218afa87503b79323277183031063e46765 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 8 Aug 2024 15:13:42 -0700 Subject: [PATCH 05/10] separate checkpoint entry metadata --- libs/cluster/Server/ClusterProvider.cs | 20 ++-- .../Server/Replication/CheckpointEntry.cs | 102 ++++++++---------- .../Server/Replication/CheckpointStore.cs | 82 +++++++------- .../PrimaryOps/ReplicaSyncSession.cs | 42 ++++---- .../ReplicaOps/ReplicaReceiveCheckpoint.cs | 25 ++--- .../ReplicationCheckpointManagement.cs | 8 +- libs/server/Cluster/CheckpointMetadata.cs | 37 +++++++ libs/server/StoreWrapper.cs | 7 +- 8 files changed, 172 insertions(+), 151 deletions(-) create mode 100644 libs/server/Cluster/CheckpointMetadata.cs diff --git a/libs/cluster/Server/ClusterProvider.cs b/libs/cluster/Server/ClusterProvider.cs index 65781c672f..a864a6a611 100644 --- a/libs/cluster/Server/ClusterProvider.cs +++ b/libs/cluster/Server/ClusterProvider.cs @@ -153,20 +153,20 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover if (storeType is StoreType.Main or StoreType.All) { - entry.storeVersion = storeWrapper.store.CurrentVersion; - entry.storeHlogToken = storeCheckpointToken; - entry.storeIndexToken = storeCheckpointToken; - entry.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress; - entry.storePrimaryReplId = replicationManager.PrimaryReplId; + 
entry.metadata.storeVersion = storeWrapper.store.CurrentVersion; + entry.metadata.storeHlogToken = storeCheckpointToken; + entry.metadata.storeIndexToken = storeCheckpointToken; + entry.metadata.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress; + entry.metadata.storePrimaryReplId = replicationManager.PrimaryReplId; } if (storeType is StoreType.Object or StoreType.All) { - entry.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion; - entry.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken; - entry.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken; - entry.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress; - entry.objectStorePrimaryReplId = replicationManager.PrimaryReplId; + entry.metadata.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion; + entry.metadata.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken; + entry.metadata.objectStoreIndexToken = serverOptions.DisableObjects ? 
default : objectStoreCheckpointToken; + entry.metadata.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress; + entry.metadata.objectStorePrimaryReplId = replicationManager.PrimaryReplId; } // Keep track of checkpoints for replica diff --git a/libs/cluster/Server/Replication/CheckpointEntry.cs b/libs/cluster/Server/Replication/CheckpointEntry.cs index a775f28444..b8a7a776ab 100644 --- a/libs/cluster/Server/Replication/CheckpointEntry.cs +++ b/libs/cluster/Server/Replication/CheckpointEntry.cs @@ -5,45 +5,24 @@ using System.IO; using System.Text; using Garnet.common; +using Garnet.server; namespace Garnet.cluster { sealed class CheckpointEntry { - public long storeVersion; - public Guid storeHlogToken; - public Guid storeIndexToken; - public long storeCheckpointCoveredAofAddress; - public string storePrimaryReplId; - - public long objectStoreVersion; - public Guid objectStoreHlogToken; - public Guid objectStoreIndexToken; - public long objectCheckpointCoveredAofAddress; - public string objectStorePrimaryReplId; - + public CheckpointMetadata metadata; public SingleWriterMultiReaderLock _lock; public CheckpointEntry next; public CheckpointEntry() { - storeVersion = -1; - storeHlogToken = default; - storeIndexToken = default; - storeCheckpointCoveredAofAddress = long.MaxValue; - storePrimaryReplId = null; - - objectStoreVersion = -1; - objectStoreHlogToken = default; - objectStoreIndexToken = default; - objectCheckpointCoveredAofAddress = long.MaxValue; - objectStorePrimaryReplId = null; - + metadata = new(); next = null; } public long GetMinAofCoveredAddress() - => Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64); + => Math.Max(Math.Min(metadata.storeCheckpointCoveredAofAddress, metadata.objectCheckpointCoveredAofAddress), 64); /// /// Indicate addition of new reader by trying to increment reader counter @@ -76,10 +55,10 @@ public bool ContainsSharedToken(CheckpointEntry entry, CheckpointFileType fileTy { return 
fileType switch { - CheckpointFileType.STORE_HLOG => storeHlogToken.Equals(entry.storeHlogToken), - CheckpointFileType.STORE_INDEX => storeIndexToken.Equals(entry.storeIndexToken), - CheckpointFileType.OBJ_STORE_HLOG => objectStoreHlogToken.Equals(entry.objectStoreHlogToken), - CheckpointFileType.OBJ_STORE_INDEX => objectStoreIndexToken.Equals(entry.objectStoreIndexToken), + CheckpointFileType.STORE_HLOG => metadata.storeHlogToken.Equals(entry.metadata.storeHlogToken), + CheckpointFileType.STORE_INDEX => metadata.storeIndexToken.Equals(entry.metadata.storeIndexToken), + CheckpointFileType.OBJ_STORE_HLOG => metadata.objectStoreHlogToken.Equals(entry.metadata.objectStoreHlogToken), + CheckpointFileType.OBJ_STORE_INDEX => metadata.objectStoreIndexToken.Equals(entry.metadata.objectStoreIndexToken), _ => throw new Exception($"Option {fileType} not supported") }; } @@ -87,15 +66,15 @@ public bool ContainsSharedToken(CheckpointEntry entry, CheckpointFileType fileTy public string GetCheckpointEntryDump() { string dump = $"\n" + - $"storeVersion: {storeVersion}\n" + - $"storeHlogToken: {storeHlogToken}\n" + - $"storeIndexToken: {storeIndexToken}\n" + - $"storeCheckpointCoveredAofAddress: {storeCheckpointCoveredAofAddress}\n" + + $"storeVersion: {metadata.storeVersion}\n" + + $"storeHlogToken: {metadata.storeHlogToken}\n" + + $"storeIndexToken: {metadata.storeIndexToken}\n" + + $"storeCheckpointCoveredAofAddress: {metadata.storeCheckpointCoveredAofAddress}\n" + $"------------------------------------------------------------------------\n" + - $"objectStoreVersion:{objectStoreVersion}\n" + - $"objectStoreHlogToken:{objectStoreHlogToken}\n" + - $"objectStoreIndexToken:{objectStoreIndexToken}\n" + - $"objectCheckpointCoveredAofAddress:{objectCheckpointCoveredAofAddress}\n" + + $"objectStoreVersion:{metadata.objectStoreVersion}\n" + + $"objectStoreHlogToken:{metadata.objectStoreHlogToken}\n" + + $"objectStoreIndexToken:{metadata.objectStoreIndexToken}\n" + + 
$"objectCheckpointCoveredAofAddress:{metadata.objectCheckpointCoveredAofAddress}\n" + $"------------------------------------------------------------------------\n" + $"activeReaders:{_lock}"; return dump; @@ -108,28 +87,28 @@ public byte[] ToByteArray() byte[] byteBuffer = default; //Write checkpoint entry data for main store - writer.Write(storeVersion); - byteBuffer = storeHlogToken.ToByteArray(); + writer.Write(metadata.storeVersion); + byteBuffer = metadata.storeHlogToken.ToByteArray(); writer.Write(byteBuffer.Length); writer.Write(byteBuffer); - byteBuffer = storeIndexToken.ToByteArray(); + byteBuffer = metadata.storeIndexToken.ToByteArray(); writer.Write(byteBuffer.Length); writer.Write(byteBuffer); - writer.Write(storeCheckpointCoveredAofAddress); - writer.Write(storePrimaryReplId == null ? 0 : 1); - if (storePrimaryReplId != null) writer.Write(storePrimaryReplId); + writer.Write(metadata.storeCheckpointCoveredAofAddress); + writer.Write(metadata.storePrimaryReplId == null ? 0 : 1); + if (metadata.storePrimaryReplId != null) writer.Write(metadata.storePrimaryReplId); //Write checkpoint entry data for object store - writer.Write(objectStoreVersion); - byteBuffer = objectStoreHlogToken.ToByteArray(); + writer.Write(metadata.objectStoreVersion); + byteBuffer = metadata.objectStoreHlogToken.ToByteArray(); writer.Write(byteBuffer.Length); writer.Write(byteBuffer); - byteBuffer = objectStoreIndexToken.ToByteArray(); + byteBuffer = metadata.objectStoreIndexToken.ToByteArray(); writer.Write(byteBuffer.Length); writer.Write(byteBuffer); - writer.Write(objectCheckpointCoveredAofAddress); - writer.Write(objectStorePrimaryReplId == null ? 0 : 1); - if (objectStorePrimaryReplId != null) writer.Write(objectStorePrimaryReplId); + writer.Write(metadata.objectCheckpointCoveredAofAddress); + writer.Write(metadata.objectStorePrimaryReplId == null ? 
0 : 1); + if (metadata.objectStorePrimaryReplId != null) writer.Write(metadata.objectStorePrimaryReplId); byte[] byteArray = ms.ToArray(); writer.Dispose(); @@ -143,17 +122,20 @@ public static CheckpointEntry FromByteArray(byte[] serialized) var reader = new BinaryReader(ms); var cEntry = new CheckpointEntry { - storeVersion = reader.ReadInt64(), - storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())), - storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())), - storeCheckpointCoveredAofAddress = reader.ReadInt64(), - storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default, - - objectStoreVersion = reader.ReadInt64(), - objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())), - objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())), - objectCheckpointCoveredAofAddress = reader.ReadInt64(), - objectStorePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default + metadata = new() + { + storeVersion = reader.ReadInt64(), + storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())), + storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())), + storeCheckpointCoveredAofAddress = reader.ReadInt64(), + storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default, + + objectStoreVersion = reader.ReadInt64(), + objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())), + objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())), + objectCheckpointCoveredAofAddress = reader.ReadInt64(), + objectStorePrimaryReplId = reader.ReadInt32() > 0 ? 
reader.ReadString() : default + } }; reader.Dispose(); diff --git a/libs/cluster/Server/Replication/CheckpointStore.cs b/libs/cluster/Server/Replication/CheckpointStore.cs index 3740134341..52a78f7285 100644 --- a/libs/cluster/Server/Replication/CheckpointStore.cs +++ b/libs/cluster/Server/Replication/CheckpointStore.cs @@ -40,7 +40,7 @@ public void Initialize() { head = tail = GetLatestCheckpointEntryFromDisk(); - if (tail.storeVersion == -1 && tail.objectStoreVersion == -1) head = tail = null; + if (tail.metadata.storeVersion == -1 && tail.metadata.objectStoreVersion == -1) head = tail = null; //This purge does not check for active readers //1. If primary is initializing then we will not have any active readers since not connections are established at recovery @@ -72,9 +72,9 @@ public void PurgeAllCheckpointsExceptEntry(CheckpointEntry entry = null) entry ??= GetLatestCheckpointEntryFromDisk(); if (entry == null) return; LogCheckpointEntry("Purge all except", entry); - PurgeAllCheckpointsExceptTokens(StoreType.Main, entry.storeHlogToken, entry.storeIndexToken); + PurgeAllCheckpointsExceptTokens(StoreType.Main, entry.metadata.storeHlogToken, entry.metadata.storeIndexToken); if (!clusterProvider.serverOptions.DisableObjects) - PurgeAllCheckpointsExceptTokens(StoreType.Object, entry.objectStoreHlogToken, entry.objectStoreIndexToken); + PurgeAllCheckpointsExceptTokens(StoreType.Object, entry.metadata.objectStoreHlogToken, entry.metadata.objectStoreIndexToken); } /// @@ -124,8 +124,8 @@ public void AddCheckpointEntry(CheckpointEntry entry, StoreType storeType, bool var lastEntry = tail; Debug.Assert(lastEntry != null); - entry.storeIndexToken = lastEntry.storeIndexToken; - entry.objectStoreIndexToken = lastEntry.objectStoreIndexToken; + entry.metadata.storeIndexToken = lastEntry.metadata.storeIndexToken; + entry.metadata.objectStoreIndexToken = lastEntry.metadata.objectStoreIndexToken; } //Assume we don't have multiple writers so it is safe to update the tail directly 
@@ -135,20 +135,20 @@ public void AddCheckpointEntry(CheckpointEntry entry, StoreType storeType, bool { if (storeType == StoreType.Main) { - entry.objectStoreVersion = tail.objectStoreVersion; - entry.objectStoreHlogToken = tail.objectStoreHlogToken; - entry.objectStoreIndexToken = tail.objectStoreIndexToken; - entry.objectCheckpointCoveredAofAddress = tail.storeCheckpointCoveredAofAddress; - entry.objectStorePrimaryReplId = tail.objectStorePrimaryReplId; + entry.metadata.objectStoreVersion = tail.metadata.objectStoreVersion; + entry.metadata.objectStoreHlogToken = tail.metadata.objectStoreHlogToken; + entry.metadata.objectStoreIndexToken = tail.metadata.objectStoreIndexToken; + entry.metadata.objectCheckpointCoveredAofAddress = tail.metadata.storeCheckpointCoveredAofAddress; + entry.metadata.objectStorePrimaryReplId = tail.metadata.objectStorePrimaryReplId; } if (storeType == StoreType.Object) { - entry.storeVersion = tail.storeVersion; - entry.storeHlogToken = tail.storeHlogToken; - entry.storeIndexToken = tail.storeIndexToken; - entry.storeCheckpointCoveredAofAddress = tail.objectCheckpointCoveredAofAddress; - entry.storePrimaryReplId = tail.storePrimaryReplId; + entry.metadata.storeVersion = tail.metadata.storeVersion; + entry.metadata.storeHlogToken = tail.metadata.storeHlogToken; + entry.metadata.storeIndexToken = tail.metadata.storeIndexToken; + entry.metadata.storeCheckpointCoveredAofAddress = tail.metadata.objectCheckpointCoveredAofAddress; + entry.metadata.storePrimaryReplId = tail.metadata.storePrimaryReplId; } tail.next = entry; @@ -180,18 +180,18 @@ private void DeleteOutdatedCheckpoints() LogCheckpointEntry("Deleting checkpoint entry", curr); // Below check each checkpoint token separately if it is eligible for deletion if (CanDeleteToken(curr, CheckpointFileType.STORE_HLOG)) - clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).DeleteLogCheckpoint(curr.storeHlogToken); + 
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).DeleteLogCheckpoint(curr.metadata.storeHlogToken); if (CanDeleteToken(curr, CheckpointFileType.STORE_INDEX)) - clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).DeleteIndexCheckpoint(curr.storeIndexToken); + clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).DeleteIndexCheckpoint(curr.metadata.storeIndexToken); if (!clusterProvider.serverOptions.DisableObjects) { if (CanDeleteToken(curr, CheckpointFileType.OBJ_STORE_HLOG)) - clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).DeleteLogCheckpoint(curr.objectStoreHlogToken); + clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).DeleteLogCheckpoint(curr.metadata.objectStoreHlogToken); if (CanDeleteToken(curr, CheckpointFileType.OBJ_STORE_INDEX)) - clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).DeleteIndexCheckpoint(curr.objectStoreIndexToken); + clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).DeleteIndexCheckpoint(curr.metadata.objectStoreIndexToken); } //At least one token can always be deleted thus invalidating the in-memory entry @@ -247,8 +247,11 @@ public CheckpointEntry GetLatestCheckpointEntryFromMemory() { var cEntry = new CheckpointEntry() { - storeCheckpointCoveredAofAddress = 0, - objectCheckpointCoveredAofAddress = clusterProvider.serverOptions.DisableObjects ? long.MaxValue : 0 + metadata = new() + { + storeCheckpointCoveredAofAddress = 0, + objectCheckpointCoveredAofAddress = clusterProvider.serverOptions.DisableObjects ? long.MaxValue : 0 + } }; cEntry.TryAddReader(); return cEntry; @@ -279,17 +282,20 @@ public CheckpointEntry GetLatestCheckpointEntryFromDisk() CheckpointEntry entry = new() { - storeVersion = storeHLogToken == default ? 
-1 : storeWrapper.store.GetLatestCheckpointVersion(), - storeHlogToken = storeHLogToken, - storeIndexToken = storeIndexToken, - storeCheckpointCoveredAofAddress = storeCheckpointCoveredAofAddress, - storePrimaryReplId = storePrimaryReplId, - - objectStoreVersion = objectStoreHLogToken == default ? -1 : storeWrapper.objectStore.GetLatestCheckpointVersion(), - objectStoreHlogToken = objectStoreHLogToken, - objectStoreIndexToken = objectStoreIndexToken, - objectCheckpointCoveredAofAddress = objectCheckpointCoveredAofAddress, - objectStorePrimaryReplId = objectStorePrimaryReplId, + metadata = new() + { + storeVersion = storeHLogToken == default ? -1 : storeWrapper.store.GetLatestCheckpointVersion(), + storeHlogToken = storeHLogToken, + storeIndexToken = storeIndexToken, + storeCheckpointCoveredAofAddress = storeCheckpointCoveredAofAddress, + storePrimaryReplId = storePrimaryReplId, + + objectStoreVersion = objectStoreHLogToken == default ? -1 : storeWrapper.objectStore.GetLatestCheckpointVersion(), + objectStoreHlogToken = objectStoreHLogToken, + objectStoreIndexToken = objectStoreIndexToken, + objectCheckpointCoveredAofAddress = objectCheckpointCoveredAofAddress, + objectStorePrimaryReplId = objectStorePrimaryReplId, + }, _lock = new SingleWriterMultiReaderLock() }; return entry; @@ -325,12 +331,12 @@ public void LogCheckpointEntry(string msg, CheckpointEntry entry) { logger?.LogTrace("{msg} {storeVersion} {storeHlogToken} {storeIndexToken} {objectStoreVersion} {objectStoreHlogToken} {objectStoreIndexToken}", msg, - entry.storeVersion, - entry.storeHlogToken, - entry.storeIndexToken, - entry.objectStoreVersion, - entry.objectStoreHlogToken, - entry.objectStoreIndexToken); + entry.metadata.storeVersion, + entry.metadata.storeHlogToken, + entry.metadata.storeIndexToken, + entry.metadata.objectStoreVersion, + entry.metadata.objectStoreHlogToken, + entry.metadata.objectStoreIndexToken); } } } \ No newline at end of file diff --git 
a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs index a297e0b41b..005c12df36 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs @@ -75,7 +75,7 @@ public async Task SendCheckpoint() try { logger?.LogInformation("Replica replicaId:{replicaId} requesting checkpoint replicaStoreVersion:{replicaStoreVersion} replicaObjectStoreVersion:{replicaObjectStoreVersion}", - remoteNodeId, remoteEntry.storeVersion, remoteEntry.objectStoreVersion); + remoteNodeId, remoteEntry.metadata.storeVersion, remoteEntry.metadata.objectStoreVersion); gcs.Connect((int)clusterProvider.clusterManager.GetClusterTimeout().TotalMilliseconds); retry: @@ -86,13 +86,13 @@ public async Task SendCheckpoint() // Local and remote checkpoints are of same history if both of the following hold // 1. There is a checkpoint available at remote node // 2. Remote and local checkpoints contain the same PrimaryReplId - var sameMainStoreCheckpointHistory = !string.IsNullOrEmpty(remoteEntry.storePrimaryReplId) && remoteEntry.storePrimaryReplId.Equals(localEntry.storePrimaryReplId); - var sameObjectStoreCheckpointHistory = !string.IsNullOrEmpty(remoteEntry.objectStorePrimaryReplId) && remoteEntry.objectStorePrimaryReplId.Equals(localEntry.objectStorePrimaryReplId); + var sameMainStoreCheckpointHistory = !string.IsNullOrEmpty(remoteEntry.metadata.storePrimaryReplId) && remoteEntry.metadata.storePrimaryReplId.Equals(localEntry.metadata.storePrimaryReplId); + var sameObjectStoreCheckpointHistory = !string.IsNullOrEmpty(remoteEntry.metadata.objectStorePrimaryReplId) && remoteEntry.metadata.objectStorePrimaryReplId.Equals(localEntry.metadata.objectStorePrimaryReplId); // We will not send the latest local checkpoint if any of the following hold // 1. Local node does not have any checkpoints // 2. 
Local checkpoint is of same version and history as the remote checkpoint - var skipLocalMainStoreCheckpoint = localEntry.storeHlogToken == default || (sameMainStoreCheckpointHistory && localEntry.storeVersion == remoteEntry.storeVersion); - var skipLocalObjectStoreCheckpoint = clusterProvider.serverOptions.DisableObjects || localEntry.objectStoreHlogToken == default || (sameObjectStoreCheckpointHistory && localEntry.objectStoreVersion == remoteEntry.objectStoreVersion); + var skipLocalMainStoreCheckpoint = localEntry.metadata.storeHlogToken == default || (sameMainStoreCheckpointHistory && localEntry.metadata.storeVersion == remoteEntry.metadata.storeVersion); + var skipLocalObjectStoreCheckpoint = clusterProvider.serverOptions.DisableObjects || localEntry.metadata.objectStoreHlogToken == default || (sameObjectStoreCheckpointHistory && localEntry.metadata.objectStoreVersion == remoteEntry.metadata.objectStoreVersion); LogFileInfo hlog_size = default; long index_size = -1; @@ -128,69 +128,69 @@ public async Task SendCheckpoint() if (!skipLocalMainStoreCheckpoint) { - logger?.LogInformation("Sending main store checkpoint {version} {storeHlogToken} {storeIndexToken} to replica", localEntry.storeVersion, localEntry.storeHlogToken, localEntry.storeIndexToken); + logger?.LogInformation("Sending main store checkpoint {version} {storeHlogToken} {storeIndexToken} to replica", localEntry.metadata.storeVersion, localEntry.metadata.storeHlogToken, localEntry.metadata.storeIndexToken); // 1. 
send hlog file segments if (clusterProvider.serverOptions.EnableStorageTier && hlog_size.hybridLogFileEndAddress > 64) - await SendFileSegments(gcs, localEntry.storeHlogToken, CheckpointFileType.STORE_HLOG, hlog_size.hybridLogFileStartAddress, hlog_size.hybridLogFileEndAddress); + await SendFileSegments(gcs, localEntry.metadata.storeHlogToken, CheckpointFileType.STORE_HLOG, hlog_size.hybridLogFileStartAddress, hlog_size.hybridLogFileEndAddress); // 2.Send index file segments //var index_size = storeWrapper.store.GetIndexFileSize(localEntry.storeIndexToken); - await SendFileSegments(gcs, localEntry.storeIndexToken, CheckpointFileType.STORE_INDEX, 0, index_size); + await SendFileSegments(gcs, localEntry.metadata.storeIndexToken, CheckpointFileType.STORE_INDEX, 0, index_size); // 3. Send snapshot file segments - await SendFileSegments(gcs, localEntry.storeHlogToken, CheckpointFileType.STORE_SNAPSHOT, 0, hlog_size.snapshotFileEndAddress); + await SendFileSegments(gcs, localEntry.metadata.storeHlogToken, CheckpointFileType.STORE_SNAPSHOT, 0, hlog_size.snapshotFileEndAddress); // 4. Send delta log segments var dlog_size = hlog_size.deltaLogTailAddress; - await SendFileSegments(gcs, localEntry.storeHlogToken, CheckpointFileType.STORE_DLOG, 0, dlog_size); + await SendFileSegments(gcs, localEntry.metadata.storeHlogToken, CheckpointFileType.STORE_DLOG, 0, dlog_size); // 5.Send index metadata - await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_INDEX, localEntry.storeIndexToken); + await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_INDEX, localEntry.metadata.storeIndexToken); // 6. 
Send snapshot metadata - await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_SNAPSHOT, localEntry.storeHlogToken); + await SendCheckpointMetadata(gcs, storeCkptManager, CheckpointFileType.STORE_SNAPSHOT, localEntry.metadata.storeHlogToken); } if (!skipLocalObjectStoreCheckpoint) { - logger?.LogInformation("Sending object store checkpoint {version} {objectStoreHlogToken} {objectStoreIndexToken} to replica", localEntry.objectStoreVersion, localEntry.objectStoreHlogToken, localEntry.objectStoreIndexToken); + logger?.LogInformation("Sending object store checkpoint {version} {objectStoreHlogToken} {objectStoreIndexToken} to replica", localEntry.metadata.objectStoreVersion, localEntry.metadata.objectStoreHlogToken, localEntry.metadata.objectStoreIndexToken); // 1. send hlog file segments if (clusterProvider.serverOptions.EnableStorageTier && obj_hlog_size.hybridLogFileEndAddress > 24) { //send object hlog file segments - await SendFileSegments(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_HLOG, obj_hlog_size.hybridLogFileStartAddress, obj_hlog_size.hybridLogFileEndAddress); + await SendFileSegments(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_HLOG, obj_hlog_size.hybridLogFileStartAddress, obj_hlog_size.hybridLogFileEndAddress); var hlogSegmentCount = ((obj_hlog_size.hybridLogFileEndAddress - obj_hlog_size.hybridLogFileStartAddress) >> clusterProvider.serverOptions.ObjectStoreSegmentSizeBits()) + 1; - await SendObjectFiles(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_HLOG_OBJ, (int)hlogSegmentCount); + await SendObjectFiles(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_HLOG_OBJ, (int)hlogSegmentCount); } // 2. 
Send object store snapshot files if (obj_hlog_size.snapshotFileEndAddress > 24) { //send snapshot file segments - await SendFileSegments(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_SNAPSHOT, 0, obj_hlog_size.snapshotFileEndAddress); + await SendFileSegments(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_SNAPSHOT, 0, obj_hlog_size.snapshotFileEndAddress); //send snapshot.obj file segments var snapshotSegmentCount = (obj_hlog_size.snapshotFileEndAddress >> clusterProvider.serverOptions.ObjectStoreSegmentSizeBits()) + 1; - await SendObjectFiles(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_SNAPSHOT_OBJ, (int)snapshotSegmentCount); + await SendObjectFiles(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_SNAPSHOT_OBJ, (int)snapshotSegmentCount); } // 3. Send object store index file segments if (obj_index_size > 0) - await SendFileSegments(gcs, localEntry.objectStoreIndexToken, CheckpointFileType.OBJ_STORE_INDEX, 0, obj_index_size); + await SendFileSegments(gcs, localEntry.metadata.objectStoreIndexToken, CheckpointFileType.OBJ_STORE_INDEX, 0, obj_index_size); // 4. Send object store delta file segments var obj_dlog_size = obj_hlog_size.deltaLogTailAddress; if (obj_dlog_size > 0) - await SendFileSegments(gcs, localEntry.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_DLOG, 0, obj_dlog_size); + await SendFileSegments(gcs, localEntry.metadata.objectStoreHlogToken, CheckpointFileType.OBJ_STORE_DLOG, 0, obj_dlog_size); // 5. Send object store index metadata - await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_INDEX, localEntry.objectStoreIndexToken); + await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_INDEX, localEntry.metadata.objectStoreIndexToken); // 6. 
Send object store snapshot metadata - await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_SNAPSHOT, localEntry.objectStoreHlogToken); + await SendCheckpointMetadata(gcs, objectStoreCkptManager, CheckpointFileType.OBJ_STORE_SNAPSHOT, localEntry.metadata.objectStoreHlogToken); } var recoverFromRemote = !skipLocalMainStoreCheckpoint || !skipLocalObjectStoreCheckpoint; diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs index 064fa78c0d..7d5253032e 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs @@ -316,20 +316,17 @@ public long BeginReplicaRecover( logger?.LogInformation("Replica Recover MainStore: {storeVersion}>[{sIndexToken} {sHlogToken}]" + "\nObjectStore: {objectStoreVersion}>[{oIndexToken} {oHlogToken}]", - remoteCheckpoint.storeVersion, - remoteCheckpoint.storeIndexToken, - remoteCheckpoint.storeHlogToken, - remoteCheckpoint.objectStoreVersion, - remoteCheckpoint.objectStoreIndexToken, - remoteCheckpoint.objectStoreHlogToken); + remoteCheckpoint.metadata.storeVersion, + remoteCheckpoint.metadata.storeIndexToken, + remoteCheckpoint.metadata.storeHlogToken, + remoteCheckpoint.metadata.objectStoreVersion, + remoteCheckpoint.metadata.objectStoreIndexToken, + remoteCheckpoint.metadata.objectStoreHlogToken); storeWrapper.RecoverCheckpoint( recoverMainStoreFromToken, recoverObjectStoreFromToken, - remoteCheckpoint.storeIndexToken, - remoteCheckpoint.storeHlogToken, - remoteCheckpoint.objectStoreIndexToken, - remoteCheckpoint.objectStoreHlogToken); + remoteCheckpoint.metadata); if (replayAOF) { @@ -347,15 +344,15 @@ public long BeginReplicaRecover( // If checkpoint for main store was send add its token here in preparation for purge later on if (recoverMainStoreFromToken) { - cEntry.storeIndexToken = 
remoteCheckpoint.storeIndexToken; - cEntry.storeHlogToken = remoteCheckpoint.storeHlogToken; + cEntry.metadata.storeIndexToken = remoteCheckpoint.metadata.storeIndexToken; + cEntry.metadata.storeHlogToken = remoteCheckpoint.metadata.storeHlogToken; } // If checkpoint for object store was send add its token here in preparation for purge later on if (recoverObjectStoreFromToken) { - cEntry.objectStoreIndexToken = remoteCheckpoint.objectStoreIndexToken; - cEntry.objectStoreHlogToken = remoteCheckpoint.objectStoreHlogToken; + cEntry.metadata.objectStoreIndexToken = remoteCheckpoint.metadata.objectStoreIndexToken; + cEntry.metadata.objectStoreHlogToken = remoteCheckpoint.metadata.objectStoreHlogToken; } checkpointStore.PurgeAllCheckpointsExceptEntry(cEntry); diff --git a/libs/cluster/Server/Replication/ReplicationCheckpointManagement.cs b/libs/cluster/Server/Replication/ReplicationCheckpointManagement.cs index 9e027f1e91..99f690c512 100644 --- a/libs/cluster/Server/Replication/ReplicationCheckpointManagement.cs +++ b/libs/cluster/Server/Replication/ReplicationCheckpointManagement.cs @@ -31,8 +31,8 @@ public bool TryAcquireSettledMetadataForMainStore(CheckpointEntry entry, out Log index_size = -1; try { - hlog_size = storeWrapper.store.GetLogFileSize(entry.storeHlogToken); - index_size = storeWrapper.store.GetIndexFileSize(entry.storeIndexToken); + hlog_size = storeWrapper.store.GetLogFileSize(entry.metadata.storeHlogToken); + index_size = storeWrapper.store.GetIndexFileSize(entry.metadata.storeIndexToken); return true; } catch @@ -54,8 +54,8 @@ public bool TryAcquireSettledMetadataForObjectStore(CheckpointEntry entry, out L index_size = -1; try { - hlog_size = storeWrapper.objectStore.GetLogFileSize(entry.objectStoreHlogToken); - index_size = storeWrapper.objectStore.GetIndexFileSize(entry.objectStoreIndexToken); + hlog_size = storeWrapper.objectStore.GetLogFileSize(entry.metadata.objectStoreHlogToken); + index_size = 
storeWrapper.objectStore.GetIndexFileSize(entry.metadata.objectStoreIndexToken); return true; } catch diff --git a/libs/server/Cluster/CheckpointMetadata.cs b/libs/server/Cluster/CheckpointMetadata.cs new file mode 100644 index 0000000000..3f10317117 --- /dev/null +++ b/libs/server/Cluster/CheckpointMetadata.cs @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; + +namespace Garnet.server +{ + public sealed class CheckpointMetadata + { + public long storeVersion; + public Guid storeHlogToken; + public Guid storeIndexToken; + public long storeCheckpointCoveredAofAddress; + public string storePrimaryReplId; + + public long objectStoreVersion; + public Guid objectStoreHlogToken; + public Guid objectStoreIndexToken; + public long objectCheckpointCoveredAofAddress; + public string objectStorePrimaryReplId; + + public CheckpointMetadata() + { + storeVersion = -1; + storeHlogToken = default; + storeIndexToken = default; + storeCheckpointCoveredAofAddress = long.MaxValue; + storePrimaryReplId = null; + + objectStoreVersion = -1; + objectStoreHlogToken = default; + objectStoreIndexToken = default; + objectCheckpointCoveredAofAddress = long.MaxValue; + objectStorePrimaryReplId = null; + } + } +} \ No newline at end of file diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index f01aace6f9..ec24359c74 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -237,14 +237,13 @@ internal void Recover() /// /// Caller will have to decide if recover is necessary, so we do not check if recover option is enabled /// - public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recoverObjectStoreFromToken = false, - Guid storeIndexToken = default, Guid storeHlogToken = default, Guid objectStoreIndexToken = default, Guid objectStoreHlogToken = default) + public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recoverObjectStoreFromToken = false, 
CheckpointMetadata metadata = null) { long storeVersion = -1, objectStoreVersion = -1; try { - storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(storeIndexToken, storeHlogToken); - if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(objectStoreIndexToken, objectStoreHlogToken); + storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(metadata.storeIndexToken, metadata.storeHlogToken); + if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(metadata.objectStoreIndexToken, metadata.objectStoreHlogToken); if (storeVersion > 0 || objectStoreVersion > 0) lastSaveTime = DateTimeOffset.UtcNow; } From a2d2fa78819d822741d5be6ecf6ad4c055e7754e Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 8 Aug 2024 15:14:00 -0700 Subject: [PATCH 06/10] wip --- .../ClusterReplicationTests.cs | 89 +++++++++++++++++++ test/Garnet.test.cluster/ClusterTestUtils.cs | 24 +++++ 2 files changed, 113 insertions(+) diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ClusterReplicationTests.cs index 2d5570b2a4..9b71d3f7c5 100644 --- a/test/Garnet.test.cluster/ClusterReplicationTests.cs +++ b/test/Garnet.test.cluster/ClusterReplicationTests.cs @@ -1048,5 +1048,94 @@ public void ClusterReplicateFails() var exc = Assert.Throws(() => replicaServer.Execute("CLUSTER", ["REPLICATE", Guid.NewGuid().ToString()], flags: CommandFlags.NoRedirect)); ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node ")); } + + //[Test, Order(22), Timeout(testTimeout)] + public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) + { + var replica_count = 1;// Per primary + var primary_count = 1; + var nodes_count = primary_count + (primary_count * replica_count); + var primaryNodeIndex = 0; + var replicaNodeIndex = 1; + Assert.IsTrue(primary_count > 0); + 
context.CreateInstances(nodes_count, disableObjects: false, MainMemoryReplication: true, CommitFrequencyMs: -1, OnDemandCheckpoint: true, enableAOF: true, useTLS: useTLS); + context.CreateConnection(useTLS: useTLS); + _ = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); + + var keyLength = 32; + var kvpairCount = keyCount; + var addCount = 5; + context.kvPairs = []; + + // Populate Primary + if (!performRMW) + context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex); + else + context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex, addCount); + + context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger); + for (var i = 0; i < 5; i++) + { + var primaryLastSaveTime = context.clusterTestUtils.LastSave(primaryNodeIndex, logger: context.logger); + var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaNodeIndex, logger: context.logger); + context.clusterTestUtils.Checkpoint(primaryNodeIndex); + context.clusterTestUtils.WaitCheckpoint(primaryNodeIndex, primaryLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitCheckpoint(replicaNodeIndex, replicaLastSaveTime, logger: context.logger); + context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger); + } + + var primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); + var replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); + Assert.AreEqual(6, primaryVersion); + Assert.AreEqual(primaryVersion, replicaVersion); + + // Dispose primary and delete data + context.nodes[primaryNodeIndex].Dispose(true); + // Dispose primary but do not delete data + context.nodes[replicaNodeIndex].Dispose(false); + + // Restart primary and do not recover + context.nodes[primaryNodeIndex] = 
context.CreateInstance( + context.clusterTestUtils.GetEndPoint(primaryNodeIndex).Port, + disableObjects: true, + tryRecover: false, + enableAOF: true, + MainMemoryReplication: true, + CommitFrequencyMs: -1, + OnDemandCheckpoint: true, + timeout: timeout, + useTLS: useTLS, + cleanClusterConfig: true); + context.nodes[primaryNodeIndex].Start(); + + // Restart secondary and recover + context.nodes[replicaNodeIndex] = context.CreateInstance( + context.clusterTestUtils.GetEndPoint(replicaNodeIndex).Port, + disableObjects: true, + tryRecover: true, + enableAOF: true, + MainMemoryReplication: true, + CommitFrequencyMs: -1, + OnDemandCheckpoint: true, + timeout: timeout, + useTLS: useTLS, + cleanClusterConfig: true); + context.nodes[replicaNodeIndex].Start(); + context.CreateConnection(useTLS: useTLS); + + // Assert primary version is 1 and replica has recovered to previous checkpoint + primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); + replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); + Assert.AreEqual(1, primaryVersion); + Assert.AreEqual(6, replicaVersion); + + Assert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger)); + + context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger); + context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger); + context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger); + + context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger); + } } } \ No newline at end of file diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs index ffc419f539..c114e7e069 100644 --- a/test/Garnet.test.cluster/ClusterTestUtils.cs +++ 
b/test/Garnet.test.cluster/ClusterTestUtils.cs @@ -2679,6 +2679,30 @@ public string GetFailoverState(IPEndPoint endPoint, ILogger logger = null) return items; } + public string GetInfo(int nodeIndex, string section, string segment, ILogger logger = null) + => GetInfo(endpoints[nodeIndex].ToIPEndPoint(), section, segment, logger); + + public string GetInfo(IPEndPoint endPoint, string section, string segment, ILogger logger = null) + { + try + { + var server = redis.GetServer(endPoint); + var result = server.Info(section); + Assert.AreEqual(1, result.Length, "section does not exist"); + foreach (var item in result[0]) + if (item.Key.Equals(segment)) + return item.Value; + Assert.Fail($"Segment not available for {section} section"); + return ""; + } + catch (Exception ex) + { + logger?.LogError(ex, "An error has occurred; GetFailoverState"); + Assert.Fail(ex.Message); + return null; + } + } + public void WaitForReplicaAofSync(int primaryIndex, int secondaryIndex, ILogger logger = null) { long primaryReplicationOffset; From 59d5c24e421909bedd6c74dee1d071de42819f21 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 12 Aug 2024 14:04:35 -0700 Subject: [PATCH 07/10] add test to validate bug fix --- .../ReplicaOps/ReplicaReceiveCheckpoint.cs | 1 + libs/server/StoreWrapper.cs | 24 +++++++++++++-- .../ClusterReplicationTests.cs | 30 +++++++++++++++---- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs index 7d5253032e..6bb4310b65 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs @@ -324,6 +324,7 @@ public long BeginReplicaRecover( remoteCheckpoint.metadata.objectStoreHlogToken); storeWrapper.RecoverCheckpoint( + replicaRecover: true, recoverMainStoreFromToken, recoverObjectStoreFromToken, 
remoteCheckpoint.metadata); diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs index ec24359c74..c778e955c1 100644 --- a/libs/server/StoreWrapper.cs +++ b/libs/server/StoreWrapper.cs @@ -237,13 +237,31 @@ internal void Recover() /// /// Caller will have to decide if recover is necessary, so we do not check if recover option is enabled /// - public void RecoverCheckpoint(bool recoverMainStoreFromToken = false, bool recoverObjectStoreFromToken = false, CheckpointMetadata metadata = null) + public void RecoverCheckpoint(bool replicaRecover = false, bool recoverMainStoreFromToken = false, bool recoverObjectStoreFromToken = false, CheckpointMetadata metadata = null) { long storeVersion = -1, objectStoreVersion = -1; try { - storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(metadata.storeIndexToken, metadata.storeHlogToken); - if (objectStore != null) objectStoreVersion = !recoverObjectStoreFromToken ? objectStore.Recover() : objectStore.Recover(metadata.objectStoreIndexToken, metadata.objectStoreHlogToken); + if (replicaRecover) + { + if (metadata.storeIndexToken != default && metadata.storeHlogToken != default) + { + storeVersion = !recoverMainStoreFromToken ? store.Recover() : store.Recover(metadata.storeIndexToken, metadata.storeHlogToken); + } + + if (!serverOptions.DisableObjects) + { + if (metadata.objectStoreIndexToken != default && metadata.objectStoreHlogToken != default) + { + objectStoreVersion = !recoverObjectStoreFromToken ? 
objectStore.Recover() : objectStore.Recover(metadata.objectStoreIndexToken, metadata.objectStoreHlogToken); + } + } + } + else + { + storeVersion = store.Recover(); + if (objectStore != null) objectStoreVersion = objectStore.Recover(); + } if (storeVersion > 0 || objectStoreVersion > 0) lastSaveTime = DateTimeOffset.UtcNow; } diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ClusterReplicationTests.cs index 9b71d3f7c5..4856727eb5 100644 --- a/test/Garnet.test.cluster/ClusterReplicationTests.cs +++ b/test/Garnet.test.cluster/ClusterReplicationTests.cs @@ -1049,7 +1049,7 @@ public void ClusterReplicateFails() ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node ")); } - //[Test, Order(22), Timeout(testTimeout)] + [Test, Order(22), Timeout(testTimeout)] public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) { var replica_count = 1;// Per primary @@ -1086,9 +1086,11 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) var primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); var replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); - Assert.AreEqual(6, primaryVersion); + Assert.AreEqual("6", primaryVersion); Assert.AreEqual(primaryVersion, replicaVersion); + context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, replicaNodeIndex); + // Dispose primary and delete data context.nodes[primaryNodeIndex].Dispose(true); // Dispose primary but do not delete data @@ -1126,16 +1128,32 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) // Assert primary version is 1 and replica has recovered to previous checkpoint primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); replicaVersion = 
context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); - Assert.AreEqual(1, primaryVersion); - Assert.AreEqual(6, replicaVersion); + Assert.AreEqual("1", primaryVersion); + Assert.AreEqual("6", replicaVersion); + // Setup cluster Assert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger)); - context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger); context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger); context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger); + var primaryNodeId = context.clusterTestUtils.ClusterMyId(primaryNodeIndex, logger: context.logger); + + // Enable replication + context.clusterTestUtils.WaitUntilNodeIdIsKnown(replicaNodeIndex, primaryNodeId, logger: context.logger); + Assert.AreEqual("OK", context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger)); + + // Both nodes are at version 1 despite replica recovering to version earlier + primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); + replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); + Assert.AreEqual("1", primaryVersion); + Assert.AreEqual("1", replicaVersion); - context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger); + // At this point attached replica should be empty because primary did not have any data because it did not recover + foreach (var pair in context.kvPairs) + { + var resp = context.clusterTestUtils.GetKey(replicaNodeIndex, Encoding.ASCII.GetBytes(pair.Key), out _, out _, out _, out var state, logger: context.logger); + Assert.IsNull(resp); + } } } } \ No newline at end of file From 
f4b68e955df889af39c413c8b2eac740b792a780 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 15 Aug 2024 15:55:28 -0700 Subject: [PATCH 08/10] log message tags --- .../CheckpointManagement/DeviceLogCommitCheckpointManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs index 5286c38bba..90638842ab 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs @@ -428,7 +428,7 @@ private unsafe void IOCallback(uint errorCode, uint numBytes, object context) if (errorCode != 0) { var errorMessage = new Win32Exception((int)errorCode).Message; - logger?.LogError("[DeviceLogCheckpointManager] OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage); + logger?.LogError("[DeviceLogManager] OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage); } semaphore.Release(); } From bd7f24ba2e8593153908feeefaa000aa55ec4d2a Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Fri, 30 Aug 2024 16:25:55 -0700 Subject: [PATCH 09/10] update nunit legacy methods --- .../ClusterReplicationTests.cs | 22 +++++++++---------- test/Garnet.test.cluster/ClusterTestUtils.cs | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/test/Garnet.test.cluster/ClusterReplicationTests.cs b/test/Garnet.test.cluster/ClusterReplicationTests.cs index 4856727eb5..2f5c67af74 100644 --- a/test/Garnet.test.cluster/ClusterReplicationTests.cs +++ b/test/Garnet.test.cluster/ClusterReplicationTests.cs @@ -1049,7 +1049,7 @@ public void ClusterReplicateFails() ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node 
")); } - [Test, Order(22), Timeout(testTimeout)] + [Test, Order(22), CancelAfter(testTimeout)] public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) { var replica_count = 1;// Per primary @@ -1057,7 +1057,7 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) var nodes_count = primary_count + (primary_count * replica_count); var primaryNodeIndex = 0; var replicaNodeIndex = 1; - Assert.IsTrue(primary_count > 0); + ClassicAssert.IsTrue(primary_count > 0); context.CreateInstances(nodes_count, disableObjects: false, MainMemoryReplication: true, CommitFrequencyMs: -1, OnDemandCheckpoint: true, enableAOF: true, useTLS: useTLS); context.CreateConnection(useTLS: useTLS); _ = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); @@ -1086,8 +1086,8 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) var primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); var replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); - Assert.AreEqual("6", primaryVersion); - Assert.AreEqual(primaryVersion, replicaVersion); + ClassicAssert.AreEqual("6", primaryVersion); + ClassicAssert.AreEqual(primaryVersion, replicaVersion); context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, replicaNodeIndex); @@ -1128,11 +1128,11 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) // Assert primary version is 1 and replica has recovered to previous checkpoint primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); - Assert.AreEqual("1", primaryVersion); - Assert.AreEqual("6", replicaVersion); + ClassicAssert.AreEqual("1", 
primaryVersion); + ClassicAssert.AreEqual("6", replicaVersion); // Setup cluster - Assert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger)); + ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger)); context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger); context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger); context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger); @@ -1140,19 +1140,19 @@ public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) // Enable replication context.clusterTestUtils.WaitUntilNodeIdIsKnown(replicaNodeIndex, primaryNodeId, logger: context.logger); - Assert.AreEqual("OK", context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger)); + ClassicAssert.AreEqual("OK", context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger)); // Both nodes are at version 1 despite replica recovering to version earlier primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger); replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger); - Assert.AreEqual("1", primaryVersion); - Assert.AreEqual("1", replicaVersion); + ClassicAssert.AreEqual("1", primaryVersion); + ClassicAssert.AreEqual("1", replicaVersion); // At this point attached replica should be empty because primary did not have any data because it did not recover foreach (var pair in context.kvPairs) { var resp = context.clusterTestUtils.GetKey(replicaNodeIndex, Encoding.ASCII.GetBytes(pair.Key), out _, out _, out _, out var state, logger: context.logger); - Assert.IsNull(resp); + 
ClassicAssert.IsNull(resp); } } } diff --git a/test/Garnet.test.cluster/ClusterTestUtils.cs b/test/Garnet.test.cluster/ClusterTestUtils.cs index c114e7e069..2653fe5374 100644 --- a/test/Garnet.test.cluster/ClusterTestUtils.cs +++ b/test/Garnet.test.cluster/ClusterTestUtils.cs @@ -2688,7 +2688,7 @@ public string GetInfo(IPEndPoint endPoint, string section, string segment, ILogg { var server = redis.GetServer(endPoint); var result = server.Info(section); - Assert.AreEqual(1, result.Length, "section does not exist"); + ClassicAssert.AreEqual(1, result.Length, "section does not exist"); foreach (var item in result[0]) if (item.Key.Equals(segment)) return item.Value; From eff0f8a41317c0787577a9f3006161f9e2949e93 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 13 Nov 2024 08:27:34 -0800 Subject: [PATCH 10/10] relase-1.0.39 --- .azure/pipelines/azure-pipelines-external-release.yml | 2 +- libs/host/GarnetServer.cs | 2 +- main/GarnetServer/GarnetServer.csproj | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.azure/pipelines/azure-pipelines-external-release.yml b/.azure/pipelines/azure-pipelines-external-release.yml index e83f0a417d..aa6e61c905 100644 --- a/.azure/pipelines/azure-pipelines-external-release.yml +++ b/.azure/pipelines/azure-pipelines-external-release.yml @@ -4,7 +4,7 @@ # 2) update \libs\host\GarnetServer.cs readonly string version (~line 32) -- NOTE - these two values need to be the same # 3) update the version in GarnetServer.csproj (~line 8) ###################################### -name: 1.0.38 +name: 1.0.39 trigger: branches: include: diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index e013ce74f0..dc557fcc67 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -29,7 +29,7 @@ namespace Garnet public class GarnetServer : IDisposable { // IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~7 and GarnetServer.csproj line ~8. 
- readonly string version = "1.0.38"; + readonly string version = "1.0.39"; internal GarnetProvider Provider; diff --git a/main/GarnetServer/GarnetServer.csproj b/main/GarnetServer/GarnetServer.csproj index 5d93032c7b..a29f77ef49 100644 --- a/main/GarnetServer/GarnetServer.csproj +++ b/main/GarnetServer/GarnetServer.csproj @@ -5,7 +5,7 @@ true - 1.0.38 + 1.0.39 garnet-server true garnet-server