Skip to content

Commit 41f6b00

Browse files
Simon Prickettleibale
andauthored
Add streams XREADGROUP and XACK example. (#1832)
* Removed stream delete command to allow consumer group example to work. * Adds stream consumer group example. * Adds stream consumer group example code. * Update README.md Co-authored-by: Leibale Eidelman <[email protected]>
1 parent d602682 commit 41f6b00

File tree

3 files changed

+105
-1
lines changed

3 files changed

+105
-1
lines changed

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ This folder contains example scripts showing how to use Node Redis in different
1919
| `stream-consumer.js` | Reads entries from a [Redis Stream](https://redis.io/topics/streams-intro) using the blocking `XREAD` command |
2020
| `time-series.js` | Create, populate and query timeseries data with [Redis Timeseries](https://redistimeseries.io) |
2121
| `topk.js` | Use the [RedisBloom](https://redisbloom.io) TopK to track the most frequently seen items. |
22+
| `stream-consumer-group.js` | Reads entties from a [Redis Stream](https://redis.io/topics/streams-intro) as part of a consumer group using the blocking `XREADGROUP` command |
2223

2324
## Contributing
2425

examples/stream-consumer-group.js

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// A sample stream consumer using the blocking variant of XREADGROUP.
2+
// This consumer works in collaboration with other instances of itself
3+
// in the same consumer group such that the group as a whole receives
4+
// every entry from the stream.
5+
//
6+
// This consumes entries from a stream created by stream-producer.js
7+
//
8+
// Run this as follows:
9+
//
10+
// $ node stream-consumer-group.js <consumerName>
11+
//
12+
// Run multiple instances with different values of <consumerName>
13+
// to see them processing the stream as a group:
14+
//
15+
// $ node stream-consumer-group.js consumer1
16+
//
17+
// In another terminal:
18+
//
19+
// $ node stream-consumer-group.js consumer2
20+
21+
import { createClient, commandOptions } from 'redis';
22+
23+
async function streamConsumerGroup() {
24+
const client = createClient();
25+
26+
if (process.argv.length !== 3) {
27+
console.log(`usage: node stream-consumer-group.js <consumerName>`);
28+
process.exit(1);
29+
}
30+
31+
const consumerName = process.argv[2];
32+
33+
await client.connect();
34+
35+
// Create the consumer group (and stream) if needed...
36+
try {
37+
await client.xGroupCreate('mystream', 'myconsumergroup', '0', {
38+
MKSTREAM: true
39+
});
40+
console.log('Created consumer group.');
41+
} catch (e) {
42+
console.log('Consumer group already exists, skipped creation.');
43+
}
44+
45+
console.log(`Starting consumer ${consumerName}.`);
46+
47+
while (true) {
48+
try {
49+
let response = await client.xReadGroup(
50+
commandOptions({
51+
isolated: true
52+
}),
53+
'myconsumergroup',
54+
consumerName, [
55+
// XREADGROUP can read from multiple streams, starting at a
56+
// different ID for each...
57+
{
58+
key: 'mystream',
59+
id: '>' // Next entry ID that no consumer in this group has read
60+
}
61+
], {
62+
// Read 1 entry at a time, block for 5 seconds if there are none.
63+
COUNT: 1,
64+
BLOCK: 5000
65+
}
66+
);
67+
68+
if (response) {
69+
// Response is an array of streams, each containing an array of
70+
// entries:
71+
//
72+
// [
73+
// {
74+
// "name": "mystream",
75+
// "messages": [
76+
// {
77+
// "id": "1642088708425-0",
78+
// "message": {
79+
// "num": "999"
80+
// }
81+
// }
82+
// ]
83+
// }
84+
// ]
85+
console.log(JSON.stringify(response));
86+
87+
// Use XACK to acknowledge successful processing of this
88+
// stream entry.
89+
const entryId = response[0].messages[0].id;
90+
await client.xAck('mystream', 'myconsumergroup', entryId);
91+
92+
console.log(`Acknowledged processing of entry ${entryId}.`);
93+
} else {
94+
// Response is null, we have read everything that is
95+
// in the stream right now...
96+
console.log('No new stream entries.');
97+
}
98+
} catch (err) {
99+
console.error(err);
100+
}
101+
}
102+
}
103+
104+
streamConsumerGroup();

examples/stream-producer.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ async function streamProducer() {
66
const client = createClient();
77

88
await client.connect();
9-
await client.del('mystream');
109

1110
let num = 0;
1211

0 commit comments

Comments
 (0)