Skip to content

Commit 410fac4

Browse files
authored
Transaction commands (#137)
Signed-off-by: Alex Rehnby-Martin <[email protected]>
1 parent c29750d commit 410fac4

File tree

9 files changed

+316
-10
lines changed

9 files changed

+316
-10
lines changed

sources/Valkey.Glide/Abstract/Condition.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ public override string ToString()
450450

451451
internal override List<ICmd> CreateCommands()
452452
=> [
453-
Request.CustomCommand(["WATCH", key]),
453+
Request.Watch([key]),
454454
Request.CustomCommand([cmd.ToString(), key, expectedValue]),
455455
];
456456

@@ -495,7 +495,7 @@ public override string ToString()
495495

496496
internal sealed override List<ICmd> CreateCommands()
497497
=> [
498-
Request.CustomCommand(["WATCH", key]),
498+
Request.Watch([key]),
499499
Request.CustomCommand(cmd == ValkeyCommand.GET
500500
? [cmd.ToString(), key]
501501
: [cmd.ToString(), key, memberName]
@@ -540,7 +540,7 @@ public override string ToString()
540540

541541
internal sealed override List<ICmd> CreateCommands()
542542
=> [
543-
Request.CustomCommand(["WATCH", key]),
543+
Request.Watch([key]),
544544
Request.CustomCommand([ValkeyCommand.LINDEX.ToString(), key, index.ToString()]),
545545
];
546546

@@ -588,7 +588,7 @@ private string GetComparisonString()
588588

589589
internal sealed override List<ICmd> CreateCommands()
590590
=> [
591-
Request.CustomCommand(["WATCH", key]),
591+
Request.Watch([key]),
592592
Request.CustomCommand([cmd.ToString(), key]),
593593
];
594594

@@ -624,7 +624,7 @@ private string GetComparisonString()
624624

625625
internal sealed override List<ICmd> CreateCommands()
626626
=> [
627-
Request.CustomCommand(["WATCH", key]),
627+
Request.Watch([key]),
628628
Request.CustomCommand(["ZCOUNT", key, min, max]),
629629
];
630630

@@ -658,7 +658,7 @@ public override string ToString()
658658

659659
internal sealed override List<ICmd> CreateCommands()
660660
=> [
661-
Request.CustomCommand(["WATCH", key]),
661+
Request.Watch([key]),
662662
Request.CustomCommand(["ZCOUNT", key, sortedSetScore, sortedSetScore]),
663663
];
664664

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
namespace Valkey.Glide.Commands;
4+
5+
/// <summary>
6+
/// Supports commands for the "Transaction Commands" group for standalone and cluster clients.
7+
/// <br />
8+
/// See more on <see href="https://valkey.io/commands/?group=transactions">valkey.io</see>.
9+
/// </summary>
10+
public interface ITransactionBaseCommands
11+
{
12+
/// <summary>
13+
/// Marks the given keys to be watched for conditional execution of a transaction. Transactions
14+
/// will only execute commands if the watched keys are not modified before execution of the
15+
/// transaction. Keys that do not exist are watched as if they were empty.
16+
/// </summary>
17+
/// <param name="keys">The keys to watch.</param>
18+
/// <param name="flags">The flags to use for this operation. Currently flags are ignored.</param>
19+
/// <exception cref="Errors.RequestException">Thrown if the command fails to execute on the server.</exception>
20+
/// <remarks>
21+
/// <para>
22+
/// In cluster mode, if keys in <paramref name="keys"/> map to different hash slots, the command
23+
/// will be split across these slots and executed separately for each. This means the command
24+
/// is atomic only at the slot level. If one or more slot-specific requests fail, the entire
25+
/// call will return the first encountered error, even though some requests may have succeeded
26+
/// while others did not. If this behavior impacts your application logic, consider splitting
27+
/// the request into sub-requests per slot to ensure atomicity.
28+
/// </para>
29+
/// <example>
30+
/// <code>
31+
/// await client.WatchAsync(["sampleKey"]);
32+
///
33+
/// // Execute transaction
34+
/// var batch = new Batch(true)
35+
/// .StringSetAsync("sampleKey", "foobar");
36+
/// object[] transactionResult = await client.Exec(batch, false);
37+
/// // transactionResult is not null if transaction executed successfully
38+
///
39+
/// // Watch key again
40+
/// await client.WatchAsync(["sampleKey"]);
41+
/// var batch2 = new Batch(true)
42+
/// .StringSetAsync("sampleKey", "foobar");
43+
/// // Modify the watched key from another client/connection
44+
/// await client.StringSetAsync("sampleKey", "hello world");
45+
/// object[] transactionResult2 = await client.Exec(batch2, true);
46+
/// // transactionResult2 is null because the watched key was modified
47+
/// </code>
48+
/// </example>
49+
/// </remarks>
50+
/// <seealso href="https://valkey.io/commands/watch/"/>
51+
Task WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None);
52+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
namespace Valkey.Glide.Commands;
4+
5+
/// <summary>
6+
/// Supports commands for the "Transaction Commands" group for cluster clients.
7+
/// <br />
8+
/// See more on <see href="https://valkey.io/commands/?group=transactions">valkey.io</see>.
9+
/// </summary>
10+
public interface ITransactionClusterCommands : ITransactionBaseCommands
11+
{
12+
/// <summary>
13+
/// Flushes all the previously watched keys for a transaction. Executing a transaction will
14+
/// automatically flush all previously watched keys.
15+
/// The command will be routed to all primary nodes.
16+
/// </summary>
17+
/// <param name="flags">The flags to use for this operation. Currently flags are ignored.</param>
18+
/// <exception cref="Errors.RequestException">Thrown if the command fails to execute on the server.</exception>
19+
/// <example>
20+
/// <code>
21+
/// await client.WatchAsync(["sampleKey"]);
22+
/// await client.UnwatchAsync();
23+
/// // "sampleKey" is no longer watched on all primary nodes
24+
/// </code>
25+
/// </example>
26+
/// <seealso href="https://valkey.io/commands/unwatch/"/>
27+
Task UnwatchAsync(CommandFlags flags = CommandFlags.None);
28+
29+
/// <summary>
30+
/// Flushes all the previously watched keys for a transaction. Executing a transaction will
31+
/// automatically flush all previously watched keys.
32+
/// </summary>
33+
/// <param name="route">Specifies the routing configuration for the command. The client will route the
34+
/// command to the nodes defined by <paramref name="route"/>.</param>
35+
/// <param name="flags">The flags to use for this operation. Currently flags are ignored.</param>
36+
/// <exception cref="Errors.RequestException">Thrown if the command fails to execute on the server.</exception>
37+
/// <example>
38+
/// <code>
39+
/// await client.WatchAsync(["sampleKey"]);
40+
/// await client.UnwatchAsync(Route.AllPrimaries);
41+
/// // "sampleKey" is no longer watched on all primary nodes
42+
/// </code>
43+
/// </example>
44+
/// <seealso href="https://valkey.io/commands/unwatch/"/>
45+
Task UnwatchAsync(Route route, CommandFlags flags = CommandFlags.None);
46+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
namespace Valkey.Glide.Commands;
4+
5+
/// <summary>
6+
/// Supports commands for the "Transaction Commands" group for standalone clients.
7+
/// <br />
8+
/// See more on <see href="https://valkey.io/commands/?group=transactions">valkey.io</see>.
9+
/// </summary>
10+
public interface ITransactionCommands : ITransactionBaseCommands
11+
{
12+
/// <summary>
13+
/// Flushes all the previously watched keys for a transaction. Executing a transaction will
14+
/// automatically flush all previously watched keys.
15+
/// </summary>
16+
/// <param name="flags">The flags to use for this operation. Currently flags are ignored.</param>
17+
/// <exception cref="Errors.RequestException">Thrown if the command fails to execute on the server.</exception>
18+
/// <example>
19+
/// <code>
20+
/// await client.WatchAsync(["sampleKey"]);
21+
/// bool result = await client.UnwatchAsync();
22+
/// </code>
23+
/// </example>
24+
/// <seealso href="https://valkey.io/commands/unwatch/"/>
25+
Task UnwatchAsync(CommandFlags flags = CommandFlags.None);
26+
}

sources/Valkey.Glide/GlideClient.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Valkey.Glide;
1414
/// <summary>
1515
/// Client used for connection to standalone servers. Use <see cref="CreateClient"/> to request a client.
1616
/// </summary>
17-
public partial class GlideClient : BaseClient, IGenericCommands, IServerManagementCommands, IConnectionManagementCommands
17+
public partial class GlideClient : BaseClient, IGenericCommands, IServerManagementCommands, IConnectionManagementCommands, ITransactionCommands
1818
{
1919
internal GlideClient() { }
2020

@@ -202,6 +202,18 @@ public async IAsyncEnumerable<ValkeyKey> KeysAsync(int database = -1, ValkeyValu
202202
public async Task<(string cursor, ValkeyKey[] keys)> ScanAsync(string cursor, ScanOptions? options = null)
203203
=> await Command(Request.ScanAsync(cursor, options));
204204

205+
public async Task WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None)
206+
{
207+
GuardClauses.ThrowIfCommandFlags(flags);
208+
_ = await Command(Request.Watch(keys));
209+
}
210+
211+
public async Task UnwatchAsync(CommandFlags flags = CommandFlags.None)
212+
{
213+
GuardClauses.ThrowIfCommandFlags(flags);
214+
_ = await Command(Request.Unwatch());
215+
}
216+
205217
protected override async Task<Version> GetServerVersionAsync()
206218
{
207219
if (_serverVersion == null)

sources/Valkey.Glide/GlideClusterClient.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace Valkey.Glide;
2020
/// <summary>
2121
/// Client used for connection to cluster servers. Use <see cref="CreateClient"/> to request a client.
2222
/// </summary>
23-
public sealed partial class GlideClusterClient : BaseClient, IGenericClusterCommands, IServerManagementClusterCommands, IConnectionManagementClusterCommands
23+
public sealed partial class GlideClusterClient : BaseClient, IGenericClusterCommands, IServerManagementClusterCommands, IConnectionManagementClusterCommands, ITransactionClusterCommands
2424
{
2525
private GlideClusterClient() { }
2626

@@ -305,6 +305,24 @@ public async Task<string> SelectAsync(long index, CommandFlags flags = CommandFl
305305
return await Command(Request.Select(index), Route.Random);
306306
}
307307

308+
public async Task WatchAsync(ValkeyKey[] keys, CommandFlags flags = CommandFlags.None)
309+
{
310+
GuardClauses.ThrowIfCommandFlags(flags);
311+
_ = await Command(Request.Watch(keys));
312+
}
313+
314+
public async Task UnwatchAsync(CommandFlags flags = CommandFlags.None)
315+
{
316+
GuardClauses.ThrowIfCommandFlags(flags);
317+
_ = await Command(Request.Unwatch(), AllPrimaries);
318+
}
319+
320+
public async Task UnwatchAsync(Route route, CommandFlags flags = CommandFlags.None)
321+
{
322+
GuardClauses.ThrowIfCommandFlags(flags);
323+
_ = await Command(Request.Unwatch(), route);
324+
}
325+
308326
protected override async Task<Version> GetServerVersionAsync()
309327
{
310328
if (_serverVersion == null)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
using static Valkey.Glide.Internals.FFI;
4+
5+
namespace Valkey.Glide.Internals;
6+
7+
internal partial class Request
8+
{
9+
/// <summary>
10+
/// Creates a command to watch keys for conditional execution of a transaction.
11+
/// </summary>
12+
/// <param name="keys">The keys to watch.</param>
13+
/// <returns>A command that watches the specified keys.</returns>
14+
public static Cmd<string, string> Watch(ValkeyKey[] keys)
15+
{
16+
GlideString[] args = [.. keys.Select(k => (GlideString)k)];
17+
return new(RequestType.Watch, args, false, response => response);
18+
}
19+
20+
/// <summary>
21+
/// Creates a command to flush all previously watched keys for a transaction.
22+
/// </summary>
23+
/// <returns>A command that unwatches all previously watched keys.</returns>
24+
public static Cmd<string, string> Unwatch()
25+
=> new(RequestType.UnWatch, [], false, response => response);
26+
}

tests/Valkey.Glide.IntegrationTests/SharedBatchTests.cs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,121 @@ public async Task BatchDumpAndRestore(BaseClient client, bool isAtomic)
8686
);
8787

8888
}
89+
90+
[Theory(DisableDiscoveryEnumeration = true)]
91+
[MemberData(nameof(TestConfiguration.TestClients), MemberType = typeof(TestConfiguration))]
92+
public async Task WatchTransactionTest(BaseClient client)
93+
{
94+
string key1 = "{key}-1" + Guid.NewGuid();
95+
string key2 = "{key}-2" + Guid.NewGuid();
96+
string key3 = "{key}-3" + Guid.NewGuid();
97+
string foobarString = "foobar";
98+
string helloString = "hello";
99+
ValkeyKey[] keys = [key1, key2, key3];
100+
101+
bool isCluster = client is GlideClusterClient;
102+
103+
// Returns null when a watched key is modified before transaction execution
104+
if (isCluster)
105+
{
106+
await ((GlideClusterClient)client).WatchAsync(keys);
107+
}
108+
else
109+
{
110+
await ((GlideClient)client).WatchAsync(keys);
111+
}
112+
113+
await client.StringSetAsync(key2, helloString);
114+
115+
object?[]? execResult;
116+
if (isCluster)
117+
{
118+
var clusterBatch = new ClusterBatch(true);
119+
_ = clusterBatch.StringSetAsync(key1, foobarString)
120+
.StringSetAsync(key2, foobarString)
121+
.StringSetAsync(key3, foobarString);
122+
execResult = await ((GlideClusterClient)client).Exec(clusterBatch, true);
123+
}
124+
else
125+
{
126+
var batch = new Batch(true);
127+
_ = batch.StringSetAsync(key1, foobarString)
128+
.StringSetAsync(key2, foobarString)
129+
.StringSetAsync(key3, foobarString);
130+
execResult = await ((GlideClient)client).Exec(batch, true);
131+
}
132+
133+
// The transaction should fail (return null) because key2 was modified after being watched
134+
Assert.Null(execResult);
135+
136+
// Verify the key values: transaction was aborted, so only key2 (set before transaction) should have a value
137+
var key1Value = await client.StringGetAsync(key1);
138+
var key2Value = await client.StringGetAsync(key2);
139+
var key3Value = await client.StringGetAsync(key3);
140+
141+
Assert.True(key1Value.IsNull); // key1 should not be set
142+
Assert.Equal(helloString, key2Value); // key2 should have the value set before transaction
143+
Assert.True(key3Value.IsNull); // key3 should not be set
144+
}
145+
146+
[Theory(DisableDiscoveryEnumeration = true)]
147+
[MemberData(nameof(TestConfiguration.TestClients), MemberType = typeof(TestConfiguration))]
148+
public async Task UnwatchTest(BaseClient client)
149+
{
150+
string key1 = "{key}-1" + Guid.NewGuid();
151+
string key2 = "{key}-2" + Guid.NewGuid();
152+
string foobarString = "foobar";
153+
string helloString = "hello";
154+
ValkeyKey[] keys = [key1, key2];
155+
156+
bool isCluster = client is GlideClusterClient;
157+
158+
// UNWATCH succeeds when there are no watched keys
159+
if (isCluster)
160+
{
161+
await ((GlideClusterClient)client).UnwatchAsync();
162+
}
163+
else
164+
{
165+
await ((GlideClient)client).UnwatchAsync();
166+
}
167+
168+
// Transaction executes successfully after modifying a watched key then calling UNWATCH
169+
if (isCluster)
170+
{
171+
await ((GlideClusterClient)client).WatchAsync(keys);
172+
}
173+
else
174+
{
175+
await ((GlideClient)client).WatchAsync(keys);
176+
}
177+
await client.StringSetAsync(key2, helloString);
178+
if (isCluster)
179+
{
180+
await ((GlideClusterClient)client).UnwatchAsync();
181+
}
182+
else
183+
{
184+
await ((GlideClient)client).UnwatchAsync();
185+
}
186+
187+
object?[]? execResult;
188+
if (isCluster)
189+
{
190+
var clusterBatch = new ClusterBatch(true);
191+
_ = clusterBatch.StringSetAsync(key1, foobarString).StringSetAsync(key2, foobarString);
192+
execResult = await ((GlideClusterClient)client).Exec(clusterBatch, true);
193+
}
194+
else
195+
{
196+
var batch = new Batch(true);
197+
_ = batch.StringSetAsync(key1, foobarString).StringSetAsync(key2, foobarString);
198+
execResult = await ((GlideClient)client).Exec(batch, true);
199+
}
200+
201+
Assert.NotNull(execResult); // Transaction should succeed after unwatch
202+
Assert.Equal(2, execResult.Length);
203+
Assert.Equal(foobarString, await client.StringGetAsync(key1));
204+
Assert.Equal(foobarString, await client.StringGetAsync(key2));
205+
}
89206
}

0 commit comments

Comments
 (0)