Skip to content

Commit

Permalink
Fix large sized pubsub message transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Nov 12, 2024
1 parent 652b49f commit a3f2900
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 7 deletions.
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ private void WriteDirectLarge(ReadOnlySpan<byte> src)

// Adjust number of bytes to copy, to space left on output buffer, then copy
src.Slice(0, destSpace).CopyTo(new Span<byte>(dcurr, destSpace));
dcurr += destSpace;

Check failure

Code scanning / CodeQL

Unvalidated local pointer arithmetic Critical

Unvalidated pointer arithmetic from virtual method
GetResponseObjectHead
.
src = src.Slice(destSpace);

// Send and reset output buffer
Expand Down
36 changes: 30 additions & 6 deletions test/Garnet.test/RespPubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

using System;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Channels;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;
Expand All @@ -20,7 +20,7 @@ class RespPubSubTests
public void Setup()
{
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, pubSubPageSize: "256k");
server.Start();
}

Expand Down Expand Up @@ -52,6 +52,27 @@ public void BasicSUBSCRIBE()
sub.Unsubscribe(RedisChannel.Literal("messages"));
}

[Test]
public void LargeSUBSCRIBE()
{
using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var sub = subRedis.GetSubscriber();
var db = redis.GetDatabase(0);
RedisValue value = RandomNumberGenerator.GetBytes(140 * 1024);

ManualResetEvent evt = new(false);

SubscribeAndPublish(sub, db, RedisChannel.Literal("messages"), RedisChannel.Literal("messages"), value, onSubscribe: (channel, message) =>
{
ClassicAssert.AreEqual("messages", (string)channel);
ClassicAssert.AreEqual(value, (string)message);
evt.Set();
});

sub.Unsubscribe(RedisChannel.Literal("messages"));
}

[Test]
public void BasicPSUBSCRIBE()
{
Expand Down Expand Up @@ -180,9 +201,12 @@ public void BasicPUBSUB_NUMSUB()
sub.Unsubscribe(RedisChannel.Literal("messagesB"));
}

private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel channel, RedisChannel? publishChannel = null, string message = null, Action<RedisChannel, RedisValue> onSubscribe = null)
private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel channel, RedisChannel? publishChannel = null, RedisValue? message = null, Action<RedisChannel, RedisValue> onSubscribe = null)
{
message ??= "published message";
if (!message.HasValue)
{
message = "published message";
}
publishChannel ??= channel;
ManualResetEvent evt = new(false);
sub.Subscribe(channel, (receivedChannel, receivedMessage) =>
Expand All @@ -197,8 +221,8 @@ private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel cha
int repeat = 5;
while (true)
{
db.Publish(publishChannel.Value, message);
var ret = evt.WaitOne(TimeSpan.FromSeconds(1));
db.Publish(publishChannel.Value, message.Value);
var ret = evt.WaitOne(TimeSpan.FromSeconds(10));
if (ret) break;
repeat--;
ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subscription receive");
Expand Down
6 changes: 5 additions & 1 deletion test/Garnet.test/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public static GarnetServer CreateGarnetServer(
IAuthenticationSettings authenticationSettings = null,
bool enableLua = false,
ILogger logger = null,
IEnumerable<string> loadModulePaths = null)
IEnumerable<string> loadModulePaths = null,
string pubSubPageSize = null)
{
if (UseAzureStorage)
IgnoreIfNotRunningAzureTests();
Expand Down Expand Up @@ -280,6 +281,9 @@ public static GarnetServer CreateGarnetServer(
LoadModuleCS = loadModulePaths
};

if (!string.IsNullOrEmpty(pubSubPageSize))
opts.PubSubPageSize = pubSubPageSize;

if (!string.IsNullOrEmpty(objectStoreHeapMemorySize))
opts.ObjectStoreHeapMemorySize = objectStoreHeapMemorySize;

Expand Down

0 comments on commit a3f2900

Please sign in to comment.