Skip to content

Commit

Permalink
add basic support & tests for lmpop (#521)
Browse files Browse the repository at this point in the history
* add basic support & tests for lmpop

* renam local method (typo fix)

* add LMPopACLsAsync test

* fix notfound status returned as wrongtype

* fix try to index on null resultArray without check

* refactor list pop multiple methods (review suggestions)

* add lmpop to TransactionManager

* add more LMPOP list tests

* use num utils int parser and convert method to list keys override

* compare count case insenstive

* remove unused param from method docs

---------

Co-authored-by: Tal Zaccai <[email protected]>
  • Loading branch information
funcmike and TalZaccai authored Jul 18, 2024
1 parent 5beba2d commit 5266882
Show file tree
Hide file tree
Showing 13 changed files with 371 additions and 4 deletions.
2 changes: 1 addition & 1 deletion libs/client/GarnetClientProcessReplies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ unsafe bool ProcessReplyAsString(ref byte* ptr, byte* end, out string result, ou
if (!RespReadResponseUtils.ReadStringArrayWithLengthHeader(out var resultArray, ref ptr, end))
return false;
// Return first element of array
result = resultArray[0];
if (resultArray != null) result = resultArray[0];
break;

default:
Expand Down
8 changes: 8 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ public unsafe GarnetStatus ListLeftPop(ArgSlice key, out ArgSlice element)
public GarnetStatus ListLeftPop(ArgSlice key, int count, out ArgSlice[] poppedElements)
=> storageSession.ListPop(key, count, ListOperation.LPOP, ref objectContext, out poppedElements);

/// <inheritdoc />
public GarnetStatus ListLeftPop(ArgSlice[] keys, int count, out ArgSlice poppedKey, out ArgSlice[] poppedElements)
=> storageSession.ListPopMultiple(keys, OperationDirection.Left, count, ref objectContext, out poppedKey, out poppedElements);

/// <inheritdoc />
public GarnetStatus ListRightPop(byte[] key, ArgSlice input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ListPop(key, input, ref outputFooter, ref objectContext);
Expand All @@ -202,6 +206,10 @@ public unsafe GarnetStatus ListRightPop(ArgSlice key, out ArgSlice element)
public GarnetStatus ListRightPop(ArgSlice key, int count, out ArgSlice[] poppedElements)
=> storageSession.ListPop(key, count, ListOperation.RPOP, ref objectContext, out poppedElements);

/// <inheritdoc />
public GarnetStatus ListRightPop(ArgSlice[] keys, int count, out ArgSlice poppedKey, out ArgSlice[] poppedElements)
=> storageSession.ListPopMultiple(keys, OperationDirection.Right, count, ref objectContext, out poppedKey, out poppedElements);

#endregion

/// <inheritdoc />
Expand Down
21 changes: 21 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus ListLeftPop(ArgSlice key, int count, out ArgSlice[] elements);

/// <summary>
/// ListLeftPop ArgSlice version for multiple keys and values
/// </summary>
/// <param name="keys"></param>
/// <param name="count"></param>
/// <param name="key"></param>
/// <param name="elements"></param>
/// <returns>GarnetStatus</returns>
GarnetStatus ListLeftPop(ArgSlice[] keys, int count, out ArgSlice key, out ArgSlice[] elements);

/// <summary>
/// ListRightPop ArgSlice version, with GarnetObjectStoreOutput
/// </summary>
Expand All @@ -713,6 +723,17 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus ListRightPop(ArgSlice key, int count, out ArgSlice[] elements);


/// <summary>
/// ListRightPop ArgSlice version for multiple keys and values
/// </summary>
/// <param name="keys"></param>
/// <param name="count"></param>
/// <param name="key"></param>
/// <param name="elements"></param>
/// <returns>GarnetStatus</returns>
GarnetStatus ListRightPop(ArgSlice[] keys, int count, out ArgSlice key, out ArgSlice[] elements);

#endregion

/// <summary>
Expand Down
113 changes: 113 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,119 @@ private unsafe bool ListPop<TGarnetApi>(RespCommand command, int count, byte* pt
return true;
}


/// <summary>
/// LMPOP numkeys key [key ...] LEFT | RIGHT [COUNT count]
/// </summary>
/// <param name="count"></param>
/// <param name="ptr"></param>
/// <param name="storageApi"></param>
/// <returns></returns>
private unsafe bool ListPopMultiple<TGarnetApi>(int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (count < 3)
{
return AbortWithWrongNumberOfArguments("LMPOP", count);
}

// Read count of keys
if (!RespReadUtils.ReadIntWithLengthHeader(out var numKeys, ref ptr, recvBufferPtr + bytesRead))
return false;

if (count != numKeys + 2 && count != numKeys + 4)
{
return AbortWithErrorMessage(count, CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR);
}

// Get the keys for Lists
var keys = new ArgSlice[numKeys];

for (var i = 0; i < keys.Length; i++)
{
keys[i] = default;
if (!RespReadUtils.ReadPtrWithLengthHeader(ref keys[i].ptr, ref keys[i].length, ref ptr, recvBufferPtr + bytesRead))
return false;
}

if (NetworkMultiKeySlotVerify(readOnly: false, firstKey: 1, lastKey: numKeys + 1))
return true;

ArgSlice dir = default;

if (!RespReadUtils.ReadPtrWithLengthHeader(ref dir.ptr, ref dir.length, ref ptr, recvBufferPtr + bytesRead))
return false;

var popDirection = GetOperationDirection(dir);

if (popDirection == OperationDirection.Unknown)
{
return AbortWithErrorMessage(count, CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR);
}

int popCount = 1;

if (count == numKeys + 4)
{
ArgSlice countArg = default;

if (!RespReadUtils.ReadPtrWithLengthHeader(ref countArg.ptr, ref countArg.length, ref ptr, recvBufferPtr + bytesRead))
return false;

if (!countArg.ReadOnlySpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.COUNT))
{
return AbortWithErrorMessage(count, CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR);
}

// Read count
if (!RespReadUtils.ReadIntWithLengthHeader(out popCount, ref ptr, recvBufferPtr + bytesRead)) return false;
}

GarnetStatus statusOp; ArgSlice key; ArgSlice[] elements;

if (popDirection == OperationDirection.Left)
{
statusOp = storageApi.ListLeftPop(keys, popCount, out key, out elements);
}
else
{
statusOp = storageApi.ListRightPop(keys, popCount, out key, out elements);
}

switch (statusOp)
{
case GarnetStatus.OK:
while (!RespWriteUtils.WriteArrayLength(2, ref dcurr, dend))
SendAndReset();

while (!RespWriteUtils.WriteBulkString(key.Span, ref dcurr, dend))
SendAndReset();

while (!RespWriteUtils.WriteArrayLength(elements.Length, ref dcurr, dend))
SendAndReset();

foreach (var element in elements)
{
while (!RespWriteUtils.WriteBulkString(element.Span, ref dcurr, dend))
SendAndReset();
}

break;
case GarnetStatus.NOTFOUND:
while (!RespWriteUtils.WriteNullArray(ref dcurr, dend))
SendAndReset();
break;
case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
}

readHead = (int)(ptr - recvBufferPtr);

return true;
}

private bool ListBlockingPop<TGarnetApi>(RespCommand command, int count, byte* ptr, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public enum RespCommand : byte
INCRBY,
LINSERT,
LMOVE,
LMPOP,
LPOP,
LPUSH,
LPUSHX,
Expand Down Expand Up @@ -790,6 +791,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.LMOVE;
}
else if (*(ulong*)(ptr + 3) == MemoryMarshal.Read<ulong>("\nLMPOP\r\n"u8))
{
return RespCommand.LMPOP;
}
break;

case 'P':
Expand Down
43 changes: 43 additions & 0 deletions libs/server/Resp/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -2558,6 +2558,49 @@
],
"SubCommands": null
},
{
"Command": "LMPOP",
"Name": "LMPOP",
"IsInternal": false,
"Arity": 5,
"Flags": "DenyOom, Write",
"FirstKey": 1,
"LastKey": 2,
"Step": 1,
"AclCategories": "List, Slow, Write",
"Tips": null,
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RW, Access, Delete"
},
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RW, Insert"
}
],
"SubCommands": null
},
{
"Command": "LPOP",
"Name": "LPOP",
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.LREM => ListRemove(count, ptr, ref storageApi),
RespCommand.RPOPLPUSH => ListRightPopLeftPush(count, ptr, ref storageApi),
RespCommand.LMOVE => ListMove(count, ptr, ref storageApi),
RespCommand.LMPOP => ListPopMultiple(count, ptr, ref storageApi),
RespCommand.LSET => ListSet(count, ptr, ref storageApi),
RespCommand.BLPOP => ListBlockingPop(cmd, count, ptr, ref storageApi),
RespCommand.BRPOP => ListBlockingPop(cmd, count, ptr, ref storageApi),
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Storage/Session/ObjectStore/Common.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ GarnetStatus RMWObjectStoreOperationWithOutput<TObjectContext>(byte[] key, ArgSl
if (status.IsPending)
CompletePendingForObjectStoreSession(ref status, ref outputFooter, ref objectStoreContext);

if (outputFooter.spanByteAndMemory.Length == 0)
if (!status.NotFound && outputFooter.spanByteAndMemory.Length == 0)
return GarnetStatus.WRONGTYPE;

return status.Found || status.Record.Created ? GarnetStatus.OK : GarnetStatus.NOTFOUND;
Expand Down
39 changes: 39 additions & 0 deletions libs/server/Storage/Session/ObjectStore/ListOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,45 @@ public unsafe GarnetStatus ListPop<TObjectContext>(ArgSlice key, int count, List
return status;
}

/// <summary>
/// Removes the count elements from the head(left) or tail(right) of the first non-empty list key from the list of provided key names.
/// If the list contains less than count elements, removes and returns the number of elements in the list.
/// </summary>
/// <typeparam name="TObjectContext"></typeparam>
/// <param name="keys"></param>
/// <param name="direction"></param>
/// <param name="count"></param>
/// <param name="objectContext"></param>
/// <param name="key"></param>
/// <param name="elements"></param>
/// <returns>The count elements popped from the list</returns>
public unsafe GarnetStatus ListPopMultiple<TObjectContext>(ArgSlice[] keys, OperationDirection direction, int count, ref TObjectContext objectContext, out ArgSlice key, out ArgSlice[] elements)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
{
foreach (var k in keys)
{
GarnetStatus statusOp;

if (direction == OperationDirection.Left)
{
statusOp = ListPop(k, count, ListOperation.LPOP, ref objectContext, out elements);
}
else
{
statusOp = ListPop(k, count, ListOperation.RPOP, ref objectContext, out elements);
}

if (statusOp == GarnetStatus.NOTFOUND) continue;

key = k;
return statusOp;
}

key = default;
elements = default;
return GarnetStatus.NOTFOUND;
}

/// <summary>
/// Gets the current count of elements in the List at Key
/// </summary>
Expand Down
22 changes: 22 additions & 0 deletions libs/server/Transaction/TxnKeyManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using Garnet.common;
using Tsavorite.core;

namespace Garnet.server
Expand Down Expand Up @@ -90,6 +91,7 @@ internal int GetKeys(RespCommand command, int inputCount, out ReadOnlySpan<byte>
RespCommand.LINSERT => ListObjectKeys((byte)ListOperation.LINSERT),
RespCommand.LLEN => ListObjectKeys((byte)ListOperation.LLEN),
RespCommand.LMOVE => ListObjectKeys((byte)ListOperation.LMOVE),
RespCommand.LMPOP => ListKeys(true, LockType.Exclusive),
RespCommand.LPOP => ListObjectKeys((byte)ListOperation.LPOP),
RespCommand.LPUSH => ListObjectKeys((byte)ListOperation.LPUSH),
RespCommand.LPUSHX => ListObjectKeys((byte)ListOperation.LPUSHX),
Expand Down Expand Up @@ -298,6 +300,26 @@ private int ListKeys(int inputCount, bool isObject, LockType type)
return inputCount;
}

/// <summary>
/// Returns a list of keys for LMPOP command
/// </summary>
private int ListKeys(bool isObject, LockType type)
{
var numKeysArg = respSession.GetCommandAsArgSlice(out bool success);
if (!success) return -2;

if (!NumUtils.TryParse(numKeysArg.ReadOnlySpan, out int numKeys)) return -2;

for (int i = 0; i < numKeys; i++)
{
var key = respSession.GetCommandAsArgSlice(out success);
if (!success) return -2;
SaveKeyEntryToLock(key, isObject, type);
SaveKeyArgSlice(key);
}
return numKeys;
}

/// <summary>
/// Returns a list of keys for MSET commands
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public class SupportedCommand
new("LINSERT", RespCommand.LINSERT),
new("LLEN", RespCommand.LLEN),
new("LMOVE", RespCommand.LMOVE),
new("LMPOP", RespCommand.LMPOP),
new("LPOP", RespCommand.LPOP),
new("LPUSH", RespCommand.LPUSH),
new("LPUSHX", RespCommand.LPUSHX),
Expand Down
Loading

0 comments on commit 5266882

Please sign in to comment.