Skip to content

Commit f243ef1

Browse files
committed
feat: Improve error handling and logging in Redis worker
- Downgrade specific Redis errors to info level logging - Handle 'BUSYGROUP Consumer Group name already exists' as informational message
1 parent ab58982 commit f243ef1

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

redis.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package redisdb
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
7+
"fmt"
68
"strings"
79
"sync"
810
"sync/atomic"
@@ -83,7 +85,11 @@ func (w *Worker) startConsumer() {
8385
w.opts.group,
8486
"$",
8587
).Err(); err != nil {
86-
w.opts.logger.Error(err)
88+
if err.Error() == "BUSYGROUP Consumer Group name already exists" {
89+
w.opts.logger.Info(err)
90+
} else {
91+
w.opts.logger.Error(err)
92+
}
8793
}
8894

8995
go w.fetchTask()
@@ -110,7 +116,14 @@ func (w *Worker) fetchTask() {
110116
Block: w.opts.blockTime,
111117
}).Result()
112118
if err != nil {
113-
w.opts.logger.Errorf("error while reading from redis %v", err)
119+
workerInfo := fmt.Sprintf("{streamName: %q, group: %q, consumer: %q}",
120+
w.opts.streamName, w.opts.group, w.opts.consumer)
121+
if errors.Is(err, redis.Nil) {
122+
w.opts.logger.Infof("no data while reading from redis stream %s", workerInfo)
123+
} else {
124+
w.opts.logger.Errorf("error while reading from redis %s %v", workerInfo, err)
125+
}
126+
114127
continue
115128
}
116129
// we have received the data we should loop it and queue the messages

0 commit comments

Comments
 (0)