7900: sync: add closed method to broadcast::WeakSender#58
7900: sync: add closed method to broadcast::WeakSender#58martin-augment wants to merge 1 commit intomasterfrom
closed method to broadcast::WeakSender#58Conversation
Summary of ChangesHello @martin-augment, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
WalkthroughThis pull request introduces a unified async close-notification mechanism in the broadcast channel synchronization primitive. The changes add new ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Code ReviewOverviewThis PR adds a ✅ Strengths1. Good Code Reuse
2. API Consistency
3. Comprehensive Test Coverage
4. Good Documentation
🔍 Minor Observations1. Documentation Formatting (line 1109) /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
This references 2. Test Organization 🎯 Final AssessmentVerdict: Approve ✅ This is a solid, well-implemented feature addition that:
The refactoring to extract `Shared::closed()) is particularly nice - it makes the code more maintainable and follows the DRY principle. Recommendation: Merge after addressing the minor documentation wording if desired (or leave as-is for consistency with 🤖 Review generated by Claude Code |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tokio/src/sync/broadcast.rs`:
- Around line 1045-1059: The current async fn closed(&self) in the broadcast
implementation incorrectly returns based on tail.closed (which is set both when
all senders drop and when all receivers drop); change the check to inspect the
receiver count instead so WeakSender::closed() only completes when no receivers
remain: inside closed() acquire the tail lock (the same tail = self.tail.lock()
scope), read tail.rx_cnt (or the equivalent receiver count field) and return
only when rx_cnt == 0; keep the notify_last_rx_drop.notified()/await logic and
ensure the lock is dropped before awaiting, and leave other code paths like
close_channel() and Receiver::drop untouched.
🧹 Nitpick comments (1)
tokio/src/sync/broadcast.rs (1)
1109-1137: Doc and delegation look good, but note thecooperative()wrapping is missing.The
watch::Sender::closed()wraps its loop incooperative(…).awaitand callscrate::trace::async_trace_leaf().awaitfor budget/tracing support. This method (andSender::closedabove) omit both. While unlikely to cause real issues (the loop shouldn't spin rapidly), it's an inconsistency with the rest of the crate.
| async fn closed(&self) { | ||
| loop { | ||
| let notified = self.notify_last_rx_drop.notified(); | ||
|
|
||
| { | ||
| // Ensure the lock drops if the channel isn't closed | ||
| let tail = self.tail.lock(); | ||
| if tail.closed { | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| notified.await; | ||
| } | ||
| } |
There was a problem hiding this comment.
Bug: tail.closed conflates "all senders dropped" with "all receivers dropped" — breaks WeakSender::closed().
tail.closed is set to true by both close_channel() (when all strong Senders drop) and Receiver::drop (when rx_cnt reaches 0). This means Shared::closed() will return immediately whenever all strong senders have been dropped, even if receivers are still alive.
This is correct for Sender::closed() (you always hold a strong ref, so close_channel can't fire), but incorrect for WeakSender::closed() — the whole point is that the strong senders may already be gone. The test broadcast_weak_sender_closed drops tx first (Line 681), which sets tail.closed = true via close_channel, then expects weak.closed() to remain pending while receivers exist. It will instead resolve immediately.
The watch::Sender::closed() correctly checks receiver_count() > 0 rather than a "closed" flag. The same approach should be used here:
🐛 Proposed fix: check rx_cnt instead of tail.closed
async fn closed(&self) {
loop {
let notified = self.notify_last_rx_drop.notified();
{
- // Ensure the lock drops if the channel isn't closed
let tail = self.tail.lock();
- if tail.closed {
+ if tail.rx_cnt == 0 {
return;
}
}
notified.await;
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async fn closed(&self) { | |
| loop { | |
| let notified = self.notify_last_rx_drop.notified(); | |
| { | |
| // Ensure the lock drops if the channel isn't closed | |
| let tail = self.tail.lock(); | |
| if tail.closed { | |
| return; | |
| } | |
| } | |
| notified.await; | |
| } | |
| } | |
| async fn closed(&self) { | |
| loop { | |
| let notified = self.notify_last_rx_drop.notified(); | |
| { | |
| let tail = self.tail.lock(); | |
| if tail.rx_cnt == 0 { | |
| return; | |
| } | |
| } | |
| notified.await; | |
| } | |
| } |
🤖 Prompt for AI Agents
In `@tokio/src/sync/broadcast.rs` around lines 1045 - 1059, The current async fn
closed(&self) in the broadcast implementation incorrectly returns based on
tail.closed (which is set both when all senders drop and when all receivers
drop); change the check to inspect the receiver count instead so
WeakSender::closed() only completes when no receivers remain: inside closed()
acquire the tail lock (the same tail = self.tail.lock() scope), read tail.rx_cnt
(or the equivalent receiver count field) and return only when rx_cnt == 0; keep
the notify_last_rx_drop.notified()/await logic and ensure the lock is dropped
before awaiting, and leave other code paths like close_channel() and
Receiver::drop untouched.
There was a problem hiding this comment.
Code Review
This pull request introduces a closed() method on WeakSender and refactors the existing Sender::closed() to share implementation. However, a critical vulnerability has been identified: the Shared::closed implementation incorrectly relies on the tail.closed flag, causing WeakSender::closed() to complete prematurely when senders are dropped, even if receivers are still active. This violates documented behavior and could lead to race conditions. A specific fix has been suggested, and it's recommended to add a test case to cover the scenario where senders are dropped but receivers still exist to prevent regressions.
| if tail.closed { | ||
| return; |
There was a problem hiding this comment.
The closed() method in Shared::closed completes prematurely if all Sender handles are dropped, even when Receiver handles are still active. This is a critical vulnerability because tail.closed is incorrectly used to indicate both sender and receiver drops, violating the documented behavior of WeakSender::closed() and potentially causing race conditions. The closed() future should only complete when all Receivers are dropped. The correct check should be against tail.rx_cnt == 0.
| if tail.closed { | |
| return; | |
| if tail.rx_cnt == 0 { | |
| return; | |
| } |
🤖 Augment PR SummarySummary: Adds a Changes:
🤖 Was this summary useful? React with 👍 or 👎 |
| { | ||
| // Ensure the lock drops if the channel isn't closed | ||
| let tail = self.tail.lock(); | ||
| if tail.closed { |
There was a problem hiding this comment.
Shared::closed checks tail.closed, which is also set when the last Sender is dropped (close_channel), so WeakSender::closed() can complete even while rx_cnt > 0. This seems to conflict with the docs/tests describing completion when the receiver count reaches zero; can you confirm the intended semantics?
Severity: high
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
| } | ||
| } | ||
|
|
||
| /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches |
There was a problem hiding this comment.
|
Superseded by #60 |
7900: To review by AI