Skip to content

7900: sync: add closed method to broadcast::WeakSender#62

Open
martin-augment wants to merge 1 commit intomasterfrom
pr-7900-2026-02-16-08-25-00
Open

7900: sync: add closed method to broadcast::WeakSender#62
martin-augment wants to merge 1 commit intomasterfrom
pr-7900-2026-02-16-08-25-00

Conversation

@martin-augment
Copy link
Owner

7900: To review by AI

@gemini-code-assist
Copy link

Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the tokio::sync::broadcast channel by introducing a closed method for WeakSender handles. This allows for more robust monitoring of broadcast channels, enabling asynchronous code to await the complete cessation of all active receivers. The change involved a refactoring of the internal channel state tracking to better distinguish between the absence of senders and the absence of receivers, improving the clarity and correctness of channel closure detection.

Highlights

  • Internal State Refactoring: Renamed the internal closed flag in the Tail struct to has_senders and inverted its boolean meaning for clarity in tracking channel state.
  • New WeakSender::closed Method: Introduced a new closed asynchronous method for broadcast::WeakSender, allowing users to await the point when all Receivers for a channel have been dropped.
  • Shared Closure Logic: Refactored the internal logic for awaiting channel closure into a shared closed_for_senders method, used by both Sender::closed and the new WeakSender::closed.
  • Updated Channel Lifecycle: Updated channel initialization and receiver drop logic to align with the new has_senders state tracking, removing previous closed flag manipulations.
  • New Unit Tests: Added comprehensive unit tests to validate the behavior of the new WeakSender::closed method and extended existing tests to cover its interaction with other channel features.
Changelog
  • tokio/src/sync/broadcast.rs
    • Renamed Tail::closed to Tail::has_senders and inverted its meaning to indicate the presence of senders.
    • Modified Sender::closed to delegate its logic to a new Shared::closed_for_senders helper method.
    • Removed the logic that would reset the closed state when a new receiver was added.
    • Implemented Shared::closed_for_senders to provide a unified way to await the drop of all receivers.
    • Added pub async fn closed() to WeakSender, utilizing Shared::closed_for_senders.
    • Adjusted Receiver::drop to no longer set tail.closed = true when the last receiver is dropped.
    • Updated Receiver::try_recv to use the new !tail.has_senders condition for checking channel closure.
  • tokio/tests/sync_broadcast.rs
    • Added a new test case broadcast_weak_sender_closed to specifically verify the functionality of the WeakSender::closed method.
    • Extended the existing broadcast_sender_closed_with_extra_subscribe test to include assertions for WeakSender::closed behavior alongside Sender::closed.
Activity
  • The pull request description indicates it is "To review by AI", suggesting no human review activity has occurred yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai
Copy link

coderabbitai bot commented Feb 16, 2026

Walkthrough

This change refactors the broadcast channel closure semantics in Tokio's sync module. The Tail struct's closed field is replaced with has_senders to track active senders. Sender::closed() now delegates to a new async method Shared::closed_for_senders() that waits until all receivers drop. Receiver closure detection is updated to rely on has_senders instead of the closed flag. A new WeakSender::closed() method is added. Tests verify that WeakSender::closed() behaves consistently with the strong sender path, remaining pending until all strong receivers are dropped.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch pr-7900-2026-02-16-08-25-00

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@claude
Copy link

claude bot commented Feb 16, 2026

Code Review - PR #62: sync: add closed method to broadcast::WeakSender

Summary

This PR adds a closed() method to broadcast::WeakSender and refactors the internal channel state tracking. The core change replaces the closed boolean field with has_senders to better reflect the actual semantics of the broadcast channel.

✅ Strengths

  1. Clear motivation: The use case (stopping expensive background tasks when no subscribers exist) is well-documented in the linked issue.

  2. Good code reuse: The new closed_for_senders() helper method is shared between Sender::closed() and WeakSender::closed(), reducing duplication.

  3. Comprehensive test coverage: The updated tests thoroughly exercise the new functionality, including edge cases with subscription/unsubscription patterns.

  4. Semantic clarity improvement: Renaming closed to has_senders better reflects what the field tracks - this is a positive refactoring that makes the code more understandable.

  5. Proper use of cooperative scheduling: The new closed_for_senders() method correctly uses cooperative() and async_trace_leaf(), consistent with other async methods in the module.

🔍 Observations & Potential Concerns

1. Semantic Change in Channel State (Medium Priority)

The old closed field was set to true in two scenarios:

  • When the last sender was dropped (close_channel())
  • When the last receiver was dropped (line removed in Receiver::drop)

The new has_senders field only tracks sender state. This is intentional and correct for the new semantics, but it's worth confirming:

  • Old behavior: closed = true meant "no senders OR no receivers"
  • New behavior: has_senders = false means "no senders"

The check at line 1292 changes from:

if tail.closed { return Err(TryRecvError::Closed); }

to:

if !tail.has_senders { return Err(TryRecvError::Closed); }

This maintains correct behavior because receivers should return Closed when there are no senders, regardless of whether other receivers exist. ✅

2. Initialization Logic (Low Priority)

The old code had this removed logic in new_receiver():

if tail.rx_cnt == 0 {
    // Potentially need to re-open the channel, if a new receiver has been added
    tail.closed = false;
}

This "re-opening" logic is no longer needed because has_senders is independent of receiver count. This is correct. ✅

3. Documentation Clarity (Minor)

The docstring for WeakSender::closed() states:

"A future which completes when the number of [Receiver]s subscribed to this channel reaches zero."

This is accurate and matches the behavior. The example is excellent and clearly demonstrates the use case. ✅

🎯 Code Quality Assessment

  • Best Practices: ✅ Follows Tokio's patterns for async primitives
  • Performance: ✅ No performance regressions; same lock acquisition patterns
  • Security: ✅ No security concerns; maintains proper synchronization
  • Test Coverage: ✅ Excellent - tests cover both Sender::closed() and WeakSender::closed() in various scenarios
  • API Design: ✅ Symmetric API between Sender and WeakSender is intuitive

📝 Minor Suggestions

  1. Comment clarity: The comment at line 369 could be more precise:

    /// True if there are no senders.
    has_senders: bool,

    Consider: /// True if there are any senders (strong Sender handles exist).
    This would make the positive case clearer since has_senders: true means senders exist.

  2. Test naming: The test broadcast_sender_closed_with_extra_subscribe now tests both Sender::closed() and WeakSender::closed(). Consider renaming to broadcast_sender_and_weak_closed_with_extra_subscribe or splitting into separate tests for clarity.

✅ Verdict

Approve with minor suggestions. This is a solid implementation that:

  • Solves a real use case (monitoring receiver count without holding a strong sender)
  • Improves code clarity with better naming (has_senders vs closed)
  • Maintains backward compatibility (existing Sender::closed() behavior unchanged)
  • Has excellent test coverage
  • Follows all Tokio conventions and best practices

The refactoring from closed to has_senders is a welcome improvement that makes the internal state tracking more explicit and easier to reason about.


Review generated with assistance from Claude Code

@augmentcode
Copy link

augmentcode bot commented Feb 16, 2026

🤖 Augment PR Summary

Summary: Add broadcast::WeakSender::closed() and unify the “wait for last receiver drop” logic.
Changes:

  • Rename Tail::closed to Tail::has_senders and use it for “no senders” receiver-close detection.
  • Factor the “await receivers == 0” loop into Shared::closed_for_senders(), used by both Sender::closed() and WeakSender::closed().
  • Remove the old “re-open channel on subscribe” bookkeeping tied to Tail::closed.
  • Add/extend broadcast tests covering WeakSender::closed() and extra subscribe() interactions.

🤖 Was this summary useful? React with 👍 or 👎

Copy link

@augmentcode augmentcode bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 1 suggestions posted.

Fix All in Augment

Comment augment review to trigger a new review at any time.


/// True if the channel is closed.
closed: bool,
/// True if there are no senders.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc comment for has_senders appears inverted: close_channel() sets has_senders = false when the last Sender drops, and receivers treat !has_senders as closed. Consider updating the comment to reflect that true means there are senders.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:documentation; feedback: The Augment AI reviewer is correct! The docstring for has_senders is still related to the old/removed closed flag. It should be updated to say that it would be true if there are live strong senders instead.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
tokio/src/sync/broadcast.rs (2)

1037-1057: closed_for_senders follows the correct Notify pattern — but consider the semantic gap with Sender::closed().

The implementation correctly creates the notified future before checking rx_cnt under the lock, which avoids missing notifications. The cooperative wrapper and trace leaf are consistent with similar methods in the codebase (e.g., watch::Sender::closed()).

One observation: this method name (closed_for_senders) and its semantics (waits until all receivers drop) are shared by both Sender::closed() and WeakSender::closed(). For Sender::closed(), this works because a strong sender keeps the channel alive — it's waiting for all receivers to leave. For WeakSender::closed(), however, the channel may already be closed (all strong senders dropped), yet this method still waits for receivers to drain. That's the intended design per the tests, but the doc comment on WeakSender::closed() should clarify that it waits for receivers to drop rather than for the channel to close (which may have already happened).


1107-1136: Doc comment says "Receivers subscribed to this channel" but does not clarify the WeakSender-specific behavior.

The doc mirrors Sender::closed() ("completes when the number of Receivers reaches zero"), but for a WeakSender the channel may already have no strong senders — meaning the semantic context is different. A user might expect WeakSender::closed() to complete immediately once all strong senders are gone (channel is "closed"), but instead it waits for receivers to drain. Consider adding a note like: "This completes when all receivers have been dropped, regardless of whether strong senders still exist."

The example is good and clearly demonstrates the expected behavior with now_or_never().

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The pull request successfully adds the closed method to broadcast::WeakSender and refactors the internal state management to decouple sender-based closure from receiver counts. The changes are well-tested and maintain consistency with existing patterns in the broadcast module. I have identified one minor documentation error in the Tail struct where a comment incorrectly describes the state of a boolean field.


/// True if the channel is closed.
closed: bool,
/// True if there are no senders.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment for has_senders is incorrect. Based on the implementation in Sender::new_with_receiver_count (where it is initialized to true) and close_channel (where it is set to false when senders are dropped), this field is true when there are active senders, not when there are none.

Suggested change
/// True if there are no senders.
/// True if there are senders.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:documentation; feedback: The Gemini AI reviewer is correct! The docstring for has_senders is still related to the old/removed closed flag. It should be updated to say that it would be true if there are live strong senders instead.

@martin-augment
Copy link
Owner Author

  1. Comment clarity: The comment at line 369 could be more precise:
    /// True if there are no senders.
    has_senders: bool,
    Consider: /// True if there are any senders (strong Sender handles exist).
    This would make the positive case clearer since has_senders: true means senders exist

value:good-to-have; category:documentation; feedback: The Claude AI reviewer is correct! The docstring for has_senders is still related to the old/removed closed flag. It should be updated to say that it would be true if there are live strong senders instead.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants

Comments