From 611619c3652ecd9d19c6ed8eadcf7e633bced0a2 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 13 Nov 2024 13:02:15 -0800 Subject: [PATCH] update aof stream checks --- .../ReplicaOps/ReplicaReplayTask.cs | 10 +---- .../ReplicaOps/ReplicationReplicaAofSync.cs | 44 ++++++++++++------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs index ebf4524e03..4c31e6ac58 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs @@ -50,12 +50,6 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress, { replicaReplayTaskCts.Token.ThrowIfCancellationRequested(); - if (ReplicationOffset != currentAddress) - { - logger?.LogError("ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset); - throw new GarnetException($"ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", LogLevel.Warning, clientResponse: false); - } - ReplicationOffset = currentAddress; var ptr = record; while (ptr < record + recordLength) @@ -85,8 +79,8 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress, if (ReplicationOffset != nextAddress) { - logger?.LogError("ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset); - throw new GarnetException($"ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", LogLevel.Warning, clientResponse: false); + logger?.LogError("ReplicaReplayTask.Consume NextAddress Mismatch recordLength:{recordLength}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; replicationOffset:{ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset); + throw new GarnetException($"ReplicaReplayTask.Consume NextAddress Mismatch recordeLength:{recordLength}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; replicationOffset:{ReplicationOffset}", LogLevel.Warning, clientResponse: false); } } diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs index 8d40759707..db37c8ddb8 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs @@ -3,7 +3,6 @@ using System; using System.Threading; -using System.Threading.Tasks; using Garnet.common; using Microsoft.Extensions.Logging; @@ -64,26 +63,41 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre } } + // Address check + if (currentAddress != storeWrapper.appendOnlyFile.TailAddress) + { + logger?.LogInformation("Processing {recordLength} bytes; previousAddress {previousAddress}, currentAddress {currentAddress}, nextAddress {nextAddress}, current AOF tail {tail}", recordLength, previousAddress, currentAddress, nextAddress, storeWrapper.appendOnlyFile.TailAddress); + logger?.LogError("Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress); + throw new GarnetException($"Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false); + } + // Enqueue to AOF _ = clusterProvider.storeWrapper.appendOnlyFile?.UnsafeEnqueueRaw(new Span(record, recordLength), noCommit: clusterProvider.serverOptions.EnableFastCommit); - // Throttle to give the opportunity to the background replay task to catch up - ThrottlePrimary(); - - // If background task has not been initialized - // initialize it here and start background replay task - if (replayIterator == null) + if (storeWrapper.serverOptions.ReplicationOffsetMaxLag > -1) { - replayIterator = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle( - previousAddress, - long.MaxValue, - scanUncommitted: true, - recover: false, - logger: logger); + // Throttle to give the opportunity to the background replay task to catch up + ThrottlePrimary(); - Task.Run(ReplicaReplayTask); + // If background task has not been initialized + // initialize it here and start background replay task + if (replayIterator == null) + { + replayIterator = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle( + previousAddress, + long.MaxValue, + scanUncommitted: true, + recover: false, + logger: logger); + + System.Threading.Tasks.Task.Run(ReplicaReplayTask); + } + } + else + { + // Synchronous replay + Consume(record, recordLength, currentAddress, nextAddress, isProtected: false); } - //Consume(record, recordLength, currentAddress, nextAddress, isProtected: false); } catch (Exception ex) {