Skip to content

Commit

Permalink
Add SUNIONSTORE command. (#291)
Browse files Browse the repository at this point in the history
* Add SUNIONSTORE command.

* doc

* Handle empty key.

---------

Co-authored-by: Tal Zaccai <[email protected]>
  • Loading branch information
tisilent and TalZaccai authored Apr 23, 2024
1 parent 745db47 commit a4dc14f
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 25 deletions.
6 changes: 5 additions & 1 deletion libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,11 @@ public GarnetStatus SetMove(ArgSlice sourceKey, ArgSlice destinationKey, ArgSlic
=> storageSession.SetMove(sourceKey, destinationKey, member, out smoveResult);

public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
=> storageSession.SetUnion(keys, out output, ref objectContext);
=> storageSession.SetUnion(keys, out output);

/// <inheritdoc />
public GarnetStatus SetUnionStore(byte[] key, ArgSlice[] keys, out int count)
=> storageSession.SetUnionStore(key, keys, out count);

/// <inheritdoc />
public GarnetStatus SetDiff(ArgSlice[] keys, out HashSet<byte[]> members)
Expand Down
10 changes: 10 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SetRandomMember(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <param name="key"></param>
/// <param name="keys"></param>
/// <param name="count"></param>
/// <returns></returns>
GarnetStatus SetUnionStore(byte[] key, ArgSlice[] keys, out int count);

/// <summary>
/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
Expand Down
1 change: 1 addition & 0 deletions libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public enum SetOperation : byte
SRANDMEMBER,
SISMEMBER,
SUNION,
SUNIONSTORE,
SDIFF,
SDIFFSTORE,
}
Expand Down
58 changes: 58 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,64 @@ private bool SetUnion<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageAp
return true;
}

/// <summary>
/// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <typeparam name="TGarnetApi"></typeparam>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <returns></returns>
private bool SetUnionStore<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 2)
{
return AbortWithWrongNumberOfArguments("SUNIONSTORE", count);
}

// Get the key
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var key, ref ptr, recvBufferPtr + bytesRead))
return false;

if (NetworkSingleKeySlotVerify(key, false))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count))
return false;
return true;
}

var keys = new ArgSlice[count - 1];
for (var i = 0; i < count - 1; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkKeyArraySlotVerify(ref keys, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

var status = storageApi.SetUnionStore(key, keys, out var output);

if (status == GarnetStatus.OK)
{
while (!RespWriteUtils.WriteInteger(output, ref dcurr, dend))
SendAndReset();
}

// Move input head
readHead = (int)(ptr - recvBufferPtr);

return true;
}

/// <summary>
/// Remove the specified members from the set.
/// Specified members that are not a member of this set are ignored.
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Resp/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.Set, (byte)SetOperation.SRANDMEMBER);
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("1\r\nSUNIO"u8) && *(ulong*)(ptr + 10) == MemoryMarshal.Read<ulong>("NSTORE\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SUNIONSTORE);
}
break;

case 12:
Expand Down
25 changes: 13 additions & 12 deletions libs/server/Resp/RespCommandsInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,19 @@ public static RespCommandsInfo findCommand(RespCommand cmd, byte subCmd = 0)

private static readonly Dictionary<byte, RespCommandsInfo> setCommandsInfoMap = new Dictionary<byte, RespCommandsInfo>
{
{(byte)SetOperation.SADD, new RespCommandsInfo("SADD", RespCommand.Set, -2, null, (byte)SetOperation.SADD)},
{(byte)SetOperation.SMEMBERS, new RespCommandsInfo("SMEMBERS", RespCommand.Set, 1, null, (byte)SetOperation.SMEMBERS)},
{(byte)SetOperation.SREM, new RespCommandsInfo("SREM", RespCommand.Set, -2, null, (byte)SetOperation.SREM)},
{(byte)SetOperation.SCARD, new RespCommandsInfo("SCARD", RespCommand.Set, 1, null, (byte)SetOperation.SCARD)},
{(byte)SetOperation.SRANDMEMBER,new RespCommandsInfo("SRANDMEMBER", RespCommand.Set, -2, null, (byte)SetOperation.SRANDMEMBER)},
{(byte)SetOperation.SPOP, new RespCommandsInfo("SPOP", RespCommand.Set, -1, null, (byte)SetOperation.SPOP) },
{(byte)SetOperation.SSCAN, new RespCommandsInfo("SSCAN", RespCommand.Set, -2, null, (byte)SetOperation.SSCAN) },
{(byte)SetOperation.SMOVE, new RespCommandsInfo("SMOVE", RespCommand.Set, 3, null, (byte)SetOperation.SMOVE) },
{(byte)SetOperation.SISMEMBER, new RespCommandsInfo("SISMEMBER",RespCommand.Set, 2, null, (byte)SetOperation.SISMEMBER) },
{(byte)SetOperation.SUNION, new RespCommandsInfo("SUNION", RespCommand.Set, -1, null, (byte)SetOperation.SUNION) },
{(byte)SetOperation.SDIFF, new RespCommandsInfo("SDIFF", RespCommand.Set, -1, null, (byte)SetOperation.SDIFF) },
{(byte)SetOperation.SDIFFSTORE, new RespCommandsInfo("SDIFFSTORE", RespCommand.Set, -2, null, (byte)SetOperation.SDIFFSTORE) }
{(byte)SetOperation.SADD, new RespCommandsInfo("SADD", RespCommand.Set, -2, null, (byte)SetOperation.SADD)},
{(byte)SetOperation.SMEMBERS, new RespCommandsInfo("SMEMBERS", RespCommand.Set, 1, null, (byte)SetOperation.SMEMBERS)},
{(byte)SetOperation.SREM, new RespCommandsInfo("SREM", RespCommand.Set, -2, null, (byte)SetOperation.SREM)},
{(byte)SetOperation.SCARD, new RespCommandsInfo("SCARD", RespCommand.Set, 1, null, (byte)SetOperation.SCARD)},
{(byte)SetOperation.SRANDMEMBER, new RespCommandsInfo("SRANDMEMBER", RespCommand.Set, -2, null, (byte)SetOperation.SRANDMEMBER)},
{(byte)SetOperation.SPOP, new RespCommandsInfo("SPOP", RespCommand.Set, -1, null, (byte)SetOperation.SPOP) },
{(byte)SetOperation.SSCAN, new RespCommandsInfo("SSCAN", RespCommand.Set, -2, null, (byte)SetOperation.SSCAN) },
{(byte)SetOperation.SMOVE, new RespCommandsInfo("SMOVE", RespCommand.Set, 3, null, (byte)SetOperation.SMOVE) },
{(byte)SetOperation.SISMEMBER, new RespCommandsInfo("SISMEMBER", RespCommand.Set, 2, null, (byte)SetOperation.SISMEMBER) },
{(byte)SetOperation.SUNION, new RespCommandsInfo("SUNION", RespCommand.Set, -1, null, (byte)SetOperation.SUNION) },
{(byte)SetOperation.SUNIONSTORE, new RespCommandsInfo("SUNIONSTORE", RespCommand.Set, -2, null, (byte)SetOperation.SUNIONSTORE) },
{(byte)SetOperation.SDIFF, new RespCommandsInfo("SDIFF", RespCommand.Set, -1, null, (byte)SetOperation.SDIFF) },
{(byte)SetOperation.SDIFFSTORE, new RespCommandsInfo("SDIFFSTORE", RespCommand.Set, -2, null, (byte)SetOperation.SDIFFSTORE) }
};

private static readonly Dictionary<RespCommand, RespCommandsInfo> customCommandsInfoMap = new Dictionary<RespCommand, RespCommandsInfo>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/RespInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static HashSet<string> GetCommands()
// Pub/sub
"PUBLISH", "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE",
// Set
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER", "SUNION", "SDIFF", "SDIFFSTORE", "SMOVE",
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER", "SUNION", "SUNIONSTORE", "SDIFF", "SDIFFSTORE", "SMOVE",
//Scan ops
"DBSIZE", "KEYS","SCAN",
// Geospatial commands
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, byte subcmd, int
(RespCommand.Set, (byte)SetOperation.SSCAN) => ObjectScan(count, ptr, GarnetObjectType.Set, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SMOVE) => SetMove(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SUNION) => SetUnion(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SUNIONSTORE) => SetUnionStore(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFF) => SetDiff(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SDIFFSTORE) => SetDiffStore(count, ptr, ref storageApi),
_ => ProcessOtherCommands(cmd, subcmd, count, ref storageApi),
Expand Down
82 changes: 72 additions & 10 deletions libs/server/Storage/Session/ObjectStore/SetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,8 @@ internal unsafe GarnetStatus SetMove(ArgSlice sourceKey, ArgSlice destinationKey
/// </summary>
/// <param name="keys"></param>
/// <param name="output"></param>
/// <param name="objectStoreContext"></param>
/// <typeparam name="TObjectContext"></typeparam>
/// <returns></returns>
public GarnetStatus SetUnion<TObjectContext>(ArgSlice[] keys, out HashSet<byte[]> output, ref TObjectContext objectStoreContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
{
output = new HashSet<byte[]>(new ByteArrayComparer());

Expand All @@ -455,14 +452,58 @@ public GarnetStatus SetUnion<TObjectContext>(ArgSlice[] keys, out HashSet<byte[]

try
{
foreach (var key in keys)
output = SetUnion(keys, ref setObjectStoreLockableContext);
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}

return GarnetStatus.OK;
}

/// <summary>
/// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination.
/// If destination already exists, it is overwritten.
/// </summary>
/// <param name="key"></param>
/// <param name="keys"></param>
/// <param name="count"></param>
/// <returns></returns>
public GarnetStatus SetUnionStore(byte[] key, ArgSlice[] keys, out int count)
{
count = default;

var destination = scratchBufferManager.CreateArgSlice(key);

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
txnManager.SaveKeyEntryToLock(destination, true, LockType.Exclusive);
foreach (var item in keys)
txnManager.SaveKeyEntryToLock(item, true, LockType.Shared);
_ = txnManager.Run(true);
}

// SetObject
var setObjectStoreLockableContext = txnManager.ObjectStoreLockableContext;

try
{
var members = SetUnion(keys, ref setObjectStoreLockableContext);

var newSetObject = new SetObject();
foreach (var item in members)
{
if (GET(key.ToArray(), out var currObject, ref setObjectStoreLockableContext) == GarnetStatus.OK)
{
var currSet = ((SetObject)currObject.garnetObject).Set;
output.UnionWith(currSet);
}
_ = newSetObject.Set.Add(item);
newSetObject.UpdateSize(item);
}
_ = SET(key, newSetObject, ref setObjectStoreLockableContext);
count = members.Count;
}
finally
{
Expand All @@ -473,6 +514,27 @@ public GarnetStatus SetUnion<TObjectContext>(ArgSlice[] keys, out HashSet<byte[]
return GarnetStatus.OK;
}

private HashSet<byte[]> SetUnion<TObjectContext>(ArgSlice[] keys, ref TObjectContext objectContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
var result = new HashSet<byte[]>(new ByteArrayComparer());
if (keys.Length == 0)
{
return result;
}

foreach (var item in keys)
{
if (GET(item.ToArray(), out var currObject, ref objectContext) == GarnetStatus.OK)
{
var currSet = ((SetObject)currObject.garnetObject).Set;
result.UnionWith(currSet);
}
}

return result;
}

/// <summary>
/// Adds the specified members to the set at key.
/// Specified members that are already a member of this set are ignored.
Expand Down
1 change: 1 addition & 0 deletions libs/server/Transaction/TxnKeyManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ private int SetObjectKeys(byte subCommand, int inputCount)
(byte)SetOperation.SPOP => SingleKey(1, true, LockType.Exclusive),
(byte)SetOperation.SISMEMBER => SingleKey(1, true, LockType.Shared),
(byte)SetOperation.SUNION => ListKeys(inputCount, true, LockType.Shared),
(byte)SetOperation.SUNIONSTORE => XSTOREKeys(inputCount, true),
(byte)SetOperation.SDIFF => ListKeys(inputCount, true, LockType.Shared),
(byte)SetOperation.SDIFFSTORE => XSTOREKeys(inputCount, true),
(byte)SetOperation.SMOVE => ListKeys(inputCount, true, LockType.Exclusive),
Expand Down
59 changes: 59 additions & 0 deletions test/Garnet.test/RespSetTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,35 @@ public void CanDoSetUnion()
}
}

[Test]
[TestCase("key")]
[TestCase("")]
public void CanDoSetUnionStore(string key)
{
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);

var key1 = "key1";
var key1Value = new RedisValue[] { "a", "b", "c" };

var key2 = "key2";
var key2Value = new RedisValue[] { "c", "d", "e" };

var addResult = db.SetAdd(key1, key1Value);
Assert.AreEqual(3, addResult);
addResult = db.SetAdd(key2, key2Value);
Assert.AreEqual(3, addResult);

var result = (int)db.Execute("SUNIONSTORE", key, key1, key2);
Assert.AreEqual(5, result);

var membersResult = db.SetMembers(key);
Assert.AreEqual(5, membersResult.Length);
var strResult = membersResult.Select(m => m.ToString()).ToArray();
var expectedResult = new[] { "a", "b", "c", "d", "e" };
Assert.IsTrue(expectedResult.OrderBy(t => t).SequenceEqual(strResult.OrderBy(t => t)));
}

[Test]
public void CanDoSdiff()
{
Expand Down Expand Up @@ -854,6 +883,21 @@ public void CanDoSetUnionLC()
Assert.AreEqual(expectedResponse, strResponse);
}

[Test]
public void CanDoSunionStoreLC()
{
var lightClientRequest = TestUtils.CreateRequest();
_ = lightClientRequest.SendCommand("SADD key1 a b c");
_ = lightClientRequest.SendCommand("SADD key2 c d e");
var response = lightClientRequest.SendCommand("SUNIONSTORE key key1 key2");
var expectedResponse = ":5\r\n";
Assert.AreEqual(expectedResponse, response.AsSpan().Slice(0, expectedResponse.Length).ToArray());

var membersResponse = lightClientRequest.SendCommand("SMEMBERS key");
expectedResponse = "*5\r\n$1\r\na\r\n$1\r\nb\r\n$1\r\nc\r\n$1\r\nd\r\n$1\r\ne\r\n";
Assert.AreEqual(expectedResponse, membersResponse.AsSpan().Slice(0, expectedResponse.Length).ToArray());
}

[Test]
public void CanDoSdiffLC()
{
Expand Down Expand Up @@ -941,6 +985,21 @@ public void CanDoSdiffStoreWhenMemberKeysNotExisting()
strResponse = Encoding.ASCII.GetString(membersResponse).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);
}

[Test]
public void CanDoSunionStoreWhenMemberKeysNotExisting()
{
using var lightClientRequest = TestUtils.CreateRequest();
var response = lightClientRequest.SendCommand("SUNIONSTORE key key1 key2 key3");
var expectedResponse = ":0\r\n";
var strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);

var membersResponse = lightClientRequest.SendCommand("SMEMBERS key");
expectedResponse = "*0\r\n";
strResponse = Encoding.ASCII.GetString(membersResponse).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);
}
#endregion


Expand Down
2 changes: 1 addition & 1 deletion website/docs/commands/api-compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ Note that this list is subject to change as we continue to expand our API comman
| | [SREM](data-structures.md#srem) || |
| | [SSCAN](data-structures.md#sscan) || |
| | [SUNION](data-structures.md#sunion) || |
| | SUNIONSTORE | | |
| | [SUNIONSTORE](data-structures.md#sunionstore) | | |
| <span id="sorted-set">**SORTED SET**</span> | BZPOP || |
| | BZPOPMAX || |
| | BZPOPMIN || |
Expand Down
14 changes: 14 additions & 0 deletions website/docs/commands/data-structures.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,20 @@ Keys that do not exist are considered to be empty sets.
---
### SUNIONSTORE
#### Syntax
```bash
SUNIONSTORE destination key [key ...]
```
This command is equal to [SUNION](#SUNION), but instead of returning the resulting set, it is stored in **destination**.
If **destination** already exists, it is overwritten.
---
### SDIFF
#### Syntax
Expand Down

0 comments on commit a4dc14f

Please sign in to comment.