1
1
const Redis = require ( "ioredis" ) ;
2
2
const redis = new Redis ( ) ;
3
+ const sub = new Redis ( ) ;
3
4
const pub = new Redis ( ) ;
4
5
6
+ // Usage 1: As message hub
5
7
const processMessage = ( message ) => {
6
8
console . log ( "Id: %s. Data: %O" , message [ 0 ] , message [ 1 ] ) ;
7
9
} ;
@@ -10,13 +12,7 @@ async function listenForMessage(lastId = "$") {
10
12
// `results` is an array, each element of which corresponds to a key.
11
13
// Because we only listen to one key (mystream) here, `results` only contains
12
14
// a single element. See more: https://redis.io/commands/xread#return-value
13
- const results = await redis . xread (
14
- "BLOCK" ,
15
- 0 ,
16
- "STREAMS" ,
17
- "user-stream" ,
18
- lastId
19
- ) ;
15
+ const results = await sub . xread ( "BLOCK" , 0 , "STREAMS" , "user-stream" , lastId ) ;
20
16
const [ key , messages ] = results [ 0 ] ; // `key` equals to "user-stream"
21
17
22
18
messages . forEach ( processMessage ) ;
@@ -32,3 +28,22 @@ setInterval(() => {
32
28
// so we use another connection to publish messages.
33
29
pub . xadd ( "user-stream" , "*" , "name" , "John" , "age" , "20" ) ;
34
30
} , 1000 ) ;
31
+
32
+ // Usage 2: As a list
33
+ async function main ( ) {
34
+ redis
35
+ . pipeline ( )
36
+ . xadd ( "list-stream" , "*" , "id" , "item1" )
37
+ . xadd ( "list-stream" , "*" , "id" , "item2" )
38
+ . xadd ( "list-stream" , "*" , "id" , "item3" )
39
+ . exec ( ) ;
40
+
41
+ const items = await redis . xrange ( "list-stream" , "-" , "+" , "COUNT" , 2 ) ;
42
+ console . log ( items ) ;
43
+ // [
44
+ // [ '1647321710097-0', [ 'id', 'item1' ] ],
45
+ // [ '1647321710098-0', [ 'id', 'item2' ] ]
46
+ // ]
47
+ }
48
+
49
+ main ( ) ;
0 commit comments