Skip to content

7900: sync: add closed method to broadcast::WeakSender#60

Open
martin-augment wants to merge 2 commits intomasterfrom
pr-7900-2026-02-11-10-09-23
Open

7900: sync: add closed method to broadcast::WeakSender#60
martin-augment wants to merge 2 commits intomasterfrom
pr-7900-2026-02-11-10-09-23

Conversation

@martin-augment
Copy link
Owner

7900: To review by AI

@gemini-code-assist
Copy link

Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the tokio::sync::broadcast channel by refining its closure state management. It introduces separate flags to distinguish between a channel being closed due to a lack of receivers versus a lack of senders. This improvement facilitates the addition of a closed method to WeakSender, enabling asynchronous waiting for all Receivers to be dropped, thereby improving the channel's robustness and API consistency.

Highlights

  • Refactored Channel Closure State: The internal Tail struct in the broadcast channel was refactored to use two distinct boolean flags, closed_for_senders and closed_for_receivers, replacing the single closed flag. This change provides more granular control and clarity over the channel's closure state based on whether senders or receivers are absent.
  • Added closed method to broadcast::WeakSender: A new asynchronous closed method was added to broadcast::WeakSender. This method allows users to await the point at which all Receivers subscribed to the channel have been dropped, aligning its functionality with the existing Sender::closed method.
  • Internal Logic Updates: The internal logic for managing the channel's closed state, including initialization, Sender::closed implementation, close_channel, new_receiver, Receiver::poll, and Drop for Receiver, was updated to correctly utilize the new closed_for_senders and closed_for_receivers flags.
Changelog
  • tokio/src/sync/broadcast.rs
    • Renamed the closed field in the Tail struct to closed_for_senders and introduced a new closed_for_receivers field.
    • Updated the initialization of the closed_for_senders and closed_for_receivers fields in Sender::new.
    • Modified Sender::closed to delegate its logic to a newly extracted shared.closed_for_senders() method.
    • Changed Sender::close_channel to set tail.closed_for_receivers when closing the channel.
    • Updated new_receiver to reset tail.closed_for_senders when a new receiver is added.
    • Extracted the asynchronous waiting logic for closed_for_senders into a new async fn closed_for_senders() within the Shared struct.
    • Implemented the pub async fn closed() method for WeakSender, which internally calls shared.closed_for_senders().
    • Adjusted Receiver::poll to check tail.closed_for_receivers when determining if the channel is closed for receivers.
    • Modified the Drop for Receiver implementation to set tail.closed_for_senders when the last receiver is dropped.
  • tokio/tests/sync_broadcast.rs
    • Added a new test case named broadcast_weak_sender_closed to verify the functionality of the WeakSender::closed method.
    • Extended the existing broadcast_sender_closed_with_extra_subscribe test to include assertions for the WeakSender::closed method's behavior alongside Sender::closed.
Activity
  • The pull request is awaiting review.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai
Copy link

coderabbitai bot commented Feb 11, 2026

Walkthrough

The broadcast channel implementation is refactored to replace a single closure flag with two distinct flags: closed_for_senders (indicating zero receivers remain) and closed_for_receivers (indicating all senders are dropped). The receiver and sender drop paths are updated to appropriately set these flags during closure. A new public async method WeakSender::closed() is added to await closure on weak sender references. An internal async waiter method Shared::closed_for_senders() is introduced. Existing public API signatures remain unchanged. Tests are added to verify closure behavior with weak senders in various subscription scenarios.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch pr-7900-2026-02-11-10-09-23

Tip

Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@claude
Copy link

claude bot commented Feb 11, 2026

Code Review

Thank you for this PR! This adds the closed method to WeakSender and refactors the internal channel state management. Here's my detailed review:

Positive Aspects

Better semantic clarity: Splitting closed into closed_for_senders and closed_for_receivers makes the intent clearer and properly separates the two different closure semantics.

Code reuse: Extracting closed_for_senders() as a shared method is excellent refactoring - it eliminates duplication and ensures consistent behavior between Sender::closed() and WeakSender::closed().

Comprehensive test coverage: The updated tests thoroughly validate both Sender::closed() and WeakSender::closed() in parallel, ensuring they behave identically.

Cooperative scheduling: The implementation correctly adds cooperative task yielding via cooperative() wrapper, which is important for fairness in the Tokio runtime.

Tracing support: The async_trace_leaf() call enables proper async runtime tracing.


Issues Found

🐛 Critical: Documentation Example Bug

// In WeakSender::closed() documentation (lines 1137, 1141)
assert\!(tx.closed().now_or_never().is_none());  // ❌ Wrong
assert\!(tx.closed().now_or_never().is_some());  // ❌ Wrong

Problem: The example demonstrates WeakSender::closed() but calls tx.closed() instead of weak.closed(). This will confuse users about the actual API being demonstrated.

Fix: Change to:

assert\!(weak.closed().now_or_never().is_none());
assert\!(weak.closed().now_or_never().is_some());

⚠️ Ordering Issue: async_trace_leaf and cooperative

Looking at tokio/src/sync/watch.rs:1314-1329, the established pattern in Tokio is:

cooperative(async {
    crate::trace::async_trace_leaf().await;
    // ... rest of logic
})
.await

However, the new closed_for_senders() method does:

crate::trace::async_trace_leaf().await;
cooperative(async {
    // ... logic
})
.await

Impact: The trace leaf is called outside the cooperative wrapper, which means the tracing won't be properly associated with the cooperative context.

Recommendation: Move async_trace_leaf() inside the cooperative block to match the pattern used in watch.rs and other sync primitives.


Minor Observations

📝 Naming consistency: The closed_for_senders / closed_for_receivers naming is descriptive and clear.

📝 No breaking changes: All public APIs remain compatible, with only internal state representation changing.


Summary

Category Status
Code Quality ✅ Good - clean refactoring
Best Practices ⚠️ Minor issue with trace ordering
Potential Bugs 🐛 Documentation example bug
Performance ✅ Good - adds cooperative scheduling
Security ✅ No concerns
Test Coverage ✅ Excellent

Recommended Actions

  1. Fix the documentation example to use weak.closed() instead of tx.closed()
  2. Consider moving async_trace_leaf() inside the cooperative block to match the pattern in watch.rs
  3. After fixes, the PR should be good to merge

Great work on this feature! The semantic clarity improvement is valuable, and the implementation is solid overall.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request adds a closed method to broadcast::WeakSender, which completes when all receivers have been dropped. To support this, the internal closed flag in the Tail struct has been split into closed_for_senders and closed_for_receivers for better clarity on the channel's state. The implementation looks solid and is well-tested. I have one minor suggestion to improve the documentation example.

Comment on lines +1137 to +1141
/// assert!(tx.closed().now_or_never().is_none());
///
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// drop(rx2);
/// assert!(tx.closed().now_or_never().is_some());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The documentation example for WeakSender::closed incorrectly uses tx.closed() instead of weak.closed(). To accurately demonstrate the new method, the example should call closed() on the WeakSender instance.

Suggested change
/// assert!(tx.closed().now_or_never().is_none());
///
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// drop(rx2);
/// assert!(tx.closed().now_or_never().is_some());
/// assert!(weak.closed().now_or_never().is_none());
///
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// drop(rx2);
/// assert!(weak.closed().now_or_never().is_some());

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:documentation; feedback: The Gemini AI reviewer is correct! The doc test is supposed to show that the WeakSender is not closed until the last channel receiver is dropped but instead it uses the strong Sender. Prevents releasing wrong documentation.

@augmentcode
Copy link

augmentcode bot commented Feb 11, 2026

🤖 Augment PR Summary

Summary: This PR adds a closed() async API to broadcast::WeakSender and clarifies broadcast channel closure semantics by splitting sender-vs-receiver closure state.

Changes:

  • Replaced the single `Tail::closed` flag with two flags: closed_for_senders (no receivers) and closed_for_receivers (last sender dropped).
  • Refactored Sender::closed() to delegate to a shared Shared::closed_for_senders() helper.
  • Implemented WeakSender::closed() using the same helper to allow waiting for receiver-count-to-zero even after all strong senders are dropped.
  • Updated receiver-side close detection to key off closed_for_receivers when returning *::Closed errors.
  • Added tests covering WeakSender::closed() and its interaction with extra subscribes.

🤖 Was this summary useful? React with 👍 or 👎

Copy link

@augmentcode augmentcode bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 1 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.

///
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// drop(rx1);
/// assert!(tx.closed().now_or_never().is_none());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WeakSender::closed docs example uses tx.closed() instead of weak.closed(), so it doesn't actually demonstrate the new API (and may mislead readers about usage when all strong senders are dropped). Consider updating the example to await/check weak.closed() specifically.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:documentation; feedback: The Augment AI reviewer is correct! The doc test is supposed to show that the WeakSender is not closed until the last channel receiver is dropped but instead it uses the strong Sender. Prevents releasing wrong documentation.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tokio/src/sync/broadcast.rs`:
- Around line 1118-1146: The doc example for WeakSender::closed() mistakenly
calls tx.closed() instead of demonstrating the weak handle; update the example
to call weak.closed() (the WeakSender returned by tx.downgrade()) in the two
assertions that currently use tx.closed() so the snippet exercises weak.closed()
as intended; keep the rest of the example (creating tx via broadcast::channel,
tx.downgrade(), recv and drops) unchanged.
🧹 Nitpick comments (1)
tokio/src/sync/broadcast.rs (1)

1599-1602: Set closed_for_senders before calling notify_waiters() for defensive ordering.

Currently notify_waiters() fires on line 1600 before the flag is set on line 1601. This works because the tail lock is held throughout, so woken tasks block on the lock. However, the conventional pattern is state-then-notify, and reversing these two lines would make the code more robust against future refactoring that might release the lock between them.

♻️ Suggested reorder
         if remaining_rx == 0 {
-            self.shared.notify_last_rx_drop.notify_waiters();
             tail.closed_for_senders = true;
+            self.shared.notify_last_rx_drop.notify_waiters();
         }

Comment on lines +1118 to +1146
/// A future which completes when the number of [Receiver]s subscribed to this channel reaches
/// zero.
///
/// # Examples
///
/// ```
/// use futures::FutureExt;
/// use tokio::sync::broadcast;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let (tx, mut rx1) = broadcast::channel::<u32>(16);
/// let mut rx2 = tx.subscribe();
///
/// let _ = tx.send(10);
/// let weak = tx.downgrade();
///
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// drop(rx1);
/// assert!(tx.closed().now_or_never().is_none());
///
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// drop(rx2);
/// assert!(tx.closed().now_or_never().is_some());
/// # }
/// ```
pub async fn closed(&self) {
self.shared.closed_for_senders().await;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Doc example never calls weak.closed() — it only exercises tx.closed().

This is the documentation for WeakSender::closed(), but the example creates weak via tx.downgrade() on line 1133 and never uses it. Lines 1137 and 1141 call tx.closed() instead. The example should demonstrate weak.closed().

📝 Suggested doc fix
     /// let _ = tx.send(10);
     /// let weak = tx.downgrade();
     ///
     /// assert_eq!(rx1.recv().await.unwrap(), 10);
     /// drop(rx1);
-    /// assert!(tx.closed().now_or_never().is_none());
+    /// assert!(weak.closed().now_or_never().is_none());
     ///
     /// assert_eq!(rx2.recv().await.unwrap(), 10);
     /// drop(rx2);
-    /// assert!(tx.closed().now_or_never().is_some());
+    /// assert!(weak.closed().now_or_never().is_some());
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// A future which completes when the number of [Receiver]s subscribed to this channel reaches
/// zero.
///
/// # Examples
///
/// ```
/// use futures::FutureExt;
/// use tokio::sync::broadcast;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let (tx, mut rx1) = broadcast::channel::<u32>(16);
/// let mut rx2 = tx.subscribe();
///
/// let _ = tx.send(10);
/// let weak = tx.downgrade();
///
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// drop(rx1);
/// assert!(tx.closed().now_or_never().is_none());
///
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// drop(rx2);
/// assert!(tx.closed().now_or_never().is_some());
/// # }
/// ```
pub async fn closed(&self) {
self.shared.closed_for_senders().await;
}
/// A future which completes when the number of [Receiver]s subscribed to this channel reaches
/// zero.
///
/// # Examples
///
///
🤖 Prompt for AI Agents
In `@tokio/src/sync/broadcast.rs` around lines 1118 - 1146, The doc example for
WeakSender::closed() mistakenly calls tx.closed() instead of demonstrating the
weak handle; update the example to call weak.closed() (the WeakSender returned
by tx.downgrade()) in the two assertions that currently use tx.closed() so the
snippet exercises weak.closed() as intended; keep the rest of the example
(creating tx via broadcast::channel, tx.downgrade(), recv and drops) unchanged.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:documentation; feedback: The CodeRabbit AI reviewer is correct! The doc test is supposed to show that the WeakSender is not closed until the last channel receiver is dropped but instead it uses the strong Sender. Prevents releasing wrong documentation.

@martin-augment
Copy link
Owner Author

🐛 Critical: Documentation Example Bug

// In WeakSender::closed() documentation (lines 1137, 1141)
assert\!(tx.closed().now_or_never().is_none());  // ❌ Wrong
assert\!(tx.closed().now_or_never().is_some());  // ❌ Wrong

Problem: The example demonstrates WeakSender::closed() but calls tx.closed() instead of weak.closed(). This will confuse users about the actual API being demonstrated.

Fix: Change to:

assert\!(weak.closed().now_or_never().is_none());
assert\!(weak.closed().now_or_never().is_some());

value:useful; category:documentation; feedback: The Claude AI reviewer is correct! The doc test is supposed to show that the WeakSender is not closed until the last channel receiver is dropped but instead it uses the strong Sender. Prevents releasing wrong documentation.

@martin-augment
Copy link
Owner Author

1599-1602: Set closed_for_senders before calling notify_waiters() for defensive ordering.

Currently notify_waiters() fires on line 1600 before the flag is set on line 1601. This works because the tail lock is held throughout, so woken tasks block on the lock. However, the conventional pattern is state-then-notify, and reversing these two lines would make the code more robust against future refactoring that might release the lock between them.

value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! The pattern is wrongly applied - first notify the subscribers and then change the state. It should be the opposite - first update the state and then notify. The acquired lock is still the same, so there is no behavior change. It will be semantically better.

@martin-augment
Copy link
Owner Author

⚠️ Ordering Issue: async_trace_leaf and cooperative

Looking at tokio/src/sync/watch.rs:1314-1329, the established pattern in Tokio is:

cooperative(async {
    crate::trace::async_trace_leaf().await;
    // ... rest of logic
})
.await

However, the new closed_for_senders() method does:

crate::trace::async_trace_leaf().await;
cooperative(async {
    // ... logic
})
.await

Impact: The trace leaf is called outside the cooperative wrapper, which means the tracing won't be properly associated with the cooperative context.

Recommendation: Move async_trace_leaf() inside the cooperative block to match the pattern used in watch.rs and other sync primitives.

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The call to async_trace_leaf() should be inside the cooperative() call even if it does not need the cooperative context. It may need to use it one day. Also it is always good to keep the code consistent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants

Comments