Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix large sized pubsub message transfer #791

Merged
merged 4 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
######################################
# NOTE: Before running this pipeline to generate a new nuget package, update the version string in two places
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 1) update the name: string below (line 7) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 32) -- NOTE - these two values need to be the same
# 3) update the version in GarnetServer.csproj (~line 8)
######################################
name: 1.0.36
name: 1.0.37
trigger:
branches:
include:
Expand Down
4 changes: 2 additions & 2 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace Garnet
/// </summary>
public class GarnetServer : IDisposable
{
// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
readonly string version = "1.0.36";
// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~7 and GarnetServer.csproj line ~8.
readonly string version = "1.0.37";

internal GarnetProvider Provider;

Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ private void WriteDirectLarge(ReadOnlySpan<byte> src)

// Adjust number of bytes to copy, to space left on output buffer, then copy
src.Slice(0, destSpace).CopyTo(new Span<byte>(dcurr, destSpace));
dcurr += destSpace;
Dismissed Show dismissed Hide dismissed
src = src.Slice(destSpace);

// Send and reset output buffer
Expand Down
4 changes: 2 additions & 2 deletions main/GarnetServer/GarnetServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<OutputType>Exe</OutputType>
<ServerGarbageCollection>true</ServerGarbageCollection>

<!-- IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6. -->
<Version>1.0.36</Version>
<!-- IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~7 and GarnetServer.cs line ~32. -->
<Version>1.0.37</Version>
<PackageId>garnet-server</PackageId>
<PackAsTool>true</PackAsTool>
<ToolCommandName>garnet-server</ToolCommandName>
Expand Down
34 changes: 29 additions & 5 deletions test/Garnet.test/RespPubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

using System;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Channels;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;
Expand All @@ -20,7 +20,7 @@ class RespPubSubTests
public void Setup()
{
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, pubSubPageSize: "256k");
server.Start();
}

Expand Down Expand Up @@ -52,6 +52,27 @@ public void BasicSUBSCRIBE()
sub.Unsubscribe(RedisChannel.Literal("messages"));
}

[Test]
public void LargeSUBSCRIBE()
{
using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var sub = subRedis.GetSubscriber();
var db = redis.GetDatabase(0);
RedisValue value = RandomNumberGenerator.GetBytes(140 * 1024);

ManualResetEvent evt = new(false);

SubscribeAndPublish(sub, db, RedisChannel.Literal("messages"), RedisChannel.Literal("messages"), value, onSubscribe: (channel, message) =>
{
ClassicAssert.AreEqual("messages", (string)channel);
ClassicAssert.AreEqual(value, (string)message);
evt.Set();
});

sub.Unsubscribe(RedisChannel.Literal("messages"));
}

[Test]
public void BasicPSUBSCRIBE()
{
Expand Down Expand Up @@ -180,9 +201,12 @@ public void BasicPUBSUB_NUMSUB()
sub.Unsubscribe(RedisChannel.Literal("messagesB"));
}

private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel channel, RedisChannel? publishChannel = null, string message = null, Action<RedisChannel, RedisValue> onSubscribe = null)
private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel channel, RedisChannel? publishChannel = null, RedisValue? message = null, Action<RedisChannel, RedisValue> onSubscribe = null)
{
message ??= "published message";
if (!message.HasValue)
{
message = "published message";
}
publishChannel ??= channel;
ManualResetEvent evt = new(false);
sub.Subscribe(channel, (receivedChannel, receivedMessage) =>
Expand All @@ -197,7 +221,7 @@ private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel cha
int repeat = 5;
while (true)
{
db.Publish(publishChannel.Value, message);
db.Publish(publishChannel.Value, message.Value);
var ret = evt.WaitOne(TimeSpan.FromSeconds(1));
if (ret) break;
repeat--;
Expand Down
6 changes: 5 additions & 1 deletion test/Garnet.test/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public static GarnetServer CreateGarnetServer(
IAuthenticationSettings authenticationSettings = null,
bool enableLua = false,
ILogger logger = null,
IEnumerable<string> loadModulePaths = null)
IEnumerable<string> loadModulePaths = null,
string pubSubPageSize = null)
{
if (UseAzureStorage)
IgnoreIfNotRunningAzureTests();
Expand Down Expand Up @@ -280,6 +281,9 @@ public static GarnetServer CreateGarnetServer(
LoadModuleCS = loadModulePaths
};

if (!string.IsNullOrEmpty(pubSubPageSize))
opts.PubSubPageSize = pubSubPageSize;

if (!string.IsNullOrEmpty(objectStoreHeapMemorySize))
opts.ObjectStoreHeapMemorySize = objectStoreHeapMemorySize;

Expand Down