Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement CLUSTER PUBSUB #866

Merged
merged 34 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d995b11
skip Interlocked.Exchange and FlushConfig when no update detected on …
vazois Jan 9, 2025
4afc0db
special case of zero epoch necessitates tracking of explicit slot update
vazois Jan 9, 2025
86cf1d4
move key spec extraction to clusterProvider
vazois Nov 27, 2024
957f03c
implement CLUSTER PUBLISH api
vazois Nov 27, 2024
82951d8
implement SSUBSCRIBE api
vazois Nov 20, 2024
e80c3f9
implement SPUBLISH api
vazois Nov 21, 2024
fae3ce4
implement separate code path for CLUSTER SPUBLISH
vazois Dec 5, 2024
54ae893
expose max number of outstanding pubsub forwarding tasks parameter
vazois Dec 5, 2024
0830b59
add API to forward published messages using GarnetClient
vazois Dec 6, 2024
824f614
add methods to extract nodeIds of all nodes in cluster and all nodes …
vazois Dec 6, 2024
bd744bf
hook up published message forwarding
vazois Dec 6, 2024
c7208bc
PUBLISH and SPUBLISH benchmark addition
vazois Dec 9, 2024
9db32e7
fix check for getting most recent config instance
vazois Dec 10, 2024
c9345aa
optimization 1: calculate candidates for published message forwarding…
vazois Dec 10, 2024
412ae03
optimization2: ignore response from cluster publish
vazois Dec 12, 2024
95db9b0
add publish BDN
vazois Dec 12, 2024
7816c5e
minimize task creation
vazois Dec 12, 2024
5ef16a3
load no response task
vazois Jan 6, 2025
8fc56d1
change to pinned array
vazois Jan 6, 2025
bd1e3c8
simplify config update for publish forward
vazois Jan 7, 2025
120edee
wip; wait for response vs fire and forget
vazois Jan 8, 2025
5788de0
add protected connection initialization for gossip-publish
vazois Jan 13, 2025
f42a1f4
remove BDN because not useful
vazois Jan 13, 2025
e5b48b6
remove unused NoResponse flag
vazois Jan 13, 2025
eecb867
remove option for tasks
vazois Jan 13, 2025
8026a30
ensure noResponse for slot verification of cluster commands
vazois Jan 13, 2025
8c17dd9
add SPUBLISH and PUBLISH docs
vazois Jan 17, 2025
4fa2ddd
correct typo
vazois Jan 23, 2025
240ba5e
merge main
vazois Jan 29, 2025
305eaf7
addressing comments v1
vazois Jan 29, 2025
bbddb19
addressing comments v2
vazois Jan 29, 2025
385cea9
rename new lock interface to indicate clear semantics
vazois Jan 30, 2025
4512032
merge from main
vazois Jan 30, 2025
8a556f6
Merge branch 'main' into vazois/cluster-pubsub
TalZaccai Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Cluster/ClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void CreateGetSet(int keySize = 8, int valueSize = 32, int batchSize = 10
benchUtils.RandomBytes(ref pairs[i].Item2);
}

var setByteCount = batchSize * ("*2\r\n$3\r\nSET\r\n"u8.Length + 1 + NumUtils.CountDigits(keySize) + 2 + keySize + 2 + 1 + NumUtils.CountDigits(valueSize) + 2 + valueSize + 2);
var setByteCount = batchSize * ("*3\r\n$3\r\nSET\r\n"u8.Length + 1 + NumUtils.CountDigits(keySize) + 2 + keySize + 2 + 1 + NumUtils.CountDigits(valueSize) + 2 + valueSize + 2);
var setReq = new Request(setByteCount);
var curr = setReq.ptr;
var end = curr + setReq.buffer.Length;
Expand Down
1 change: 1 addition & 0 deletions benchmark/Resp.benchmark/OpType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public enum OpType
DBSIZE,
READ_TXN, WRITE_TXN, READWRITETX, WATCH_TXN, SAMPLEUPDATETX, SAMPLEDELETETX,
SCRIPTSET, SCRIPTGET, SCRIPTRETKEY,
PUBLISH, SPUBLISH,
READONLY = 8888,
AUTH = 9999,
}
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Resp.benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ static void RunBasicCommandsBenchmark(Options opts)
int keyLen = opts.KeyLength;
int valueLen = opts.ValueLength;

if (opts.Op == OpType.ZADD || opts.Op == OpType.ZREM || opts.Op == OpType.ZADDREM || opts.Op == OpType.PING || opts.Op == OpType.GEOADD || opts.Op == OpType.GEOADDREM || opts.Op == OpType.SETEX || opts.Op == OpType.ZCARD || opts.Op == OpType.ZADDCARD)
if (opts.Op == OpType.PUBLISH || opts.Op == OpType.SPUBLISH || opts.Op == OpType.ZADD || opts.Op == OpType.ZREM || opts.Op == OpType.ZADDREM || opts.Op == OpType.PING || opts.Op == OpType.GEOADD || opts.Op == OpType.GEOADDREM || opts.Op == OpType.SETEX || opts.Op == OpType.ZCARD || opts.Op == OpType.ZADDCARD)
opts.SkipLoad = true;

//if we have scripts ops we need to load them in memory
Expand Down
2 changes: 2 additions & 0 deletions benchmark/Resp.benchmark/ReqGen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ public static (int, int) OnResponse(byte* buf, int bytesRead, int opType)
case OpType.DEL:
case OpType.INCR:
case OpType.DBSIZE:
case OpType.PUBLISH:
case OpType.SPUBLISH:
for (int i = 0; i < bytesRead; i++)
if (buf[i] == ':') count++;
break;
Expand Down
4 changes: 4 additions & 0 deletions benchmark/Resp.benchmark/ReqGenLoadBuffers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ private bool GenerateBatch(int i, int start, int end, OpType opType)
OpType.SCRIPTSET => System.Text.Encoding.ASCII.GetBytes($"*5\r\n$7\r\nEVALSHA\r\n{BenchUtils.sha1SetScript}\r\n$1\r\n1\r\n"),
OpType.SCRIPTGET => System.Text.Encoding.ASCII.GetBytes($"*4\r\n$7\r\nEVALSHA\r\n{BenchUtils.sha1GetScript}\r\n$1\r\n1\r\n"),
OpType.SCRIPTRETKEY => System.Text.Encoding.ASCII.GetBytes($"*4\r\n$7\r\nEVALSHA\r\n{BenchUtils.sha1RetKeyScript}\r\n$1\r\n1\r\n"),
OpType.PUBLISH => System.Text.Encoding.ASCII.GetBytes($"*3\r\n$7\r\nPUBLISH\r\n"),
OpType.SPUBLISH => System.Text.Encoding.ASCII.GetBytes($"*3\r\n$8\r\nSPUBLISH\r\n"),
_ => null
};

Expand Down Expand Up @@ -174,6 +176,8 @@ private bool GenerateBatch(int i, int start, int end, OpType opType)
case OpType.SCRIPTSET:
case OpType.SCRIPTGET:
case OpType.SCRIPTRETKEY:
case OpType.PUBLISH:
case OpType.SPUBLISH:
writeSuccess = GenerateSingleKeyValueOp(i, opHeader, start, end, opType);
return writeSuccess;
default:
Expand Down
4 changes: 4 additions & 0 deletions benchmark/Resp.benchmark/ReqGenUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ private bool WriteOp(ref byte* curr, byte* vend, OpType opType)
case OpType.SCRIPTSET:
case OpType.SCRIPTGET:
case OpType.SCRIPTRETKEY:
case OpType.PUBLISH:
case OpType.SPUBLISH:
if (!WriteKey(ref curr, vend, out keyData))
return false;
break;
Expand Down Expand Up @@ -189,6 +191,8 @@ private bool WriteOp(ref byte* curr, byte* vend, OpType opType)
case OpType.MPFADD:
case OpType.SET:
case OpType.SCRIPTSET:
case OpType.PUBLISH:
case OpType.SPUBLISH:
RandomString();
if (!WriteStringBytes(ref curr, vend, valueBuffer))
return false;
Expand Down
21 changes: 20 additions & 1 deletion benchmark/Resp.benchmark/RespOnlineBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,6 @@ public async void OpRunnerGarnetClientSession(int thread_id)
var op = SelectOpType(rand);
var startTimestamp = Stopwatch.GetTimestamp();
var c = opts.Pool ? await gcsPool.GetAsync() : client;

_ = op switch
{
OpType.PING => await c.ExecuteAsync(["PING"]),
Expand All @@ -605,6 +604,8 @@ public async void OpRunnerGarnetClientSession(int thread_id)
OpType.DEL => await c.ExecuteAsync(["DEL", req.GenerateKey()]),
OpType.SETBIT => await c.ExecuteAsync(["SETBIT", req.GenerateKey(), req.GenerateBitOffset()]),
OpType.GETBIT => await c.ExecuteAsync(["GETBIT", req.GenerateKey(), req.GenerateBitOffset()]),
OpType.PUBLISH => await c.ExecuteAsync(["PUBLISH", req.GenerateKey(), req.GenerateValue()]),
OpType.SPUBLISH => await c.ExecuteAsync(["SPUBLISH", req.GenerateKey(), req.GenerateValue()]),
OpType.ZADD => await ZADD(),
OpType.ZREM => await ZREM(),
OpType.ZCARD => await ZCARD(),
Expand Down Expand Up @@ -717,6 +718,12 @@ public async void OpRunnerGarnetClientSessionParallel(int thread_id, int paralle
case OpType.GETBIT:
c.ExecuteBatch(["GETBIT", req.GenerateKey(), req.GenerateBitOffset()]);
break;
case OpType.PUBLISH:
c.ExecuteBatch(["PUBLISH", req.GenerateKey(), req.GenerateValue()]);
break;
case OpType.SPUBLISH:
c.ExecuteBatch(["SPUBLISH", req.GenerateKey(), req.GenerateValue()]);
break;

default:
throw new Exception($"opType: {op} benchmark not supported with {opts.Client} ClientType!");
Expand Down Expand Up @@ -1046,6 +1053,12 @@ public async void OpRunnerSERedis(int thread_id)
case OpType.DEL:
await db.KeyDeleteAsync(req.GenerateKey());
break;
case OpType.PUBLISH:
await db.PublishAsync(RedisChannel.Literal(req.GenerateKey()), req.GenerateValue());
break;
case OpType.SPUBLISH:
await db.ExecuteAsync("SPUBLISH", req.GenerateKey(), req.GenerateValue());
break;
case OpType.ZADD:
{
var key = req.GenerateKey();
Expand Down Expand Up @@ -1121,6 +1134,12 @@ public async void OpRunnerSERedisParallel(int thread_id, int parallel)
case OpType.SET:
tasks[offset++] = db.StringSetAsync(req.GenerateKey(), req.GenerateValue());
break;
case OpType.PUBLISH:
tasks[offset++] = db.PublishAsync(RedisChannel.Literal(req.GenerateKey()), req.GenerateValue());
break;
case OpType.SPUBLISH:
tasks[offset++] = db.ExecuteAsync("SPUBLISH", req.GenerateKey(), req.GenerateValue());
break;
case OpType.SETEX:
tasks[offset++] = db.StringSetAsync(req.GenerateKey(), req.GenerateValue(), TimeSpan.FromSeconds(opts.Ttl));
break;
Expand Down
Loading