Skip to content

Commit

Permalink
Improvements in command handling #3 (#526)
Browse files Browse the repository at this point in the history
* partial checkin

* nit

* nit

* idea for initial spike for making input a struct type
does not work

* support ZADD and ZREM using safe struct wrappers for input

* nit

* wip

* wip

* Fixing non-determinism + refactoring HashGet

* dotnet format

* Fixed some API calls

* dotnet format

* wip

* small fix

* Removing unused method

* wip - refactoring ProcessAdminCommands

* Undoing changes to RandomUtils

* Continued refactoring of AdminCommands

* Added "TryGetInt" and "TryGetLong" to parse state api

* dotnet format

* wip

* format

* wip

* wip

* wip

* cleanup

* wip

* wip

* format

* wip

* Fixing build

* cont

* bugfix

* Fixing build

* wip

* wip

* bugfix

* Few small fixes

* format

* bugfix

* Added range validation to SessionParseState read methods

* wip

* merging from latest main

* format

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* clean up

* Fixed object store PERSIST + added missing test

* Cleaned up ObjectInput and added XML comments

* remove unnecessary serialization during upsert

* Addressing comments

---------

Co-authored-by: Badrish Chandramouli <[email protected]>
  • Loading branch information
TalZaccai and badrishc authored Jul 31, 2024
1 parent cab7e2b commit 1f7e6d3
Show file tree
Hide file tree
Showing 66 changed files with 2,753 additions and 2,603 deletions.
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>, BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>>;
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>, BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectStoreFunctions>>;

/// <summary>
/// Cluster manager
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>, BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>>;
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>, BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectStoreFunctions>>;

/// <summary>
/// Cluster provider
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Migration/MigrateSessionKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private bool MigrateKeysFromObjectStore()
continue;
var key = mKey.Key.ToArray();

SpanByte input = default;
ObjectInput input = default;
GarnetObjectStoreOutput value = default;
var status = localServerSession.BasicGarnetApi.Read_ObjectStore(ref key, ref input, ref value);
if (status == GarnetStatus.NOTFOUND)
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Session/ClusterSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>, BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>>;
using BasicGarnetApi = GarnetApi<BasicContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>, BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectStoreFunctions>>;

internal sealed unsafe partial class ClusterSession : IClusterSession
{
Expand Down
18 changes: 11 additions & 7 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public sealed unsafe partial class AofProcessor
/// <summary>
/// Session for object store
/// </summary>
readonly BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> objectStoreBasicContext;
readonly BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectStoreFunctions> objectStoreBasicContext;

readonly Dictionary<int, List<byte[]>> inflightTxns;
readonly byte[] buffer;
Expand Down Expand Up @@ -292,13 +292,12 @@ static unsafe void StoreDelete(BasicContext<SpanByte, SpanByte, SpanByte, SpanBy
basicContext.Delete(ref key);
}

static unsafe void ObjectStoreUpsert(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, GarnetObjectSerializer garnetObjectSerializer, byte* ptr, byte* outputPtr, int outputLength)
static unsafe void ObjectStoreUpsert(BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, GarnetObjectSerializer garnetObjectSerializer, byte* ptr, byte* outputPtr, int outputLength)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();
ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
ref var value = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize + input.TotalSize);

ref var value = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
var valB = garnetObjectSerializer.Deserialize(value.ToByteArray());

var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) };
Expand All @@ -307,20 +306,25 @@ static unsafe void ObjectStoreUpsert(BasicContext<byte[], IGarnetObject, SpanByt
output.spanByteAndMemory.Memory.Dispose();
}

static unsafe void ObjectStoreRMW(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, byte* ptr, byte* outputPtr, int outputLength)
static unsafe void ObjectStoreRMW(BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, byte* ptr, byte* outputPtr, int outputLength)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();

ref var input = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
ref var sbInput = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize);
ref var input = ref Unsafe.AsRef<ObjectInput>(sbInput.ToPointer());

ref var inputPayload = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader) + key.TotalSize + sbInput.TotalSize);
input.payload = new ArgSlice(ref inputPayload);

var output = new GarnetObjectStoreOutput { spanByteAndMemory = new(outputPtr, outputLength) };
if (basicContext.RMW(ref keyB, ref input, ref output).IsPending)
basicContext.CompletePending(true);
if (!output.spanByteAndMemory.IsSpanByte)
output.spanByteAndMemory.Memory.Dispose();
}

static unsafe void ObjectStoreDelete(BasicContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, byte* ptr)
static unsafe void ObjectStoreDelete(BasicContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectStoreFunctions> basicContext, byte* ptr)
{
ref var key = ref Unsafe.AsRef<SpanByte>(ptr + sizeof(AofHeader));
var keyB = key.ToByteArray();
Expand Down
10 changes: 5 additions & 5 deletions libs/server/API/GarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Garnet.server
/// </summary>
public partial struct GarnetApi<TContext, TObjectContext> : IGarnetApi, IGarnetWatchApi
where TContext : ITsavoriteContext<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long, MainStoreFunctions>
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
{
readonly StorageSession storageSession;
TContext context;
Expand Down Expand Up @@ -241,11 +241,11 @@ public GarnetStatus Read_MainStore(ref SpanByte key, ref SpanByte input, ref Spa
=> storageSession.Read_MainStore(ref key, ref input, ref output, ref context);

/// <inheritdoc />
public GarnetStatus RMW_ObjectStore(ref byte[] key, ref SpanByte input, ref GarnetObjectStoreOutput output)
public GarnetStatus RMW_ObjectStore(ref byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output)
=> storageSession.RMW_ObjectStore(ref key, ref input, ref output, ref objectContext);

/// <inheritdoc />
public GarnetStatus Read_ObjectStore(ref byte[] key, ref SpanByte input, ref GarnetObjectStoreOutput output)
public GarnetStatus Read_ObjectStore(ref byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output)
=> storageSession.Read_ObjectStore(ref key, ref input, ref output, ref objectContext);
#endregion

Expand Down Expand Up @@ -360,8 +360,8 @@ public ITsavoriteScanIterator<byte[], IGarnetObject> IterateObjectStore()
#region Common Methods

/// <inheritdoc />
public GarnetStatus ObjectScan(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ObjectScan(key, input, ref outputFooter, ref objectContext);
public GarnetStatus ObjectScan(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ObjectScan(key, ref input, ref outputFooter, ref objectContext);

#endregion
}
Expand Down
Loading

0 comments on commit 1f7e6d3

Please sign in to comment.