Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check key type first in blocking functions #1130

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 101 additions & 31 deletions libs/server/Objects/ItemBroker/CollectionItemBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private void InitializeObserver(CollectionItemObserver observer, byte[][] keys)
// If the key already has a non-empty observer queue, it does not have an item to retrieve
// Otherwise, try to retrieve next available item
if ((KeysToObservers.ContainsKey(key) && KeysToObservers[key].Count > 0) ||
!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs,
!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs, true,
out _, out var result)) continue;

// An item was found - set the observer result and return
Expand Down Expand Up @@ -291,7 +291,7 @@ private bool TryAssignItemFromKey(byte[] key)
}

// Try to get next available item from object stored in key
if (!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs,
if (!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs, false,
out var currCount, out var result))
{
// If unsuccessful getting next item but there is at least one item in the collection,
Expand Down Expand Up @@ -436,7 +436,13 @@ private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sort
}
}

private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out CollectionItemResult result)
/// <summary>
/// Try to get available item(s) from sorted set object based on command type and arguments
/// When run from initializer (initial = true), can return WRONGTYPE errors
/// </summary>
private unsafe bool TryGetResult(byte[] key, StorageSession storageSession,
RespCommand command, ArgSlice[] cmdArgs, bool initial,
out int currCount, out CollectionItemResult result)
{
currCount = default;
result = default;
Expand All @@ -449,6 +455,7 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
_ => throw new NotSupportedException()
};

var asKey = storageSession.scratchBufferManager.CreateArgSlice(key);
ArgSlice dstKey = default;
if (command == RespCommand.BLMOVE)
{
Expand All @@ -460,32 +467,42 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
{
Debug.Assert(storageSession.txnManager.state == TxnState.None);
createTransaction = true;
var asKey = storageSession.scratchBufferManager.CreateArgSlice(key);
if (initial)
storageSession.txnManager.SaveKeyEntryToLock(asKey, false, LockType.Exclusive);
storageSession.txnManager.SaveKeyEntryToLock(asKey, true, LockType.Exclusive);

if (command == RespCommand.BLMOVE)
{
if (initial)
storageSession.txnManager.SaveKeyEntryToLock(dstKey, false, LockType.Exclusive);
storageSession.txnManager.SaveKeyEntryToLock(dstKey, true, LockType.Exclusive);
}

_ = storageSession.txnManager.Run(true);
}

var lockableContext = storageSession.txnManager.LockableContext;
var objectLockableContext = storageSession.txnManager.ObjectStoreLockableContext;

try
{
// Get the object stored at key
var statusOp = storageSession.GET(key, out var osObject, ref objectLockableContext);
if (statusOp == GarnetStatus.NOTFOUND) return false;

IGarnetObject dstObj = null;
byte[] arrDstKey = default;
if (command == RespCommand.BLMOVE)
if (statusOp == GarnetStatus.NOTFOUND)
{
arrDstKey = dstKey.ToArray();
var dstStatusOp = storageSession.GET(arrDstKey, out var osDstObject, ref objectLockableContext);
if (dstStatusOp != GarnetStatus.NOTFOUND) dstObj = osDstObject.GarnetObject;
if (!initial)
return false;

// Check the string store as well to see if WRONGTYPE should be returned.
statusOp = storageSession.GET(asKey, out ArgSlice _, ref lockableContext);

if (statusOp != GarnetStatus.NOTFOUND)
{
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
return initial;
}

return false;
}

// Check for type match between the observer and the actual object type
Expand All @@ -494,29 +511,64 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
{
case ListObject listObj:
currCount = listObj.LnkList.Count;
if (objectType != GarnetObjectType.List) return false;
if (currCount == 0) return false;
if (objectType != GarnetObjectType.List)
{
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
return initial;
}
if (currCount == 0)
return false;

var isSuccessful = false;
switch (command)
{
case RespCommand.BLPOP:
case RespCommand.BRPOP:
var isSuccessful = TryGetNextListItem(listObj, command, out var nextItem);
isSuccessful = TryGetNextListItem(listObj, command, out var nextItem);
result = new CollectionItemResult(key, nextItem);
return isSuccessful;
break;
case RespCommand.BLMOVE:
var arrDstKey = dstKey.ToArray();
var dstStatusOp = storageSession.GET(arrDstKey, out var osDstObject, ref objectLockableContext);

ListObject dstList;
var newObj = false;
if (dstObj == null)

if (dstStatusOp != GarnetStatus.NOTFOUND)
{
dstList = new ListObject();
newObj = true;
var dstObj = osDstObject.GarnetObject;

if (dstObj == null)
{
dstList = new ListObject();
newObj = true;
}
else if (dstObj is ListObject tmpDstList)
{
dstList = tmpDstList;
}
else
{
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
return initial;
}
}
else if (dstObj is ListObject tmpDstList)
else
{
dstList = tmpDstList;
if (initial)
{
// Check string store for wrongtype errors on initial run.
dstStatusOp = storageSession.GET(dstKey, out ArgSlice _, ref lockableContext);
if (dstStatusOp != GarnetStatus.NOTFOUND)
{
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
return initial;
}
}

dstList = new ListObject();
newObj = true;
}
else return false;

isSuccessful = TryMoveNextListItem(listObj, dstList, (OperationDirection)cmdArgs[1].ReadOnlySpan[0],
(OperationDirection)cmdArgs[2].ReadOnlySpan[0], out nextItem);
Expand All @@ -527,8 +579,7 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
isSuccessful = storageSession.SET(arrDstKey, dstList, ref objectLockableContext) ==
GarnetStatus.OK;
}

return isSuccessful;
break;
case RespCommand.BLMPOP:
var popDirection = (OperationDirection)cmdArgs[0].ReadOnlySpan[0];
var popCount = *(int*)(cmdArgs[1].ptr);
Expand All @@ -537,25 +588,44 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
var items = new byte[popCount][];
for (var i = 0; i < popCount; i++)
{
var _ = TryGetNextListItem(listObj, popDirection == OperationDirection.Left ? RespCommand.BLPOP : RespCommand.BRPOP, out items[i]); // Return can be ignored because it is guaranteed to return true
// Return can be ignored because it is guaranteed to return true
_ = TryGetNextListItem(listObj, popDirection == OperationDirection.Left ? RespCommand.BLPOP : RespCommand.BRPOP, out items[i]);
}

result = new CollectionItemResult(key, items);
return true;
isSuccessful = true;
break;
default:
return false;
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
return initial;
}

if (isSuccessful && listObj.LnkList.Count == 0)
{
_ = storageSession.EXPIRE(asKey, TimeSpan.Zero, out _, StoreType.Object, ExpireOption.None,
ref lockableContext, ref objectLockableContext);
}
return isSuccessful;
case SortedSetObject setObj:
currCount = setObj.Count();
if (objectType != GarnetObjectType.SortedSet)
return false;
{
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
return initial;
}
if (currCount == 0)
return false;

return TryGetNextSetObjects(key, setObj, currCount, command, cmdArgs, out result);

isSuccessful = TryGetNextSetObjects(key, setObj, currCount, command, cmdArgs, out result);
if (isSuccessful && setObj.Count() == 0)
{
_ = storageSession.EXPIRE(asKey, TimeSpan.Zero, out _, StoreType.Object, ExpireOption.None,
ref lockableContext, ref objectLockableContext);
}
return isSuccessful;
default:
return false;
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
return initial;
}
}
finally
Expand Down
17 changes: 13 additions & 4 deletions libs/server/Objects/ItemBroker/CollectionItemResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,39 @@ namespace Garnet.server
/// </summary>
internal readonly struct CollectionItemResult
{
public CollectionItemResult(GarnetStatus status)
{
Status = status;
}

public CollectionItemResult(byte[] key, byte[] item)
{
Key = key;
Item = item;
Status = key == default ? GarnetStatus.NOTFOUND : GarnetStatus.OK;
}

public CollectionItemResult(byte[] key, byte[][] items)
{
Key = key;
Items = items;
Status = key == default ? GarnetStatus.NOTFOUND : GarnetStatus.OK;
}

public CollectionItemResult(byte[] key, double score, byte[] item)
{
Key = key;
Score = score;
Item = item;
Score = score;
Status = key == default ? GarnetStatus.NOTFOUND : GarnetStatus.OK;
}

public CollectionItemResult(byte[] key, double[] scores, byte[][] items)
{
Key = key;
Scores = scores;
Items = items;
Scores = scores;
Status = key == default ? GarnetStatus.NOTFOUND : GarnetStatus.OK;
}

private CollectionItemResult(bool isForceUnblocked)
Expand All @@ -40,9 +49,9 @@ private CollectionItemResult(bool isForceUnblocked)
}

/// <summary>
/// True if item was found
/// Result status
/// </summary>
internal bool Found => Key != default;
internal GarnetStatus Status { get; }

/// <summary>
/// Key of collection from which item was retrieved
Expand Down
Loading