Skip to content

Commit 8410f48

Browse files
committed
test: keep pubsub commands dedicated to the same connection in single client
1 parent fa1f7d9 commit 8410f48

File tree

1 file changed

+80
-3
lines changed

1 file changed

+80
-3
lines changed

redis_test.go

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,83 @@ func testPubSub(t *testing.T, client Client) {
628628
}
629629
}
630630

631+
func testPubSubSharded(t *testing.T, client Client) {
632+
msgs := 5000
633+
mmap := make(map[string]struct{})
634+
for i := 0; i < msgs; i++ {
635+
mmap[strconv.Itoa(i)] = struct{}{}
636+
}
637+
t.Logf("testing pubsub with %v messages\n", msgs)
638+
jobs, wait := parallel(10)
639+
640+
ctx := context.Background()
641+
642+
messages := make(chan string, 10)
643+
644+
wg := sync.WaitGroup{}
645+
wg.Add(1)
646+
go func() {
647+
err := client.Receive(ctx, client.B().Ssubscribe().Channel("ch1").Build(), func(msg PubSubMessage) {
648+
messages <- msg.Message
649+
})
650+
if err != nil {
651+
t.Errorf("unexpected subscribe response %v", err)
652+
}
653+
wg.Done()
654+
}()
655+
656+
go func() {
657+
time.Sleep(time.Second)
658+
for i := 0; i < msgs; i++ {
659+
msg := strconv.Itoa(i)
660+
ch := "ch1"
661+
jobs <- func() {
662+
if err := client.Do(context.Background(), client.B().Spublish().Channel(ch).Message(msg).Build()).Error(); err != nil {
663+
t.Errorf("unexpected publish response %v", err)
664+
}
665+
}
666+
}
667+
wait()
668+
}()
669+
670+
for message := range messages {
671+
delete(mmap, message)
672+
if len(mmap) == 0 {
673+
close(messages)
674+
}
675+
}
676+
677+
for _, resp := range client.DoMulti(context.Background(),
678+
client.B().Sunsubscribe().Channel("ch1").Build()) {
679+
if err := resp.Error(); err != nil {
680+
t.Fatal(err)
681+
}
682+
}
683+
wg.Wait()
684+
685+
t.Logf("testing pubsub hooks with 500 messages\n")
686+
687+
for i := 0; i < 500; i++ {
688+
cc, cancel := client.Dedicate()
689+
msg := strconv.Itoa(i)
690+
ch := cc.SetPubSubHooks(PubSubHooks{
691+
OnMessage: func(m PubSubMessage) {
692+
cc.SetPubSubHooks(PubSubHooks{})
693+
},
694+
})
695+
if err := cc.Do(context.Background(), client.B().Ssubscribe().Channel("ch2").Build()).Error(); err != nil {
696+
t.Fatal(err)
697+
}
698+
if err := client.Do(context.Background(), client.B().Spublish().Channel("ch2").Message(msg).Build()).Error(); err != nil {
699+
t.Fatal(err)
700+
}
701+
if err := <-ch; err != nil {
702+
t.Fatal(err)
703+
}
704+
cancel()
705+
}
706+
}
707+
631708
func testLua(t *testing.T, client Client) {
632709
script := NewLuaScript("return {KEYS[1],ARGV[1]}")
633710

@@ -679,7 +756,7 @@ func TestSingleClientIntegration(t *testing.T) {
679756
t.Fatal(err)
680757
}
681758

682-
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testLua)
759+
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)
683760
run(t, client, testFlush)
684761

685762
client.Close()
@@ -702,7 +779,7 @@ func TestSentinelClientIntegration(t *testing.T) {
702779
t.Fatal(err)
703780
}
704781

705-
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testLua)
782+
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)
706783
run(t, client, testFlush)
707784

708785
client.Close()
@@ -722,7 +799,7 @@ func TestClusterClientIntegration(t *testing.T) {
722799
if err != nil {
723800
t.Fatal(err)
724801
}
725-
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testLua)
802+
run(t, client, testSETGETCSC, testMultiSETGETCSC, testMultiSETGETCSCHelpers, testMultiExec, testBlockingZPOP, testBlockingXREAD, testPubSub, testPubSubSharded, testLua)
726803

727804
client.Close()
728805
}

0 commit comments

Comments
 (0)