fix(pubsub): ensure listener processes all messages#36
Conversation
ren0503
commented
Jan 27, 2026
- Fix issue where listener would only process the first message.
- Add unit test to verify 100 messages are received.
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings. WalkthroughDependency bumped; listener refactored from single receive to range-loop to process all messages; new integration test publishes 100 messages and verifies receipt; CI workflow removed conditional gating so tests and coverage upload run unconditionally. Changes
Sequence Diagram(s)sequenceDiagram
participant Pub as Publisher
participant Broker as Broker / Channel
participant L as Listener
participant H as Handler/Factory
rect rgba(0, 100, 200, 0.5)
Note over L: Old pattern (single receive)
Pub->>Broker: send message
L->>Broker: receive (single)
Broker-->>L: msg, ok
alt ok == false
L-->>L: exit
end
L->>H: factory(msg)
H-->>L: processed
end
rect rgba(100, 200, 0, 0.5)
Note over L: New pattern (continuous range)
Pub->>Broker: send message 1
Pub->>Broker: send message 2
Pub->>Broker: send message 3
L->>Broker: range over messages
loop for each message
Broker-->>L: msg
L->>H: factory(msg)
H-->>L: processed
end
Pub->>Broker: close channel
Broker-->>L: channel closed
L-->>L: exit range
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@listener_test.go`:
- Around line 90-96: The timeout branch in the select (waiting on the done
channel and time.After(1 * time.Second)) only prints a message, so change it to
fail the test by calling the testing.T failure method (e.g., replace
fmt.Println("Timeout waiting for messages") with t.Fatalf("timeout waiting for
messages after %s", time.Second) or t.Fatal("timeout waiting for messages")) so
a missed SLA causes the test to fail; keep the select using the same done
channel and time.After call.
- Around line 3-13: The test’s receivedCount is updated in the listener
goroutine and read in the main test goroutine, causing a race; change
receivedCount to an int32/int64 and use sync/atomic.AddInt32/Int64 in the
listener and sync/atomic.LoadInt32/LoadInt64 for the final assertion
(references: receivedCount, the listener goroutine that increments it, and the
test assertion that reads it). Also, replace the current timeout branch that
only prints with a test failure (t.Fatalf or require.Fail) so the timeout path
fails the test explicitly. Ensure imports include sync/atomic and adjust
assertions to compare the atomic-loaded value.
- Fix issue where listener would only process the first message. - Add unit test to verify 100 messages are received.
8156f8c to
8476666
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |