Skip to content

Commit 97894e5

Browse files
Adds support for XPENDING IDLE parameter
fixes StackExchange#2432
1 parent 7170cb4 commit 97894e5

File tree

9 files changed

+78
-23
lines changed

9 files changed

+78
-23
lines changed

src/StackExchange.Redis/Interfaces/IDatabase.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -2658,11 +2658,12 @@ IEnumerable<SortedSetEntry> SortedSetScan(
26582658
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
26592659
/// <param name="minId">The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream.</param>
26602660
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream.</param>
2661+
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
26612662
/// <param name="flags">The flags to use for this operation.</param>
26622663
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
26632664
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
26642665
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
2665-
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
2666+
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);
26662667

26672668
/// <summary>
26682669
/// Read a stream using the given range of IDs.

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -645,8 +645,8 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
645645
/// <inheritdoc cref="IDatabase.StreamPending(RedisKey, RedisValue, CommandFlags)"/>
646646
Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
647647

648-
/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, CommandFlags)"/>
649-
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
648+
/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, long?, CommandFlags)"/>
649+
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);
650650

651651
/// <inheritdoc cref="IDatabase.StreamRange(RedisKey, RedisValue?, RedisValue?, int?, Order, CommandFlags)"/>
652652
Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -615,8 +615,8 @@ public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupN
615615
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
616616
Inner.StreamPendingAsync(ToInner(key), groupName, flags);
617617

618-
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
619-
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
618+
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
619+
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);
620620

621621
public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
622622
Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -597,8 +597,8 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman
597597
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
598598
Inner.StreamPending(ToInner(key), groupName, flags);
599599

600-
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
601-
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
600+
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
601+
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);
602602

603603
public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
604604
Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key,
734734
StackExchange.Redis.IDatabase.StreamInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamInfo
735735
StackExchange.Redis.IDatabase.StreamLength(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
736736
StackExchange.Redis.IDatabase.StreamPending(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingInfo
737-
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
737+
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
738738
StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
739739
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
740740
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
@@ -971,7 +971,7 @@ StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.Redi
971971
StackExchange.Redis.IDatabaseAsync.StreamInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamInfo>!
972972
StackExchange.Redis.IDatabaseAsync.StreamLengthAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
973973
StackExchange.Redis.IDatabaseAsync.StreamPendingAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingInfo>!
974-
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
974+
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
975975
StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
976976
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
977977
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!

src/StackExchange.Redis/RedisDatabase.cs

+29-10
Original file line numberDiff line numberDiff line change
@@ -2803,7 +2803,7 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group
28032803
return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo);
28042804
}
28052805

2806-
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
2806+
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
28072807
{
28082808
var msg = GetStreamPendingMessagesMessage(
28092809
key,
@@ -2812,12 +2812,13 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue
28122812
maxId,
28132813
count,
28142814
consumerName,
2815+
minIdleTimeInMs,
28152816
flags);
28162817

28172818
return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
28182819
}
28192820

2820-
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
2821+
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
28212822
{
28222823
var msg = GetStreamPendingMessagesMessage(
28232824
key,
@@ -2826,6 +2827,7 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key,
28262827
maxId,
28272828
count,
28282829
consumerName,
2830+
minIdleTimeInMs,
28292831
flags);
28302832

28312833
return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
@@ -4300,9 +4302,9 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro
43004302
/// Gets a message for <see href="https://redis.io/commands/xpending/"/>.
43014303
/// </summary>
43024304
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
4303-
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags)
4305+
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, long? minIdleTimeInMs, CommandFlags flags)
43044306
{
4305-
// > XPENDING mystream mygroup - + 10 [consumer name]
4307+
// > XPENDING mystream mygroup [IDLE min-idle-time] - + 10 [consumer name]
43064308
// 1) 1) 1526569498055 - 0
43074309
// 2) "Bob"
43084310
// 3) (integer)74170458
@@ -4316,16 +4318,33 @@ private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupNa
43164318
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
43174319
}
43184320

4319-
var values = new RedisValue[consumerName == RedisValue.Null ? 4 : 5];
4321+
var valuesLength = 4;
4322+
if (consumerName != RedisValue.Null)
4323+
{
4324+
valuesLength++;
4325+
}
43204326

4321-
values[0] = groupName;
4322-
values[1] = minId ?? StreamConstants.ReadMinValue;
4323-
values[2] = maxId ?? StreamConstants.ReadMaxValue;
4324-
values[3] = count;
4327+
if (minIdleTimeInMs is not null)
4328+
{
4329+
valuesLength += 2;
4330+
}
4331+
var values = new RedisValue[valuesLength];
4332+
4333+
var offset = 0;
4334+
4335+
values[offset++] = groupName;
4336+
if (minIdleTimeInMs is not null)
4337+
{
4338+
values[offset++] = "IDLE";
4339+
values[offset++] = minIdleTimeInMs;
4340+
}
4341+
values[offset++] = minId ?? StreamConstants.ReadMinValue;
4342+
values[offset++] = maxId ?? StreamConstants.ReadMaxValue;
4343+
values[offset++] = count;
43254344

43264345
if (consumerName != RedisValue.Null)
43274346
{
4328-
values[4] = consumerName;
4347+
values[offset++] = consumerName;
43294348
}
43304349

43314350
return Message.Create(

tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1154,8 +1154,8 @@ public void StreamPendingInfoGet()
11541154
[Fact]
11551155
public void StreamPendingMessageInfoGet()
11561156
{
1157-
prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None);
1158-
mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None);
1157+
prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None);
1158+
mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None);
11591159
}
11601160

11611161
[Fact]

tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1070,8 +1070,8 @@ public async Task StreamPendingInfoGetAsync()
10701070
[Fact]
10711071
public async Task StreamPendingMessageInfoGetAsync()
10721072
{
1073-
await prefixed.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None);
1074-
await mock.Received().StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None);
1073+
await prefixed.StreamPendingMessagesAsync("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None);
1074+
await mock.Received().StreamPendingMessagesAsync("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None);
10751075
}
10761076

10771077
[Fact]

tests/StackExchange.Redis.Tests/StreamTests.cs

+35
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,41 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
10981098
Assert.Equal(id1, pendingMessageInfoList[0].MessageId);
10991099
}
11001100

1101+
[Fact]
1102+
public async Task StreamConsumerGroupViewPendingMessageWithMinIdle()
1103+
{
1104+
await using var conn = Create(require: RedisFeatures.v6_2_0);
1105+
1106+
var db = conn.GetDatabase();
1107+
var key = Me();
1108+
const string groupName = "test_group",
1109+
consumer1 = "test_consumer_1";
1110+
const int minIdleTimeInMs = 100;
1111+
1112+
var id1 = db.StreamAdd(key, "field1", "value1");
1113+
1114+
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
1115+
1116+
// Read a single message into the first consumer.
1117+
db.StreamReadGroup(key, groupName, consumer1, count: 1);
1118+
1119+
var preDelayPendingMessages =
1120+
db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs);
1121+
1122+
await Task.Delay(minIdleTimeInMs).ForAwait();
1123+
1124+
var postDelayPendingMessages =
1125+
db.StreamPendingMessages(key, groupName, 10, RedisValue.Null, minId: id1, maxId: id1, minIdleTimeInMs: minIdleTimeInMs);
1126+
1127+
Assert.NotNull(preDelayPendingMessages);
1128+
Assert.Empty(preDelayPendingMessages);
1129+
Assert.NotNull(postDelayPendingMessages);
1130+
Assert.Single(postDelayPendingMessages);
1131+
Assert.Equal(1, postDelayPendingMessages[0].DeliveryCount);
1132+
Assert.True((int)postDelayPendingMessages[0].IdleTimeInMilliseconds > minIdleTimeInMs);
1133+
Assert.Equal(id1, postDelayPendingMessages[0].MessageId);
1134+
}
1135+
11011136
[Fact]
11021137
public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
11031138
{

0 commit comments

Comments
 (0)