Skip to content

Commit 309cdbd

Browse files
Simon Prickettleibale
andauthored
Add stream examples. (#1830)
* Adds stream consumer and producer scripts to doc. * Updated build command example. * Adds stream producer example. * Adds basic stream consumer example. * Added isolated execution. * Update README.md * Update stream-consumer.js Co-authored-by: Leibale Eidelman <[email protected]>
1 parent 841bd43 commit 309cdbd

File tree

3 files changed

+96
-1
lines changed

3 files changed

+96
-1
lines changed

examples/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ This folder contains example scripts showing how to use Node Redis in different
1212
| `search-hashes.js` | Uses [RediSearch](https://redisearch.io) to index and search data in hashes |
1313
| `search-json.js` | Uses [RediSearch](https://redisearch.io/) and [RedisJSON](https://redisjson.io/) to index and search JSON data |
1414
| `set-scan.js` | An example script that shows how to use the SSCAN iterator functionality |
15+
| `stream-producer.js` | Adds entries to a [Redis Stream](https://redis.io/topics/streams-intro) using the `XADD` command |
16+
| `stream-consumer.js` | Reads entries from a [Redis Stream](https://redis.io/topics/streams-intro) using the blocking `XREAD` command |
1517

1618
## Contributing
1719

@@ -24,7 +26,7 @@ To set up the examples folder so that you can run an example / develop one of yo
2426
```
2527
$ git clone https://github.com/redis/node-redis.git
2628
$ cd node-redis
27-
$ npm install -ws && npm run build
29+
$ npm install -ws && npm run build-all
2830
$ cd examples
2931
$ npm install
3032
```

examples/stream-consumer.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// A sample stream consumer using the blocking variant of XREAD.
2+
// This consumes entries from a stream created by stream-producer.js
3+
4+
import { createClient, commandOptions } from 'redis';
5+
6+
async function streamConsumer() {
7+
const client = createClient();
8+
9+
await client.connect();
10+
11+
let currentId = '0-0'; // Start at lowest possible stream ID
12+
13+
while (true) {
14+
try {
15+
let response = await client.xRead(
16+
commandOptions({
17+
isolated: true
18+
}), [
19+
// XREAD can read from multiple streams, starting at a
20+
// different ID for each...
21+
{
22+
key: 'mystream',
23+
id: currentId
24+
}
25+
], {
26+
// Read 1 entry at a time, block for 5 seconds if there are none.
27+
COUNT: 1,
28+
BLOCK: 5000
29+
}
30+
);
31+
32+
if (response) {
33+
// Response is an array of streams, each containing an array of
34+
// entries:
35+
// [
36+
// {
37+
// "name": "mystream",
38+
// "messages": [
39+
// {
40+
// "id": "1642088708425-0",
41+
// "message": {
42+
// "num": "999"
43+
// }
44+
// }
45+
// ]
46+
// }
47+
// ]
48+
console.log(JSON.stringify(response));
49+
50+
// Get the ID of the first (only) entry returned.
51+
currentId = response[0].messages[0].id;
52+
console.log(currentId);
53+
} else {
54+
// Response is null, we have read everything that is
55+
// in the stream right now...
56+
console.log('No new stream entries.');
57+
}
58+
}
59+
} catch (err) {
60+
console.error(err);
61+
}
62+
}
63+
64+
streamConsumer();
65+

examples/stream-producer.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// A sample stream producer using XADD.
2+
3+
import { createClient } from 'redis';
4+
5+
async function streamProducer() {
6+
const client = createClient();
7+
8+
await client.connect();
9+
await client.del('mystream');
10+
11+
let num = 0;
12+
13+
while (num < 1000) {
14+
// * = Let Redis generate a timestamp ID for this new entry.
15+
let id = await client.xAdd('mystream', '*', {
16+
num: `${num}`
17+
// Other name/value pairs can go here as required...
18+
});
19+
20+
console.log(`Added ${id} to the stream.`);
21+
num += 1;
22+
}
23+
24+
await client.quit();
25+
}
26+
27+
streamProducer();
28+

0 commit comments

Comments
 (0)