Skip to content

Commit

Permalink
Add support SUNION command (#189)
Browse files Browse the repository at this point in the history
* commit

* Add support for SUNION Command

* fix commit

* comment debug message

* format the code

* delete unused code

* fix unit test

* fix parsing params

* support sunion command

* Added some fixes

* dotnet format

* Added SUNION to website docs

---------

Co-authored-by: Tal Zaccai <[email protected]>
  • Loading branch information
argsno and TalZaccai authored Apr 17, 2024
1 parent 17572f4 commit 20c2960
Show file tree
Hide file tree
Showing 14 changed files with 245 additions and 5 deletions.
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ public GarnetStatus SetRandomMember(byte[] key, ArgSlice input, ref GarnetObject
public GarnetStatus SetScan(ArgSlice key, long cursor, string match, int count, out ArgSlice[] items)
=> storageSession.SetScan(key, cursor, match, count, out items, ref objectContext);

/// <inheritdoc />
public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
=> storageSession.SetUnion(keys, out output, ref objectContext);

#endregion

#region Hash Methods
Expand Down
11 changes: 11 additions & 0 deletions libs/server/API/GarnetWatchApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,17 @@ public GarnetStatus SetScan(ArgSlice key, long cursor, string match, int count,
return garnetApi.SetScan(key, cursor, match, count, out items);
}

/// <inheritdoc />
public GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output)
{
foreach (var key in keys)
{
garnetApi.WATCH(key, StoreType.Object);
}
return garnetApi.SetUnion(keys, out output);
}


#endregion

#region Hash Methods
Expand Down
9 changes: 9 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,15 @@ public interface IGarnetReadApi
/// <returns></returns>
GarnetStatus SetScan(ArgSlice key, long cursor, string match, int count, out ArgSlice[] items);

/// <summary>
/// Returns the members of the set resulting from the union of all the given sets.
/// Keys that do not exist are considered to be empty sets.
/// </summary>
/// <param name="keys"></param>
/// <param name="output"></param>
/// <returns></returns>
GarnetStatus SetUnion(ArgSlice[] keys, out HashSet<byte[]> output);

#endregion

#region Hash Methods
Expand Down
3 changes: 3 additions & 0 deletions libs/server/Objects/Set/SetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public enum SetOperation : byte
SSCAN,
SRANDMEMBER,
SISMEMBER,
SUNION,
}


Expand Down Expand Up @@ -196,5 +197,7 @@ public override unsafe void Scan(long start, out List<byte[]> items, out long cu
if (cursor == set.Count)
cursor = 0;
}

public HashSet<byte[]> Set => set;
}
}
52 changes: 52 additions & 0 deletions libs/server/Resp/Objects/SetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,58 @@ private unsafe bool SetAdd<TGarnetApi>(int count, byte* ptr, ref TGarnetApi stor
return true;
}

/// <summary>
/// Returns the members of the set resulting from the union of all the given sets.
/// Keys that do not exist are considered to be empty sets.
/// </summary>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <typeparam name="TGarnetApi"></typeparam>
/// <returns></returns>
private bool SetUnion<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 1)
{
return AbortWithWrongNumberOfArguments("SUNION", count);
}

// Read all the keys
ArgSlice[] keys = new ArgSlice[count];

for (int i = 0; i < keys.Length; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkKeyArraySlotVerify(ref keys, true))
{
var bufSpan = new ReadOnlySpan<byte>(recvBufferPtr, bytesRead);
if (!DrainCommands(bufSpan, count)) return false;
return true;
}

storageApi.SetUnion(keys, out var result);

// write the size of result
var resultCount = result.Count;
while (!RespWriteUtils.WriteArrayLength(resultCount, ref dcurr, dend))
SendAndReset();

foreach (var item in result)
{
while (!RespWriteUtils.WriteBulkString(item, ref dcurr, dend))
SendAndReset();
}

// update read pointers
readHead = (int)(ptr - recvBufferPtr);
return true;
}

/// <summary>
/// Remove the specified members from the set.
/// Specified members that are not a member of this set are ignored.
Expand Down
4 changes: 4 additions & 0 deletions libs/server/Resp/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,10 @@ static RespCommand MatchedNone(RespServerSession session, int oldReadHead)
{
return (RespCommand.STRLEN, 0);
}
else if (*(ulong*)(ptr + 4) == MemoryMarshal.Read<ulong>("SUNION\r\n"u8))
{
return (RespCommand.Set, (byte)SetOperation.SUNION);
}
break;

case 'U':
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespCommandsInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public static RespCommandsInfo findCommand(RespCommand cmd, byte subCmd = 0)
{(byte)SetOperation.SPOP, new RespCommandsInfo("SPOP", RespCommand.Set, -1, null, (byte)SetOperation.SPOP) },
{(byte)SetOperation.SSCAN, new RespCommandsInfo("SSCAN", RespCommand.Set, -2, null, (byte)SetOperation.SSCAN) },
{(byte)SetOperation.SISMEMBER, new RespCommandsInfo("SISMEMBER",RespCommand.Set, 2, null, (byte)SetOperation.SISMEMBER) },
{(byte)SetOperation.SUNION, new RespCommandsInfo("SUNION", RespCommand.Set, -1, null, (byte)SetOperation.SUNION) },
};

private static readonly Dictionary<RespCommand, RespCommandsInfo> customCommandsInfoMap = new Dictionary<RespCommand, RespCommandsInfo>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/RespInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static HashSet<string> GetCommands()
// Pub/sub
"PUBLISH", "SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE",
// Set
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER",
"SADD", "SREM", "SPOP", "SMEMBERS", "SCARD", "SSCAN", "SRANDMEMBER", "SISMEMBER", "SUNION",
//Scan ops
"DBSIZE", "KEYS","SCAN",
// Geospatial commands
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, byte subcmd, int
(RespCommand.Set, (byte)SetOperation.SPOP) => SetPop(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SRANDMEMBER) => SetRandomMember(count, ptr, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SSCAN) => ObjectScan(count, ptr, GarnetObjectType.Set, ref storageApi),
(RespCommand.Set, (byte)SetOperation.SUNION) => SetUnion(count, ptr, ref storageApi),
_ => ProcessOtherCommands(cmd, subcmd, count, ref storageApi),
};
return success;
Expand Down
54 changes: 54 additions & 0 deletions libs/server/Storage/Session/ObjectStore/SetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using Garnet.common;
using Tsavorite.core;
Expand Down Expand Up @@ -361,6 +363,58 @@ public unsafe GarnetStatus SetScan<TObjectContext>(ArgSlice key, long cursor, st

}

/// <summary>
/// Returns the members of the set resulting from the union of all the given sets.
/// Keys that do not exist are considered to be empty sets.
/// </summary>
/// <param name="keys"></param>
/// <param name="output"></param>
/// <param name="objectStoreContext"></param>
/// <typeparam name="TObjectContext"></typeparam>
/// <returns></returns>
public GarnetStatus SetUnion<TObjectContext>(ArgSlice[] keys, out HashSet<byte[]> output, ref TObjectContext objectStoreContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long>
{
output = new HashSet<byte[]>(new ByteArrayComparer());

if (keys.Length == 0)
return GarnetStatus.OK;

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
foreach (var item in keys)
txnManager.SaveKeyEntryToLock(item, true, LockType.Shared);
_ = txnManager.Run(true);
}

// SetObject
var setObjectStoreLockableContext = txnManager.ObjectStoreLockableContext;

try
{
foreach (var key in keys)
{
if (GET(key.ToArray(), out var currObject, ref setObjectStoreLockableContext) == GarnetStatus.OK)
{
var currSet = ((SetObject)currObject.garnetObject).Set;
output.UnionWith(currSet);
}
}
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}

return GarnetStatus.OK;
}


/// <summary>
/// Adds the specified members to the set at key.
/// Specified members that are already a member of this set are ignored.
Expand Down
5 changes: 3 additions & 2 deletions libs/server/Transaction/TxnKeyManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal int GetKeys(RespCommand command, int inputCount, out ReadOnlySpan<byte>
RespCommand.SortedSet => SortedSetObjectKeys(subCommand, inputCount),
RespCommand.List => ListObjectKeys(subCommand),
RespCommand.Hash => HashObjectKeys(subCommand),
RespCommand.Set => SetObjectKeys(subCommand),
RespCommand.Set => SetObjectKeys(subCommand, inputCount),
RespCommand.GET => SingleKey(1, false, LockType.Shared),
RespCommand.SET => SingleKey(1, false, LockType.Exclusive),
RespCommand.GETRANGE => SingleKey(1, false, LockType.Shared),
Expand Down Expand Up @@ -172,7 +172,7 @@ private int HashObjectKeys(byte subCommand)
};
}

private int SetObjectKeys(byte subCommand)
private int SetObjectKeys(byte subCommand, int inputCount)
{
return subCommand switch
{
Expand All @@ -183,6 +183,7 @@ private int SetObjectKeys(byte subCommand)
(byte)SetOperation.SRANDMEMBER => SingleKey(1, true, LockType.Exclusive),
(byte)SetOperation.SPOP => SingleKey(1, true, LockType.Exclusive),
(byte)SetOperation.SISMEMBER => SingleKey(1, true, LockType.Shared),
(byte)SetOperation.SUNION => ListKeys(inputCount, true, LockType.Shared),
_ => -1
};
}
Expand Down
89 changes: 88 additions & 1 deletion test/Garnet.test/RespSetTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Garnet.server;
using NUnit.Framework;
using StackExchange.Redis;
using SetOperation = StackExchange.Redis.SetOperation;

namespace Garnet.test
{
Expand Down Expand Up @@ -273,6 +274,52 @@ public void CanDoSScanWithCursor()
Assert.AreEqual(l, $"value:{setEntries.Length - 1}");
}

[Test]
public void CanDoSetUnion()
{
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);
var redisValues1 = new RedisValue[] { "item-a", "item-b", "item-c", "item-d" };
var result = db.SetAdd(new RedisKey("key1"), redisValues1);
Assert.AreEqual(4, result);

result = db.SetAdd(new RedisKey("key2"), new RedisValue[] { "item-c" });
Assert.AreEqual(1, result);

result = db.SetAdd(new RedisKey("key3"), new RedisValue[] { "item-a", "item-c", "item-e" });
Assert.AreEqual(3, result);

var members = db.SetCombine(SetOperation.Union, new RedisKey[] { "key1", "key2", "key3" });
RedisValue[] entries = new RedisValue[] { "item-a", "item-b", "item-c", "item-d", "item-e" };
Assert.AreEqual(5, members.Length);
// assert two arrays are equal ignoring order
Assert.IsTrue(members.OrderBy(x => x).SequenceEqual(entries.OrderBy(x => x)));

members = db.SetCombine(SetOperation.Union, new RedisKey[] { "key1", "key2", "key3", "_not_exists" });
Assert.AreEqual(5, members.Length);
Assert.IsTrue(members.OrderBy(x => x).SequenceEqual(entries.OrderBy(x => x)));

members = db.SetCombine(SetOperation.Union, new RedisKey[] { "_not_exists_1", "_not_exists_2", "_not_exists_3" });
Assert.IsEmpty(members);

members = db.SetCombine(SetOperation.Union, new RedisKey[] { "_not_exists_1", "key1", "_not_exists_2", "_not_exists_3" });
Assert.AreEqual(4, members.Length);
Assert.IsTrue(members.OrderBy(x => x).SequenceEqual(redisValues1.OrderBy(x => x)));

members = db.SetCombine(SetOperation.Union, new RedisKey[] { "key1", "key2" });
Assert.AreEqual(4, members.Length);
Assert.IsTrue(members.OrderBy(x => x).SequenceEqual(redisValues1.OrderBy(x => x)));

try
{
db.SetCombine(SetOperation.Union, new RedisKey[] { });
Assert.Fail();
}
catch (RedisServerException e)
{
Assert.AreEqual(string.Format(CmdStrings.GenericErrWrongNumArgs, "SUNION"), e.Message);
}
}


#endregion
Expand Down Expand Up @@ -303,7 +350,6 @@ public void CanAddAndListMembersLC()
expectedResponse = "*2\r\n$7\r\n\"Hello\"\r\n$7\r\n\"World\"\r\n";
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);

}

[Test]
Expand Down Expand Up @@ -570,6 +616,47 @@ public void MultiWithNonExistingSet()
Assert.AreEqual(res.AsSpan().Slice(0, expectedResponse.Length).ToArray(), expectedResponse);
}

[Test]
public void CanDoSetUnionLC()
{
using var lightClientRequest = TestUtils.CreateRequest();
var response = lightClientRequest.SendCommand("SADD myset ItemOne ItemTwo ItemThree ItemFour");
var expectedResponse = ":4\r\n";
var strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);

response = lightClientRequest.SendCommand("SUNION myset another_set", 5);
expectedResponse = "*4\r\n";
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);

lightClientRequest.SendCommand("SADD another_set ItemOne ItemFive ItemTwo ItemSix ItemSeven");
response = lightClientRequest.SendCommand("SUNION myset another_set", 8);
expectedResponse = "*7\r\n";
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);

response = lightClientRequest.SendCommand("SUNION myset no_exist_set", 5);
expectedResponse = "*4\r\n";
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);

response = lightClientRequest.SendCommand("SUNION no_exist_set myset no_exist_set another_set", 8);
expectedResponse = "*7\r\n";
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);

response = lightClientRequest.SendCommand("SUNION myset", 5);
expectedResponse = "*4\r\n";
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);

response = lightClientRequest.SendCommand("SUNION");
expectedResponse = $"-{string.Format(CmdStrings.GenericErrWrongNumArgs, "SUNION")}\r\n";
strResponse = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length);
Assert.AreEqual(expectedResponse, strResponse);
}

#endregion


Expand Down
2 changes: 1 addition & 1 deletion website/docs/commands/api-compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ Note that this list is subject to change as we continue to expand our API comman
| | [SRANDMEMBER](data-structures.md#srandmember) || |
| | [SREM](data-structures.md#srem) || |
| | [SSCAN](data-structures.md#sscan) || |
| | SUNION | | |
| | [SUNION](data-structures.md#sunion) | | |
| | SUNIONSTORE || |
| <span id="sorted-set">**SORTED SET**</span> | BZPOP || |
| | BZPOPMAX || |
Expand Down
13 changes: 13 additions & 0 deletions website/docs/commands/data-structures.md
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,19 @@ The **match** parameter allows to apply a filter to elements after they have bee
---
### SUNION
#### Syntax
```bash
SUNION key [key ...]
```
Returns the members of the set resulting from the union of all the given sets.
Keys that do not exist are considered to be empty sets.
---
## Sorted Set
### ZADD
Expand Down

0 comments on commit 20c2960

Please sign in to comment.