From 280f11624721946bd5918c8a0feb5d06f01b1791 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Tue, 18 Aug 2020 18:11:12 +0200 Subject: [PATCH] Added extra TestConcurrentSubscribeMultipleTopics test (#196) * added extra TestConcurrentSubscribeMultipleTopics test + some old tests tweaks * fix race --- pubsub/gochannel/pubsub.go | 9 ++-- pubsub/tests/test_pubsub.go | 102 ++++++++++++++++++++++++++++++------ 2 files changed, 92 insertions(+), 19 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index a79b5ce25..b860b1572 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -85,8 +85,9 @@ func (g *GoChannel) Publish(topic string, messages ...*message.Message) error { return errors.New("Pub/Sub closed") } + messagesToPublish := make(message.Messages, len(messages)) for i, msg := range messages { - messages[i] = msg.Copy() + messagesToPublish[i] = msg.Copy() } g.subscribersLock.RLock() @@ -101,12 +102,12 @@ func (g *GoChannel) Publish(topic string, messages ...*message.Message) error { if _, ok := g.persistedMessages[topic]; !ok { g.persistedMessages[topic] = make([]*message.Message, 0) } - g.persistedMessages[topic] = append(g.persistedMessages[topic], messages...) + g.persistedMessages[topic] = append(g.persistedMessages[topic], messagesToPublish...) g.persistedMessagesLock.Unlock() } - for i := range messages { - msg := messages[i] + for i := range messagesToPublish { + msg := messagesToPublish[i] ackedBySubscribers, err := g.sendMessage(topic, msg) if err != nil { diff --git a/pubsub/tests/test_pubsub.go b/pubsub/tests/test_pubsub.go index 6afad75b9..acb2575b4 100644 --- a/pubsub/tests/test_pubsub.go +++ b/pubsub/tests/test_pubsub.go @@ -47,6 +47,7 @@ func TestPubSub( }{ {Func: TestPublishSubscribe}, {Func: TestConcurrentSubscribe}, + {Func: TestConcurrentSubscribeMultipleTopics}, {Func: TestResendOnError}, {Func: TestNoAck}, {Func: TestContinueAfterSubscribeClose}, @@ -121,7 +122,7 @@ type Features struct { RequireSingleInstance bool // NewSubscriberReceivesOldMessages should be set to true if messages are persisted even - // if they are already consumed (for example, like in Kafka). + // if they are already consumed (for example, like in Kafka). NewSubscriberReceivesOldMessages bool } @@ -269,16 +270,7 @@ func TestConcurrentSubscribe( require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } - var messagesToPublish []*message.Message - - for i := 0; i < messagesCount; i++ { - id := watermill.NewUUID() - - msg := message.NewMessage(id, nil) - messagesToPublish = append(messagesToPublish, msg) - } - err := publishWithRetry(pub, topicName, messagesToPublish...) - require.NoError(t, err, "cannot publish message") + publishedMessages := AddSimpleMessagesParallel(t, messagesCount, pub, topicName, 50) var sub message.Subscriber if tCtx.Features.RequireSingleInstance { @@ -292,10 +284,80 @@ func TestConcurrentSubscribe( messages, err := sub.Subscribe(context.Background(), topicName) require.NoError(t, err) - receivedMessages, all := bulkRead(tCtx, messages, len(messagesToPublish), defaultTimeout*3) + receivedMessages, all := bulkRead(tCtx, messages, len(publishedMessages), defaultTimeout*3) assert.True(t, all) - AssertAllMessagesReceived(t, messagesToPublish, receivedMessages) + AssertAllMessagesReceived(t, publishedMessages, receivedMessages) +} + +func TestConcurrentSubscribeMultipleTopics( + t *testing.T, + tCtx TestContext, + pubSubConstructor PubSubConstructor, +) { + pub, sub := pubSubConstructor(t) + defer closePubSub(t, pub, sub) + + messagesCount := 100 + topicsCount := 50 + + if testing.Short() { + messagesCount = 50 + topicsCount = 10 + } + + var messagesToPublish []*message.Message + for i := 0; i < messagesCount; i++ { + id := watermill.NewUUID() + + msg := message.NewMessage(id, nil) + messagesToPublish = append(messagesToPublish, msg) + } + + subsWg := sync.WaitGroup{} + subsWg.Add(topicsCount) + + receivedMessagesCh := make(chan message.Messages, topicsCount) + + for i := 0; i < topicsCount; i++ { + topicName := testTopicName(tCtx.TestID) + fmt.Sprintf("_%d", i) + + go func() { + defer subsWg.Done() + + if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { + err := subscribeInitializer.SubscribeInitialize(topicName) + if err != nil { + t.Fatal(err) + } + } + + err := publishWithRetry(pub, topicName, messagesToPublish...) + if err != nil { + t.Fatal(err) + } + + messages, err := sub.Subscribe(context.Background(), topicName) + if err != nil { + t.Fatal(err) + } + topicMessages, _ := bulkRead(tCtx, messages, len(messagesToPublish), defaultTimeout) + + receivedMessagesCh <- topicMessages + }() + } + + subsWg.Wait() + close(receivedMessagesCh) + + topicsReceivedMessages := 0 + + for msgs := range receivedMessagesCh { + AssertAllMessagesReceived(t, messagesToPublish, msgs) + topicsReceivedMessages++ + } + + assert.Equal(t, topicsCount, topicsReceivedMessages) } func TestPublishSubscribeInOrder( @@ -521,16 +583,26 @@ func TestContinueAfterSubscribeClose( require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } - publishedMessages := PublishSimpleMessages(t, totalMessagesCount, pub, topicName) + publishedMessages := AddSimpleMessagesParallel(t, totalMessagesCount, pub, topicName, 50) receivedMessages := map[string]*message.Message{} for i := 0; i < readAttempts; i++ { + pub, sub := createPubSub(t) messages, err := sub.Subscribe(context.Background(), topicName) require.NoError(t, err) - receivedMessagesBatch, _ := bulkRead(tCtx, messages, batchSize, defaultTimeout) + messagesToRead := batchSize + messagesLeft := totalMessagesCount - len(receivedMessages) + + if messagesToRead > messagesLeft { + messagesToRead = messagesLeft + } + + receivedMessagesBatch, _ := bulkRead(tCtx, messages, messagesToRead, defaultTimeout) + closePubSub(t, pub, sub) + for _, msg := range receivedMessagesBatch { receivedMessages[msg.UUID] = msg }