Skip to content

Commit

Permalink
update aof stream checks
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Nov 13, 2024
1 parent c216e21 commit 611619c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
10 changes: 2 additions & 8 deletions libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress,
{
replicaReplayTaskCts.Token.ThrowIfCancellationRequested();

if (ReplicationOffset != currentAddress)
{
logger?.LogError("ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset);
throw new GarnetException($"ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", LogLevel.Warning, clientResponse: false);
}

ReplicationOffset = currentAddress;
var ptr = record;
while (ptr < record + recordLength)
Expand Down Expand Up @@ -85,8 +79,8 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress,

if (ReplicationOffset != nextAddress)
{
logger?.LogError("ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset);
throw new GarnetException($"ReplicaReplayTask.Consume Begin Address Mismatch {recordLength}; {currentAddress}; {nextAddress}; {ReplicationOffset}", LogLevel.Warning, clientResponse: false);
logger?.LogError("ReplicaReplayTask.Consume NextAddress Mismatch recordLength:{recordLength}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; replicationOffset:{ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset);
throw new GarnetException($"ReplicaReplayTask.Consume NextAddress Mismatch recordeLength:{recordLength}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; replicationOffset:{ReplicationOffset}", LogLevel.Warning, clientResponse: false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -64,26 +63,41 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
}
}

// Address check
if (currentAddress != storeWrapper.appendOnlyFile.TailAddress)
{
logger?.LogInformation("Processing {recordLength} bytes; previousAddress {previousAddress}, currentAddress {currentAddress}, nextAddress {nextAddress}, current AOF tail {tail}", recordLength, previousAddress, currentAddress, nextAddress, storeWrapper.appendOnlyFile.TailAddress);
logger?.LogError("Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress);
throw new GarnetException($"Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false);
}

// Enqueue to AOF
_ = clusterProvider.storeWrapper.appendOnlyFile?.UnsafeEnqueueRaw(new Span<byte>(record, recordLength), noCommit: clusterProvider.serverOptions.EnableFastCommit);

// Throttle to give the opportunity to the background replay task to catch up
ThrottlePrimary();

// If background task has not been initialized
// initialize it here and start background replay task
if (replayIterator == null)
if (storeWrapper.serverOptions.ReplicationOffsetMaxLag > -1)
{
replayIterator = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(
previousAddress,
long.MaxValue,
scanUncommitted: true,
recover: false,
logger: logger);
// Throttle to give the opportunity to the background replay task to catch up
ThrottlePrimary();

Task.Run(ReplicaReplayTask);
// If background task has not been initialized
// initialize it here and start background replay task
if (replayIterator == null)
{
replayIterator = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(
previousAddress,
long.MaxValue,
scanUncommitted: true,
recover: false,
logger: logger);

System.Threading.Tasks.Task.Run(ReplicaReplayTask);
}
}
else
{
// Synchronous replay
Consume(record, recordLength, currentAddress, nextAddress, isProtected: false);
}
//Consume(record, recordLength, currentAddress, nextAddress, isProtected: false);
}
catch (Exception ex)
{
Expand Down

0 comments on commit 611619c

Please sign in to comment.