Skip to content

Commit

Permalink
AOF header addition - v2 (#788)
Browse files Browse the repository at this point in the history
* AOF header addition - v2

* nit

* undo inadvertent changes

* sigh
  • Loading branch information
badrishc authored Nov 12, 2024
1 parent fdf7852 commit 225a9ea
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 36 deletions.
53 changes: 32 additions & 21 deletions libs/server/AOF/AofHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,49 @@

namespace Garnet.server
{
[StructLayout(LayoutKind.Explicit, Size = 15)]
[StructLayout(LayoutKind.Explicit, Size = 16)]
struct AofHeader
{
// UPDATE THIS VERSION WHENEVER THE LAYOUT OF THIS STRUCT CHANGES
const byte AOF_HEADER_VERSION = 1;
// Important: Update version number whenever any of the following change:
// * Layout, size, contents of this struct
// * Any of the AofEntryType or AofStoreType enums' existing value mappings
// * SpanByte format or header
const byte AofHeaderVersion = 1;

/// <summary>
/// Version of AOF
/// </summary>
[FieldOffset(0)]
public byte aofHeaderVersion;
/// <summary>
/// Padding, for alignment and future use
/// </summary>
[FieldOffset(1)]
public AofEntryType opType;
public byte padding;
/// <summary>
/// Type of operation
/// </summary>
[FieldOffset(2)]
public byte type;
public AofEntryType opType;
/// <summary>
/// Procedure ID
/// </summary>
[FieldOffset(3)]
public long version;
[FieldOffset(11)]
public byte procedureId;
/// <summary>
/// Store version
/// </summary>
[FieldOffset(4)]
public long storeVersion;
/// <summary>
/// Session ID
/// </summary>
[FieldOffset(12)]
public int sessionID;

public AofHeader(AofEntryType opType, byte type, long version, int sessionID)
{
this.aofHeaderVersion = AOF_HEADER_VERSION;
this.opType = opType;
this.type = type;
this.version = version;
this.sessionID = sessionID;
}

public AofHeader(AofEntryType opType, long version, int sessionID)
public AofHeader()
{
this.aofHeaderVersion = AOF_HEADER_VERSION;
this.opType = opType;
this.version = version;
this.sessionID = sessionID;
this.aofHeaderVersion = AofHeaderVersion;
}
}
}
10 changes: 5 additions & 5 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,14 @@ public unsafe void ProcessAofRecordInternal(byte* ptr, int length, bool asReplic
case AofEntryType.MainStoreCheckpointCommit:
if (asReplica)
{
if (header.version > storeWrapper.store.CurrentVersion)
if (header.storeVersion > storeWrapper.store.CurrentVersion)
storeWrapper.TakeCheckpoint(false, StoreType.Main, logger);
}
break;
case AofEntryType.ObjectStoreCheckpointCommit:
if (asReplica)
{
if (header.version > storeWrapper.objectStore.CurrentVersion)
if (header.storeVersion > storeWrapper.objectStore.CurrentVersion)
storeWrapper.TakeCheckpoint(false, StoreType.Object, logger);
}
break;
Expand Down Expand Up @@ -265,7 +265,7 @@ private unsafe bool ReplayOp(byte* entryPtr)
ObjectStoreDelete(objectStoreBasicContext, entryPtr);
break;
case AofEntryType.StoredProcedure:
RunStoredProc(header.type, customProcInput, entryPtr);
RunStoredProc(header.procedureId, customProcInput, entryPtr);
break;
default:
throw new GarnetException($"Unknown AOF header operation type {header.opType}");
Expand Down Expand Up @@ -388,8 +388,8 @@ bool SkipRecord(AofHeader header)

return storeType switch
{
AofStoreType.MainStoreType => header.version <= storeWrapper.store.CurrentVersion - 1,
AofStoreType.ObjectStoreType => header.version <= storeWrapper.objectStore.CurrentVersion - 1,
AofStoreType.MainStoreType => header.storeVersion <= storeWrapper.store.CurrentVersion - 1,
AofStoreType.ObjectStoreType => header.storeVersion <= storeWrapper.objectStore.CurrentVersion - 1,
AofStoreType.TxnType => false,
AofStoreType.ReplicationType => false,
AofStoreType.CheckpointType => false,
Expand Down
6 changes: 3 additions & 3 deletions libs/server/Storage/Functions/MainStore/PrivateMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ void WriteLogUpsert(ref SpanByte key, ref RawStringInput input, ref SpanByte val
input.header.flags |= RespInputFlags.Deterministic;

functionsState.appendOnlyFile.Enqueue(
new AofHeader(opType: AofEntryType.StoreUpsert, version: version, sessionID: sessionId),
new AofHeader { opType = AofEntryType.StoreUpsert, storeVersion = version, sessionID = sessionId },
ref key, ref value, ref input, out _);
}

Expand All @@ -692,7 +692,7 @@ void WriteLogRMW(ref SpanByte key, ref RawStringInput input, long version, int s
input.header.flags |= RespInputFlags.Deterministic;

functionsState.appendOnlyFile.Enqueue(
new AofHeader(opType: AofEntryType.StoreRMW, version: version, sessionID: sessionId),
new AofHeader { opType = AofEntryType.StoreRMW, storeVersion = version, sessionID = sessionId },
ref key, ref input, out _);
}

Expand All @@ -705,7 +705,7 @@ void WriteLogDelete(ref SpanByte key, long version, int sessionID)
{
if (functionsState.StoredProcMode) return;
SpanByte def = default;
functionsState.appendOnlyFile.Enqueue(new AofHeader(opType: AofEntryType.StoreDelete, version: version, sessionID: sessionID), ref key, ref def, out _);
functionsState.appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.StoreDelete, storeVersion = version, sessionID = sessionID }, ref key, ref def, out _);
}

BitFieldCmdArgs GetBitFieldArguments(ref RawStringInput input)
Expand Down
6 changes: 3 additions & 3 deletions libs/server/Storage/Functions/ObjectStore/PrivateMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void WriteLogUpsert(ref byte[] key, ref ObjectInput input, ref IGarnetObject val
var valSB = SpanByte.FromPinnedPointer(valPtr, valueBytes.Length);

functionsState.appendOnlyFile.Enqueue(
new AofHeader(opType: AofEntryType.ObjectStoreUpsert, version: version, sessionID: sessionID),
new AofHeader { opType = AofEntryType.ObjectStoreUpsert, storeVersion = version, sessionID = sessionID },
ref keySB, ref valSB, out _);
}
}
Expand All @@ -57,7 +57,7 @@ void WriteLogRMW(ref byte[] key, ref ObjectInput input, long version, int sessio
var sbKey = SpanByte.FromPinnedPointer(keyPtr, key.Length);

functionsState.appendOnlyFile.Enqueue(
new AofHeader(opType: AofEntryType.ObjectStoreRMW, version: version, sessionID: sessionID),
new AofHeader { opType = AofEntryType.ObjectStoreRMW, storeVersion = version, sessionID = sessionID },
ref sbKey, ref input, out _);
}
}
Expand All @@ -75,7 +75,7 @@ void WriteLogDelete(ref byte[] key, long version, int sessionID)
var keySB = SpanByte.FromPinnedPointer(ptr, key.Length);
SpanByte valSB = default;

functionsState.appendOnlyFile.Enqueue(new AofHeader(opType: AofEntryType.ObjectStoreDelete, version: version, sessionID: sessionID), ref keySB, ref valSB, out _);
functionsState.appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.ObjectStoreDelete, storeVersion = version, sessionID = sessionID }, ref keySB, ref valSB, out _);
}
}

Expand Down
2 changes: 1 addition & 1 deletion libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ public void EnqueueCommit(bool isMainStore, long version)
AofHeader header = new()
{
opType = isMainStore ? AofEntryType.MainStoreCheckpointCommit : AofEntryType.ObjectStoreCheckpointCommit,
version = version,
storeVersion = version,
sessionID = -1
};
appendOnlyFile?.Enqueue(header, out _);
Expand Down
6 changes: 3 additions & 3 deletions libs/server/Transaction/TransactionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ internal void Log(byte id, ref CustomProcedureInput procInput)
{
Debug.Assert(functionsState.StoredProcMode);

appendOnlyFile?.Enqueue(new AofHeader(opType: AofEntryType.StoredProcedure, type: id, version: basicContext.Session.Version, sessionID: basicContext.Session.ID), ref procInput, out _);
appendOnlyFile?.Enqueue(new AofHeader { opType = AofEntryType.StoredProcedure, procedureId = id, storeVersion = basicContext.Session.Version, sessionID = basicContext.Session.ID }, ref procInput, out _);
}

internal void Commit(bool internal_txn = false)
{
if (appendOnlyFile != null && !functionsState.StoredProcMode)
{
appendOnlyFile.Enqueue(new AofHeader(opType: AofEntryType.TxnCommit, version: basicContext.Session.Version, sessionID: basicContext.Session.ID), out _);
appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.TxnCommit, storeVersion = basicContext.Session.Version, sessionID = basicContext.Session.ID }, out _);
}
if (!internal_txn)
watchContainer.Reset();
Expand Down Expand Up @@ -335,7 +335,7 @@ internal bool Run(bool internal_txn = false, bool fail_fast_on_lock = false, Tim

if (appendOnlyFile != null && !functionsState.StoredProcMode)
{
appendOnlyFile.Enqueue(new AofHeader(opType: AofEntryType.TxnStart, version: basicContext.Session.Version, sessionID: basicContext.Session.ID), out _);
appendOnlyFile.Enqueue(new AofHeader { opType = AofEntryType.TxnStart, storeVersion = basicContext.Session.Version, sessionID = basicContext.Session.ID }, out _);
}

state = TxnState.Running;
Expand Down
1 change: 1 addition & 0 deletions libs/storage/Tsavorite/cs/src/core/VarLen/SpanByte.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Tsavorite.core
{
/// <summary>
/// Represents a pinned variable length byte array that is viewable as a pinned Span&lt;byte&gt;
/// Important: AOF header version needs to be updated if this struct's disk representation changes
/// </summary>
/// <remarks>
/// Format: [4-byte (int) length of payload][[optional 8-byte metadata] payload bytes...]
Expand Down

0 comments on commit 225a9ea

Please sign in to comment.