Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async RESP responses to pending GET calls (due to disk read) #387

Merged
merged 64 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
cd3a78e
Initial rough prototype checkin
badrishc May 15, 2024
53f3cc7
updates to logic
badrishc May 15, 2024
03b7a9c
nits
badrishc May 15, 2024
42adfe6
undo error checkin
badrishc May 15, 2024
beaa156
Merge from main
badrishc May 15, 2024
b7e7358
clean up
badrishc May 15, 2024
68d7069
nit
badrishc May 15, 2024
8c249f6
update comment for spinLock
badrishc May 15, 2024
a42fd2c
fix formatting
badrishc May 15, 2024
f05e903
add comments
badrishc May 16, 2024
c9125b1
Merge branch 'main' into badrishc/async-resp
badrishc May 16, 2024
6e56181
Merge remote-tracking branch 'origin/main' into badrishc/async-resp
badrishc May 17, 2024
ba6e969
Basic implementation of HELLO command
badrishc May 18, 2024
1757058
fix
badrishc May 18, 2024
bbae2d2
Added command info for HELLO + added website docs
TalZaccai May 18, 2024
e13b377
dotnet format
TalZaccai May 18, 2024
3c29a03
update SE.Redis to latest, add HELLO unit test
badrishc May 18, 2024
a33a912
Add and use EqualsIgnoreCase
badrishc May 20, 2024
8f5266e
use nameof(RespCommand.HELLO)
badrishc May 20, 2024
8163b20
updates
badrishc May 20, 2024
25f3960
support special pong in RESP2 subscribe
badrishc May 20, 2024
8d72994
Merge branch 'main' into badrishc/hello
badrishc May 20, 2024
2a8d1ba
only support RESP2 in this PR
badrishc May 20, 2024
5c5dd09
remove resp3 testcase
badrishc May 20, 2024
e512637
fix warnings
badrishc May 20, 2024
f2cb070
fix warning
badrishc May 20, 2024
b6302a1
make EqualsIgnoreCase extension method in utils
badrishc May 20, 2024
2b35875
Merge from HELLO
badrishc May 20, 2024
c3d17c7
updates
badrishc May 20, 2024
88bdb6d
improve the case-insensitive match test
badrishc May 20, 2024
9838e6a
Merge branch 'badrishc/hello' into badrishc/async-resp
badrishc May 20, 2024
cc52900
nit
badrishc May 20, 2024
f9671a9
Merge branch 'badrishc/hello' into badrishc/async-resp
badrishc May 20, 2024
8221abe
support push subscribe
badrishc May 20, 2024
94a8d79
update version
badrishc May 20, 2024
766219f
fix lowest flag
badrishc May 20, 2024
03cbf62
MErge from main
badrishc May 21, 2024
c74a4b7
updates
badrishc May 21, 2024
1c0a840
fix info commands
badrishc May 21, 2024
3617cfc
Merge branch 'main' into badrishc/async-resp
badrishc May 21, 2024
e97cc08
nit
badrishc May 21, 2024
ac3e9dd
ensure full input received before processing array commands, if useAs…
badrishc May 21, 2024
95f183c
async only works on RESP3 or later
badrishc May 21, 2024
61d32c6
change respProtocolVersion to byte
badrishc May 21, 2024
702117f
add comments
badrishc May 21, 2024
5329db0
nits
badrishc May 21, 2024
8b5104a
Merge branch 'main' into badrishc/async-resp
badrishc May 21, 2024
c69a864
protocol change is not allowed with pending async operations
badrishc May 22, 2024
53aa3dc
Merge branch 'badrishc/async-resp' of github.com:microsoft/garnet int…
badrishc May 22, 2024
6b94498
Merge branch 'main' into badrishc/async-resp
badrishc May 22, 2024
67d0614
optimize sync case
badrishc May 22, 2024
ddf30b8
Merge branch 'main' into badrishc/async-resp
badrishc May 22, 2024
7d61158
fix format
badrishc May 22, 2024
2b3887d
Merge branch 'badrishc/async-resp' of https://github.com/microsoft/ga…
badrishc May 22, 2024
32188c2
Merge branch 'main' into badrishc/async-resp
badrishc May 22, 2024
5f514ba
fix push response header for async
badrishc May 22, 2024
244fe03
Merge remote-tracking branch 'origin/main' into badrishc/async-resp
badrishc May 23, 2024
6cd0ac2
Merge branch 'main' into badrishc/async-resp
badrishc May 28, 2024
8fb3d72
fix asyncStarted increment
badrishc May 28, 2024
8fa8545
Merge branch 'main' into badrishc/async-resp
badrishc May 29, 2024
59b4ab9
Added unit test, minor fixes
badrishc May 29, 2024
39be321
Cancel and signal to ensure the async processor ends
badrishc May 29, 2024
37e7324
Merge from main
badrishc May 29, 2024
c2ee0a2
nit
badrishc May 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions libs/common/Networking/GarnetTcpNetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public class GarnetTcpNetworkSender : NetworkSenderBase

readonly LimitedFixedBufferPool networkPool;

/// <summary>
/// NOTE: This variable should not be marked as readonly as it is a mutable struct
/// </summary>
SpinLock spinLock;
badrishc marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
///
/// </summary>
Expand All @@ -67,6 +72,7 @@ public GarnetTcpNetworkSender(
this.saeaStack = new(2 * ThrottleMax);
this.responseObject = null;
this.ThrottleMax = throttleMax;
this.spinLock = new();

var endpoint = socket.RemoteEndPoint as IPEndPoint;
if (endpoint != null)
Expand All @@ -79,6 +85,49 @@ public GarnetTcpNetworkSender(
/// <inheritdoc />
public override string RemoteEndpointName => remoteEndpoint;

/// <inheritdoc />
public override void Enter()
{
var lockTaken = false;
spinLock.Enter(ref lockTaken);
Debug.Assert(lockTaken);
}

/// <inheritdoc />
public override unsafe void EnterAndGetResponseObject(out byte* head, out byte* tail)
{
var lockTaken = false;
spinLock.Enter(ref lockTaken);
Debug.Assert(lockTaken);
Debug.Assert(responseObject == null);
if (!saeaStack.TryPop(out responseObject, out bool disposed))
{
if (disposed)
ThrowDisposed();
responseObject = new GarnetSaeaBuffer(SeaaBuffer_Completed, networkPool);
}
head = responseObject.buffer.entryPtr;
tail = responseObject.buffer.entryPtr + responseObject.buffer.entry.Length;
}

/// <inheritdoc />
public override void Exit()
{
spinLock.Exit();
}

/// <inheritdoc />
public override unsafe void ExitAndReturnResponseObject()
{
if (responseObject != null)
{
ReturnBuffer(responseObject);
responseObject = null;
}
spinLock.Exit();
}


/// <inheritdoc />
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override void GetResponseObject()
Expand Down
22 changes: 22 additions & 0 deletions libs/common/Networking/INetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ public interface INetworkSender : IDisposable
/// </summary>
string RemoteEndpointName { get; }

/// <summary>
/// Enter exclusive use of network sender.
/// </summary>
void Enter();

/// <summary>
/// Enter exclusive use of network sender. Allocate and get response object pointers.
/// </summary>
/// <param name="head"></param>
/// <param name="tail"></param>
unsafe void EnterAndGetResponseObject(out byte* head, out byte* tail);

/// <summary>
/// Exit exclusive use of network sender.
/// </summary>
void Exit();

/// <summary>
/// Exit exclusive use of network sender. Free response object.
/// </summary>
void ExitAndReturnResponseObject();

/// <summary>
/// Allocate a new response object
/// </summary>
Expand Down
22 changes: 22 additions & 0 deletions libs/common/Networking/NetworkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,28 @@ unsafe void ShiftTransportReceiveBuffer()
}
}

/// <inheritdoc />
public override void Enter()
=> networkSender.Enter();

/// <inheritdoc />
public override unsafe void EnterAndGetResponseObject(out byte* head, out byte* tail)
{
networkSender.Enter();
head = transportSendBufferPtr;
tail = transportSendBufferPtr + transportSendBuffer.Length;
}

/// <inheritdoc />
public override void Exit()
=> networkSender.Exit();

/// <inheritdoc />
public override void ExitAndReturnResponseObject()
{
networkSender.Exit();
}

/// <inheritdoc />
public override void GetResponseObject() { }

Expand Down
12 changes: 12 additions & 0 deletions libs/common/Networking/NetworkSenderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ public NetworkSenderBase(int serverBufferSize)
/// <inheritdoc />
public abstract string RemoteEndpointName { get; }

/// <inheritdoc />
public abstract void Enter();

/// <inheritdoc />
public abstract unsafe void EnterAndGetResponseObject(out byte* head, out byte* tail);

/// <inheritdoc />
public abstract void Exit();

/// <inheritdoc />
public abstract void ExitAndReturnResponseObject();

/// <inheritdoc />
public abstract void GetResponseObject();

Expand Down
25 changes: 25 additions & 0 deletions libs/common/RespReadUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,31 @@ public static bool ReadULongWithLengthHeader(out ulong number, ref byte* ptr, by
return true;
}

/// <summary>
/// Skip byte array with length header
/// </summary>
public static bool SkipByteArrayWithLengthHeader(ref byte* ptr, byte* end)
{
// Parse RESP string header
if (!ReadLengthHeader(out var length, ref ptr, end))
return false;

// Advance read pointer to the end of the array (including terminator)
var keyPtr = ptr;

ptr += length + 2;

if (ptr > end)
return false;

// Ensure terminator has been received
if (*(ushort*)(ptr - 2) != MemoryMarshal.Read<ushort>("\r\n"u8))
{
RespParsingException.ThrowUnexpectedToken(*(ptr - 2));
}
return true;
}

/// <summary>
/// Read byte array with length header
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions libs/common/RespWriteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ public static bool WriteMapLength(int len, ref byte* curr, byte* end)
return true;
}

/// <summary>
/// Write push type length
/// </summary>
public static bool WritePushLength(int len, ref byte* curr, byte* end)
{
int numDigits = NumUtils.NumDigits(len);
int totalLen = 1 + numDigits + 2;
if (totalLen > (int)(end - curr))
return false;
*curr++ = (byte)'>';
NumUtils.IntToBytes(len, numDigits, ref curr);
WriteNewline(ref curr);
return true;
}

/// <summary>
/// Write array length
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class GarnetServer : IDisposable
/// <summary>
/// Resp protocol version
/// </summary>
readonly string redisProtocolVersion = "6.2.11";
readonly string redisProtocolVersion = "7.2.5";

/// <summary>
/// Metrics API
Expand Down
3 changes: 3 additions & 0 deletions libs/server/API/GarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public GarnetStatus GET_WithPending(ref SpanByte key, ref SpanByte input, ref Sp
public bool GET_CompletePending((GarnetStatus, SpanByteAndMemory)[] outputArr, bool wait = false)
=> storageSession.GET_CompletePending(outputArr, wait, ref context);

public bool GET_CompletePending(out CompletedOutputIterator<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long> completedOutputs, bool wait)
=> storageSession.GET_CompletePending(out completedOutputs, wait, ref context);

/// <inheritdoc />
public unsafe GarnetStatus GETForMemoryResult(ArgSlice key, out MemoryResult<byte> value)
=> storageSession.GET(key, out value, ref context);
Expand Down
8 changes: 8 additions & 0 deletions libs/server/API/IGarnetAdvancedApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ public interface IGarnetAdvancedApi
/// <param name="wait"></param>
bool GET_CompletePending((GarnetStatus, SpanByteAndMemory)[] outputArr, bool wait = false);

/// <summary>
/// Complete pending read operations on main store
/// </summary>
/// <param name="completedOutputs"></param>
/// <param name="wait"></param>
/// <returns></returns>
bool GET_CompletePending(out CompletedOutputIterator<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long> completedOutputs, bool wait = false);
badrishc marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// RMW operation on main store
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/InputHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public struct RespInputHeader
/// Size of header
/// </summary>
public const int Size = 2;
internal const byte FlagMask = (byte)RespInputFlags.Deterministic - 1;
internal const byte FlagMask = (byte)RespInputFlags.SetGet - 1;

[FieldOffset(0)]
internal RespCommand cmd;
Expand Down
87 changes: 81 additions & 6 deletions libs/server/Resp/AdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.server.ACL;
Expand Down Expand Up @@ -280,7 +281,7 @@ private bool ProcessAdminCommands<TGarnetApi>(RespCommand command, ReadOnlySpan<
}
else if (command == RespCommand.HELLO)
{
int? respProtocolVersion = null;
byte? respProtocolVersion = null;
ReadOnlySpan<byte> authUsername = default, authPassword = default;
string clientName = null;

Expand All @@ -292,7 +293,7 @@ private bool ProcessAdminCommands<TGarnetApi>(RespCommand command, ReadOnlySpan<
return false;
readHead = (int)(ptr - recvBufferPtr);

respProtocolVersion = localRespProtocolVersion;
respProtocolVersion = (byte)localRespProtocolVersion;
count--;
while (count > 0)
{
Expand Down Expand Up @@ -542,13 +543,77 @@ private bool ProcessAdminCommands<TGarnetApi>(RespCommand command, ReadOnlySpan<
{
return ProcessACLCommands(bufSpan, count);
}
else if ((command == RespCommand.REGISTERCS))
else if (command == RespCommand.REGISTERCS)
{
return NetworkREGISTERCS(count, recvBufferPtr + readHead, storeWrapper.customCommandManager);
}
else if (command == RespCommand.ASYNC)
{
if (respProtocolVersion > 2 && count == 1)
badrishc marked this conversation as resolved.
Show resolved Hide resolved
{
var param = GetCommand(bufSpan, out bool success1);
if (!success1) return false;
if (param.SequenceEqual(CmdStrings.ON) || param.SequenceEqual(CmdStrings.on))
{
useAsync = true;
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}
else if (param.SequenceEqual(CmdStrings.OFF) || param.SequenceEqual(CmdStrings.off))
{
badrishc marked this conversation as resolved.
Show resolved Hide resolved
useAsync = false;
while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}
else if (param.SequenceEqual(CmdStrings.BARRIER) || param.SequenceEqual(CmdStrings.barrier))
{
if (asyncCompleted < asyncStarted)
{
asyncDone = new(0);
if (dcurr > networkSender.GetResponseObjectHead())
Send(networkSender.GetResponseObjectHead());
try
{
networkSender.ExitAndReturnResponseObject();
while (asyncCompleted < asyncStarted) asyncDone.Wait();
asyncDone.Dispose();
asyncDone = null;
}
finally
{
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
}
}

while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
SendAndReset();
}
else
{
if (!DrainCommands(bufSpan, count - 1))
return false;
errorFlag = true;
errorCmd = "ASYNC";
}
}
else
{
if (!DrainCommands(bufSpan, count))
return false;
if (respProtocolVersion <= 2)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_NOT_SUPPORTED_RESP2, ref dcurr, dend))
SendAndReset();
}
else
{
errorFlag = true;
errorCmd = "ASYNC";
}
}
}
else
{
// Unknown RESP Command
if (!DrainCommands(bufSpan, count))
return false;
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_UNK_CMD, ref dcurr, dend))
Expand All @@ -564,17 +629,27 @@ private bool ProcessAdminCommands<TGarnetApi>(RespCommand command, ReadOnlySpan<
return true;
}

void ProcessHelloCommand(int? respProtocolVersion, ReadOnlySpan<byte> username, ReadOnlySpan<byte> password, string clientName)
/// <summary>
/// Process the HELLO command
/// </summary>
void ProcessHelloCommand(byte? respProtocolVersion, ReadOnlySpan<byte> username, ReadOnlySpan<byte> password, string clientName)
{
if (respProtocolVersion != null)
{
if (respProtocolVersion.Value != 2)
if (respProtocolVersion.Value is < 2 or > 3)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_UNSUPPORTED_PROTOCOL_VERSION, ref dcurr, dend))
SendAndReset();
return;
}

if (respProtocolVersion.Value != this.respProtocolVersion && asyncCompleted < asyncStarted)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_ASYNC_PROTOCOL_CHANGE, ref dcurr, dend))
SendAndReset();
return;
}

this.respProtocolVersion = respProtocolVersion.Value;
}

Expand Down
Loading