Skip to content

Commit

Permalink
[Compatibility] Added ZDIFFSTORE command (#732)
Browse files Browse the repository at this point in the history
* Added ZDIFFSTORE command

* Tied to fix ClusterSlotVerificationTests

* Review comments

* Review comment fix

* Review command fix

---------

Co-authored-by: Tal Zaccai <[email protected]>
  • Loading branch information
Vijay-Nirmal and TalZaccai authored Oct 28, 2024
1 parent bda4f3c commit 4e3b462
Show file tree
Hide file tree
Showing 15 changed files with 390 additions and 32 deletions.
30 changes: 30 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -5310,6 +5310,36 @@
}
]
},
{
"Command": "ZDIFFSTORE",
"Name": "ZDIFFSTORE",
"Summary": "Stores the difference of multiple sorted sets in a key.",
"Group": "SortedSet",
"Complexity": "O(L \u002B (N-K)log(N)) worst case where L is the total number of elements in all the sets, N is the size of the first set, and K is the size of the result set.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "DESTINATION",
"DisplayText": "destination",
"Type": "Key",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "NUMKEYS",
"DisplayText": "numkeys",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "KEY",
"DisplayText": "key",
"Type": "Key",
"ArgumentFlags": "Multiple",
"KeySpecIndex": 1
}
]
},
{
"Command": "ZINCRBY",
"Name": "ZINCRBY",
Expand Down
38 changes: 38 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -4056,6 +4056,44 @@
}
]
},
{
"Command": "ZDIFFSTORE",
"Name": "ZDIFFSTORE",
"Arity": -4,
"Flags": "DenyOom, MovableKeys, Write",
"FirstKey": 1,
"LastKey": 1,
"Step": 1,
"AclCategories": "SortedSet, Slow, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Flags": "OW, Update"
},
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysKeyNum",
"KeyNumIdx": 0,
"FirstKey": 1,
"KeyStep": 1
},
"Flags": "RO, Access"
}
]
},
{
"Command": "ZINCRBY",
"Name": "ZINCRBY",
Expand Down
5 changes: 5 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using Tsavorite.core;

Expand Down Expand Up @@ -129,6 +130,10 @@ public GarnetStatus SortedSetRange(ArgSlice key, ArgSlice min, ArgSlice max, Sor
public GarnetStatus SortedSetDifference(ArgSlice[] keys, out Dictionary<byte[], double> pairs)
=> storageSession.SortedSetDifference(keys, out pairs);

/// <inheritdoc />
public GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySpan<ArgSlice> keys, out int count)
=> storageSession.SortedSetDifferenceStore(destinationKey, keys, out count);

/// <inheritdoc />
public GarnetStatus SortedSetScan(ArgSlice key, long cursor, string match, int count, out ArgSlice[] items)
=> storageSession.ObjectScan(GarnetObjectType.SortedSet, key, cursor, match, count, out items, ref objectContext);
Expand Down
9 changes: 9 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,15 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SortedSetRemoveRangeByRank(ArgSlice key, int start, int stop, out int countRemoved);

/// <summary>
/// Computes the difference between the first and all successive sorted sets and store resulting pairs in the output key.
/// </summary>
/// <param name="keys"></param>
/// <param name="destinationKey"></param>
/// <param name="count"></param>
/// <returns></returns>
GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySpan<ArgSlice> keys, out int count);

/// <summary>
/// Adds geospatial items (longitude, latitude, name) to the specified key.
/// </summary>
Expand Down
53 changes: 52 additions & 1 deletion libs/server/Resp/Objects/SortedSetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ private unsafe bool SortedSetDifference<TGarnetApi>(ref TGarnetApi storageApi)
return AbortWithWrongNumberOfArguments("ZDIFF");
}

//number of keys
// Number of keys
if (!parseState.TryGetInt(0, out var nKeys))
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref dcurr, dend))
Expand Down Expand Up @@ -827,5 +827,56 @@ private unsafe bool SortedSetDifference<TGarnetApi>(ref TGarnetApi storageApi)

return true;
}

/// <summary>
/// Computes a difference operation between the first and all successive sorted sets and store
/// and returns the result to the client.
/// The total number of input keys is specified.
/// </summary>
/// <param name="storageApi"></param>
/// <returns></returns>
/// <exception cref="GarnetException"></exception>
private unsafe bool SortedSetDifferenceStore<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (parseState.Count < 3)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.ZDIFFSTORE));
}

// Number of keys
if (!parseState.TryGetInt(1, out var nKeys))
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref dcurr, dend))
SendAndReset();
return true;
}

if (parseState.Count - 2 != nKeys)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_SYNTAX_ERROR, ref dcurr, dend))
SendAndReset();
return true;
}

var destination = parseState.GetArgSliceByRef(0);
var keys = parseState.Parameters.Slice(2, nKeys);

var status = storageApi.SortedSetDifferenceStore(destination, keys, out var count);

switch (status)
{
case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
default:
while (!RespWriteUtils.WriteInteger(count, ref dcurr, dend))
SendAndReset();
break;
}

return true;
}
}
}
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public enum RespCommand : ushort
SUNIONSTORE,
UNLINK,
ZADD,
ZDIFFSTORE,
ZINCRBY,
ZPOPMAX,
ZPOPMIN,
Expand Down Expand Up @@ -1355,6 +1356,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.SMISMEMBER;
}
else if (*(ulong*)(ptr + 1) == MemoryMarshal.Read<ulong>("10\r\nZDIF"u8) && *(uint*)(ptr + 9) == MemoryMarshal.Read<uint>("FSTORE\r\n"u8))
{
return RespCommand.ZDIFFSTORE;
}
break;

case 11:
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.ZPOPMIN => SortedSetPop(cmd, ref storageApi),
RespCommand.ZRANDMEMBER => SortedSetRandomMember(ref storageApi),
RespCommand.ZDIFF => SortedSetDifference(ref storageApi),
RespCommand.ZDIFFSTORE => SortedSetDifferenceStore(ref storageApi),
RespCommand.ZREVRANGE => SortedSetRange(cmd, ref storageApi),
RespCommand.ZREVRANGEBYSCORE => SortedSetRange(cmd, ref storageApi),
RespCommand.ZSCAN => ObjectScan(GarnetObjectType.SortedSet, ref storageApi),
Expand Down
131 changes: 101 additions & 30 deletions libs/server/Storage/Session/ObjectStore/SortedSetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ public unsafe GarnetStatus SortedSetRange<TObjectContext>(ArgSlice key, ArgSlice
/// <param name="keys"></param>
/// <param name="pairs"></param>
/// <returns></returns>
public unsafe GarnetStatus SortedSetDifference(ArgSlice[] keys, out Dictionary<byte[], double> pairs)
public unsafe GarnetStatus SortedSetDifference(ReadOnlySpan<ArgSlice> keys, out Dictionary<byte[], double> pairs)
{
pairs = default;

Expand All @@ -561,47 +561,76 @@ public unsafe GarnetStatus SortedSetDifference(ArgSlice[] keys, out Dictionary<b

try
{
var statusOp = GET(keys[0].ToArray(), out var firstObj, ref objectContext);
if (statusOp == GarnetStatus.OK)
return SortedSetDifference(keys, ref objectContext, out pairs);
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}
}

/// <summary>
/// Computes the difference between the first and all successive sorted sets and store resulting pairs in the destination key.
/// </summary>
/// <param name="keys"></param>
/// <param name="destinationKey"></param>
/// <param name="count"></param>
/// <returns></returns>
public GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySpan<ArgSlice> keys, out int count)
{
count = default;

if (keys.Length == 0)
return GarnetStatus.OK;

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
txnManager.SaveKeyEntryToLock(destinationKey, true, LockType.Exclusive);
foreach (var item in keys)
txnManager.SaveKeyEntryToLock(item, true, LockType.Shared);
_ = txnManager.Run(true);
}

var objectContext = txnManager.ObjectStoreLockableContext;

try
{
var status = SortedSetDifference(keys, ref objectContext, out var pairs);

if (status != GarnetStatus.OK)
{
if (firstObj.garnetObject is not SortedSetObject firstSortedSet)
{
return GarnetStatus.WRONGTYPE;
}
return GarnetStatus.WRONGTYPE;
}

if (keys.Length == 1)
{
pairs = firstSortedSet.Dictionary;
return GarnetStatus.OK;
}
count = pairs?.Count ?? 0;

// read the rest of the keys
for (var item = 1; item < keys.Length; item++)
if (count > 0)
{
SortedSetObject newSetObject = new();
foreach (var (element, score) in pairs)
{
statusOp = GET(keys[item].ToArray(), out var nextObj, ref objectContext);
if (statusOp != GarnetStatus.OK)
continue;

if (nextObj.garnetObject is not SortedSetObject nextSortedSet)
{
pairs = default;
return GarnetStatus.WRONGTYPE;
}

if (pairs == default)
pairs = SortedSetObject.CopyDiff(firstSortedSet.Dictionary, nextSortedSet.Dictionary);
else
SortedSetObject.InPlaceDiff(pairs, nextSortedSet.Dictionary);
newSetObject.Add(element, score);
}
_ = SET(destinationKey.ToArray(), newSetObject, ref objectContext);
}
else
{
_ = EXPIRE(destinationKey, TimeSpan.Zero, out _, StoreType.Object, ExpireOption.None,
ref lockableContext, ref objectContext);
}

return status;
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}

return GarnetStatus.OK;
}

/// <summary>
Expand Down Expand Up @@ -856,5 +885,47 @@ public GarnetStatus SortedSetRandomMember<TObjectContext>(byte[] key, ref Object
public GarnetStatus SortedSetScan<TObjectContext>(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter, ref TObjectContext objectStoreContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
=> ReadObjectStoreOperationWithOutput(key, ref input, ref objectStoreContext, ref outputFooter);

private GarnetStatus SortedSetDifference<TObjectContext>(ReadOnlySpan<ArgSlice> keys, ref TObjectContext objectContext, out Dictionary<byte[], double> pairs)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
{
pairs = default;

var statusOp = GET(keys[0].ToArray(), out var firstObj, ref objectContext);
if (statusOp == GarnetStatus.OK)
{
if (firstObj.garnetObject is not SortedSetObject firstSortedSet)
{
return GarnetStatus.WRONGTYPE;
}

if (keys.Length == 1)
{
pairs = firstSortedSet.Dictionary;
return GarnetStatus.OK;
}

// read the rest of the keys
for (var item = 1; item < keys.Length; item++)
{
statusOp = GET(keys[item].ToArray(), out var nextObj, ref objectContext);
if (statusOp != GarnetStatus.OK)
continue;

if (nextObj.garnetObject is not SortedSetObject nextSortedSet)
{
pairs = default;
return GarnetStatus.WRONGTYPE;
}

if (pairs == default)
pairs = SortedSetObject.CopyDiff(firstSortedSet.Dictionary, nextSortedSet.Dictionary);
else
SortedSetObject.InPlaceDiff(pairs, nextSortedSet.Dictionary);
}
}

return GarnetStatus.OK;
}
}
}
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public class SupportedCommand
new("ZCARD", RespCommand.ZCARD),
new("ZCOUNT", RespCommand.ZCOUNT),
new("ZDIFF", RespCommand.ZDIFF),
new("ZDIFFSTORE", RespCommand.ZDIFFSTORE),
new("ZINCRBY", RespCommand.ZINCRBY),
new("ZLEXCOUNT", RespCommand.ZLEXCOUNT),
new("ZMSCORE", RespCommand.ZMSCORE),
Expand Down
Loading

0 comments on commit 4e3b462

Please sign in to comment.