Skip to content

Commit

Permalink
add SDIFF and SDIFFSTORE. (#200)
Browse files Browse the repository at this point in the history
* add SDIFF.

* strengthen the test

* use  ByteArrayComparer

* add summary

* add SDIFFSTORE command.

* add txn,  when existing same key overwritten of SDIFFSTORE

* add SDIFFSTORE SEClientTest

* Fix whitespace formatting.

* rename -> destination

* edit api doc.

* handle wrongtype.

* meger adjustment

* use ArgSlice ToArray method

* WriteResponse -> WriteDirect
RESP_WRONG_TYPE -> RESP_ERR_WRONG_TYPE

* use WriteError

* add describe.

* add summary

* Remove WRONGTYPE

* Variable renaming and O (n).

* Add "SDIFF" in RespInfo.

* Adjustment.

* formatting

* Fixed keys lock type for SDIFFSTORE

---------

Co-authored-by: Tal Zaccai <[email protected]>
  • Loading branch information
tisilent and TalZaccai authored Apr 17, 2024
1 parent 20c2960 commit 8f86b49
Show file tree
Hide file tree
Showing 14 changed files with 480 additions and 4 deletions.
7 changes: 7 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ public GarnetStatus SetScan(ArgSlice key, long cursor, string match, int count,
public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
=> storageSession.SetUnion(keys, out output, ref objectContext);

/// <inheritdoc />
public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> members)
=> storageSession.SetDiff(keys, out members);

/// <inheritdoc />
public GarnetStatus SetDiffStore(byte[] key, ArgSlice[] keys, out int count)
=> storageSession.SetDiffStore(key, keys, out count);
#endregion

#region Hash Methods
Expand Down
9 changes: 9 additions & 0 deletions libs/server/API/GarnetWatchApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
}


/// <inheritdoc />
public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> output)
{
foreach (var key in keys)
{
garnetApi.WATCH(key, StoreType.Object);
}
return garnetApi.SetDiff(keys, out output);
}
#endregion

#region Hash Methods
Expand Down
16 changes: 16 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,15 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SetRandomMember(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <param name="key">destination</param>
/// <param name="keys"></param>
/// <param name="count"></param>
/// <returns></returns>
public GarnetStatus SetDiffStore(byte[] key, ArgSlice[] keys, out int count);
#endregion

#region List Methods
Expand Down Expand Up @@ -1208,6 +1217,13 @@ public interface IGarnetReadApi
/// <returns></returns>
GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output);

/// <summary>
/// Returns the members of the set resulting from the difference between the first set and all the successive sets.
/// </summary>
/// <param name="keys"></param>
/// <param name="members"></param>
/// <returns></returns>
GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> members);
#endregion

#region Hash Methods
Expand Down
4 changes: 3 additions & 1 deletion libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public enum SetOperation : byte
SRANDMEMBER,
SISMEMBER,
SUNION,
SDIFF,
SDIFFSTORE,
}


Expand Down Expand Up @@ -144,7 +146,7 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
return true;
}

private void UpdateSize(byte[] item, bool add = true)
internal void UpdateSize(byte[] item, bool add = true)
{
var size = Utility.RoundUp(item.Length, IntPtr.Size) + MemoryUtils.ByteArrayOverhead + MemoryUtils.HashSetEntryOverhead;
this.Size += add ? size : -size;
Expand Down
107 changes: 107 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -674,5 +674,112 @@ private unsafe bool SetRandomMember<TGarnetApi>(int count, byte* ptr, ref TGarne
readHead = (int)(ptr - recvBufferPtr);
return true;
}

/// <summary>
/// Returns the members of the set resulting from the difference between the first set and all the successive sets.
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <returns></returns>
private bool SetDiff<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 1)
{
return AbortWithWrongNumberOfArguments("SDIFF", count);
}

var keys = new ArgSlice[count];
for (var i = 0; i < count; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkKeyArraySlotVerify(ref keys, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

var status = storageApi.SetDiff(keys, out var output);

if (status == GarnetStatus.OK)
{
if (output == null || output.Count == 0)
{
while (!RespWriteUtils.WriteEmptyArray(ref dcurr, dend))
SendAndReset();
}
else
{
while (!RespWriteUtils.WriteArrayLength(output.Count, ref dcurr, dend))
SendAndReset();
foreach (var item in output)
{
while (!RespWriteUtils.WriteBulkString(item, ref dcurr, dend))
SendAndReset();
}
}
}

// Move input head
readHead = (int)(ptr - recvBufferPtr);

return true;
}

private bool SetDiffStore<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 2)
{
return AbortWithWrongNumberOfArguments("SDIFFSTORE", count);
}

// Get the key
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var key, ref ptr, recvBufferPtr + bytesRead))
return false;

if (NetworkSingleKeySlotVerify(key, false))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count))
return false;
return true;
}

var keys = new ArgSlice[count - 1];
for (var i = 0; i < count - 1; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkKeyArraySlotVerify(ref keys, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

var status = storageApi.SetDiffStore(key, keys, out var output);

if (status == GarnetStatus.OK)
{
while (!RespWriteUtils.WriteInteger(output, ref dcurr, dend))
SendAndReset();
}

// Move input head
readHead = (int)(ptr - recvBufferPtr);

return true;
}
}
}
8 changes: 8 additions & 0 deletions libs/server/Resp/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.Set, (byte)SetOperation.SSCAN);
}
else if (*(ulong*)(ptr + 3) == MemoryMarshal.Read<ulong>("\nSDIFF\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SDIFF);
}
break;

case 'W':
Expand Down Expand Up @@ -877,6 +881,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.Hash, (byte)HashOperation.HRANDFIELD);
}
else if (*(ulong*)(ptr + 1) == MemoryMarshal.Read<ulong>("10\r\nSDIF"u8) && *(ulong*)(ptr + 9) == MemoryMarshal.Read<ulong>("FSTORE\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SDIFFSTORE);
}
break;

case 11:
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Resp/RespCommandsInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public static RespCommandsInfo findCommand(RespCommand cmd, byte subCmd = 0)
{(byte)SetOperation.SSCAN, new RespCommandsInfo("SSCAN", RespCommand.Set, -2, null, (byte)SetOperation.SSCAN) },
{(byte)SetOperation.SISMEMBER, new RespCommandsInfo("SISMEMBER",RespCommand.Set, 2, null, (byte)SetOperation.SISMEMBER) },
{(byte)SetOperation.SUNION, new RespCommandsInfo("SUNION", RespCommand.Set, -1, null, (byte)SetOperation.SUNION) },
{(byte)SetOperation.SDIFF, new RespCommandsInfo("SDIFF", RespCommand.Set, -1, null, (byte)SetOperation.SDIFF) },
{(byte)SetOperation.SDIFFSTORE, new RespCommandsInfo("SDIFFSTORE", RespCommand.Set, -2, null, (byte)SetOperation.SDIFFSTORE) }
};

private static readonly Dictionary<RespCommand, RespCommandsInfo> customCommandsInfoMap = new Dictionary<RespCommand, RespCommandsInfo>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/RespInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static HashSet<string> GetCommands()
// Pub/sub
"PUBLISH", "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE",
// Set
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER", "SUNION",
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER", "SUNION", "SDIFF", "SDIFFSTORE",
//Scan ops
"DBSIZE", "KEYS","SCAN",
// Geospatial commands
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, byte subcmd, int
(RespCommand.Set, (byte)SetOperation.SRANDMEMBER) => SetRandomMember(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SSCAN) => ObjectScan(count, ptr, GarnetObjectType.Set, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SUNION) => SetUnion(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFF) => SetDiff(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFFSTORE) => SetDiffStore(count, ptr, ref storageApi),
_ => ProcessOtherCommands(cmd, subcmd, count, ref storageApi),
};
return success;
Expand Down
132 changes: 132 additions & 0 deletions libs/server/Storage/Session/ObjectStore/SetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -513,5 +513,137 @@ public GarnetStatus SetPop<TObjectContext>(byte[] key, ArgSlice input, ref Garne
public GarnetStatus SetRandomMember<TObjectContext>(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter, ref TObjectContext objectContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
=> RMWObjectStoreOperationWithOutput(key, input, ref objectContext, ref outputFooter);

/// <summary>
/// Returns the members of the set resulting from the difference between the first set at key and all the successive sets at keys.
/// </summary>
/// <param name="keys"></param>
/// <param name="members"></param>
/// <returns></returns>
public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> members)
{
members = default;
if (keys.Length == 0)
return GarnetStatus.OK;

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
foreach (var item in keys)
txnManager.SaveKeyEntryToLock(item, true, LockType.Shared);
_ = txnManager.Run(true);
}

// SetObject
var setObjectStoreLockableContext = txnManager.ObjectStoreLockableContext;

try
{
members = SetDiff(keys, ref setObjectStoreLockableContext);
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}

return GarnetStatus.OK;
}

/// <summary>
/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <param name="key">destination</param>
/// <param name="keys"></param>
/// <param name="count"></param>
/// <returns></returns>
public GarnetStatus SetDiffStore(byte[] key, ArgSlice[] keys, out int count)
{
count = default;

if (key.Length == 0 || keys.Length == 0)
return GarnetStatus.OK;

var destination = scratchBufferManager.CreateArgSlice(key);

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
txnManager.SaveKeyEntryToLock(destination, true, LockType.Exclusive);
foreach (var item in keys)
txnManager.SaveKeyEntryToLock(item, true, LockType.Shared);
_ = txnManager.Run(true);
}

// SetObject
var setObjectStoreLockableContext = txnManager.ObjectStoreLockableContext;

try
{
var diffSet = SetDiff(keys, ref setObjectStoreLockableContext);

var newSetObject = new SetObject();
foreach (var item in diffSet)
{
_ = newSetObject.Set.Add(item);
newSetObject.UpdateSize(item);
}
_ = SET(key, newSetObject, ref setObjectStoreLockableContext);
count = diffSet.Count;
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}

return GarnetStatus.OK;
}

private HashSet<byte[]> SetDiff<TObjectContext>(ArgSlice[] keys, ref TObjectContext objectContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
var result = new HashSet<byte[]>();
if (keys.Length == 0)
{
return result;
}

// first SetObject
var status = GET(keys[0].ToArray(), out var first, ref objectContext);
if (status == GarnetStatus.OK)
{
if (first.garnetObject is SetObject firstObject)
{
result = new HashSet<byte[]>(firstObject.Set, new ByteArrayComparer());
}
}
else
{
return result;
}

// after SetObjects
for (var i = 1; i < keys.Length; i++)
{
status = GET(keys[i].ToArray(), out var next, ref objectContext);
if (status == GarnetStatus.OK)
{
if (next.garnetObject is SetObject nextObject)
{
result.ExceptWith(nextObject.Set);
}
}
}

return result;
}
}
}
Loading

0 comments on commit 8f86b49

Please sign in to comment.