Skip to content

Conversation

@mxsm
Copy link
Owner

@mxsm mxsm commented Jan 7, 2026

Which Issue(s) This PR Fixes(Closes)

Fixes #5531

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • Refactor
    • Refactored controller lifecycle management to improve internal state handling during startup and shutdown operations.
    • Simplified synchronization patterns in the controller implementation to reduce overhead and enhance performance.
    • Updated method signatures across controller implementations for consistency.

✏️ Tip: You can customize this high-level summary in your review settings.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 7, 2026

Walkthrough

This PR refactors the RocketMQ controller architecture to support mutable state during lifecycle operations. It converts RaftController storage from Arc to ArcMut, updates startup/shutdown method signatures from immutable to mutable borrows, simplifies OpenRaftController's internal state management by removing unnecessary Arc<Mutex<>> wrappers, and propagates these changes through ControllerManager and all processor implementations.

Changes

Cohort / File(s) Summary
Controller trait and core implementations
rocketmq-controller/src/controller.rs
Updated Controller trait startup/shutdown signatures from &self to &mut self; synchronized MockController implementation and test usage to require mutable receiver.
OpenRaftController internal state
rocketmq-controller/src/controller/open_raft_controller.rs
Simplified state management by replacing Arc<Mutex<Option<...>>> wrappers with direct Option<...> types for node, handle, and shutdown_tx fields; updated startup/shutdown to &mut self and use direct field assignment/taking instead of mutex guards.
RaftController implementations
rocketmq-controller/src/controller/raft_controller.rs, rocketmq-controller/src/controller/raft_rs_controller.rs
Updated startup/shutdown method signatures from &self to &mut self to align with trait changes; internal logic remains unchanged.
ControllerManager state access
rocketmq-controller/src/controller/controller_manager.rs
Changed raft_controller field from Arc<RaftController> to ArcMut<RaftController>; updated startup/shutdown calls to use mut_from_ref() for mutable access; updated public accessor to return &ArcMut<RaftController>.
ProcessorManager and broker processors
rocketmq-controller/src/processor.rs, rocketmq-controller/src/processor/broker_processor.rs
Updated raft field and constructor signatures in ProcessorManager, RegisterBrokerProcessor, UnregisterBrokerProcessor, and ElectMasterProcessor from Arc<RaftController> to ArcMut<RaftController>.
Topic processors
rocketmq-controller/src/processor/topic_processor.rs
Updated raft field and constructor signatures in CreateTopicProcessor, UpdateTopicProcessor, and DeleteTopicProcessor from Arc<RaftController> to ArcMut<RaftController>; added ArcMut import.
Test updates
rocketmq-controller/tests/raft_controller_test.rs
Made local controller variables mutable across lifecycle tests (test_open_raft_controller_lifecycle, test_raft_rs_controller_lifecycle, test_raft_controller_wrapper).

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25–30 minutes

Possibly related PRs

  • #5348: Makes overlapping structural changes to ControllerManager, RaftController, and processor files by converting Arc to ArcMut and updating startup/shutdown to require mutable borrows.

Suggested labels

refactor♻️, approved, auto merge, AI review first

Suggested reviewers

  • SpaceXCN
  • TeslaRustor
  • rocketmq-rust-bot

Poem

🐰 Arc bows to ArcMut, mutable state's the way,
Lifecycle methods bloom with &mut self today,
No more mutex locks guard the inner flow,
State flows direct, watch the controllers grow,
Ownership refined, yet the logic stays true! ✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main refactoring: converting controllers to use ArcMut for mutable access to RaftController, matching the primary changes across all modified files.
Linked Issues check ✅ Passed The pull request implements the stated objective from issue #5531 to refactor controllers to use ArcMut for mutable access to RaftController across all relevant files and components.
Out of Scope Changes check ✅ Passed All changes are directly related to the refactoring objective: Controller trait signatures updated to use &mut self, OpenRaftController fields simplified, ArcMut substituted for Arc, and tests adjusted accordingly.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @rocketmq-controller/src/controller/raft_controller.rs:
- Around line 62-67: The startup method should use the same ArcMut access
pattern used elsewhere: instead of calling controller.startup().await directly
on the &mut ArcMut<T> enum branches, call
controller.mut_from_ref().startup().await for both Self::OpenRaft(controller)
and Self::RaftRs(controller) so it matches the existing explicit mut_from_ref()
usage (referencing ArcMut, mut_from_ref, and the startup method on the inner
controller).
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d2768cf and e595a9b.

📒 Files selected for processing (9)
  • rocketmq-controller/src/controller.rs
  • rocketmq-controller/src/controller/controller_manager.rs
  • rocketmq-controller/src/controller/open_raft_controller.rs
  • rocketmq-controller/src/controller/raft_controller.rs
  • rocketmq-controller/src/controller/raft_rs_controller.rs
  • rocketmq-controller/src/processor.rs
  • rocketmq-controller/src/processor/broker_processor.rs
  • rocketmq-controller/src/processor/topic_processor.rs
  • rocketmq-controller/tests/raft_controller_test.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-05-10T06:20:00.401Z
Learnt from: 578223592
Repo: mxsm/rocketmq-rust PR: 3240
File: rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs:69-71
Timestamp: 2025-05-10T06:20:00.401Z
Learning: In RocketMQ Rust, while dashmap is concurrent-safe, returning a direct reference to a DashMap can break encapsulation by allowing external code to modify the map without triggering persistence methods like `persist()`. Consider returning a read-only view, iterator methods, or a snapshot instead.

Applied to files:

  • rocketmq-controller/src/processor/broker_processor.rs
  • rocketmq-controller/src/processor/topic_processor.rs
🧬 Code graph analysis (7)
rocketmq-controller/src/processor.rs (3)
rocketmq-controller/src/controller/controller_manager.rs (4)
  • new (139-211)
  • config (575-577)
  • raft (548-550)
  • metadata (557-559)
rocketmq-controller/src/processor/broker_processor.rs (4)
  • new (50-52)
  • new (128-130)
  • new (184-186)
  • new (235-237)
rocketmq-controller/src/processor/topic_processor.rs (3)
  • new (46-48)
  • new (126-128)
  • new (197-199)
rocketmq-controller/src/controller/raft_controller.rs (4)
rocketmq-controller/src/controller.rs (4)
  • startup (183-183)
  • startup (523-525)
  • shutdown (200-200)
  • shutdown (527-529)
rocketmq-controller/src/controller/open_raft_controller.rs (2)
  • startup (77-112)
  • shutdown (114-147)
rocketmq-controller/src/controller/controller_manager.rs (2)
  • controller (620-622)
  • shutdown (457-510)
rocketmq-controller/src/processor.rs (1)
  • shutdown (149-153)
rocketmq-controller/src/controller.rs (3)
rocketmq-controller/src/controller/open_raft_controller.rs (3)
  • startup (77-112)
  • shutdown (114-147)
  • new (66-73)
rocketmq-controller/src/controller/raft_controller.rs (2)
  • startup (62-67)
  • shutdown (69-74)
rocketmq-controller/src/controller/raft_rs_controller.rs (3)
  • startup (47-50)
  • shutdown (52-55)
  • new (41-43)
rocketmq-controller/src/processor/broker_processor.rs (3)
rocketmq-controller/src/controller/controller_manager.rs (3)
  • new (139-211)
  • metadata (557-559)
  • raft (548-550)
rocketmq-controller/src/processor.rs (2)
  • new (79-127)
  • new (192-196)
rocketmq-controller/src/processor/topic_processor.rs (3)
  • new (46-48)
  • new (126-128)
  • new (197-199)
rocketmq-controller/src/processor/topic_processor.rs (2)
rocketmq-controller/src/processor.rs (2)
  • new (79-127)
  • new (192-196)
rocketmq-controller/src/processor/broker_processor.rs (4)
  • new (50-52)
  • new (128-130)
  • new (184-186)
  • new (235-237)
rocketmq-controller/src/controller/controller_manager.rs (3)
rocketmq-controller/src/controller/open_raft_controller.rs (1)
  • new (66-73)
rocketmq-controller/src/processor.rs (2)
  • new (79-127)
  • new (192-196)
rocketmq-controller/src/controller/raft_controller.rs (1)
  • new_open_raft (51-53)
rocketmq-controller/src/controller/open_raft_controller.rs (1)
rocketmq-controller/src/controller.rs (4)
  • startup (183-183)
  • startup (523-525)
  • shutdown (200-200)
  • shutdown (527-529)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Build & Test (ubuntu-latest)
  • GitHub Check: Build & Test (macos-latest)
  • GitHub Check: Build & Test (windows-latest)
  • GitHub Check: Code Coverage
  • GitHub Check: auto-approve
🔇 Additional comments (28)
rocketmq-controller/src/controller.rs (4)

183-183: LGTM: Trait signature updated to support stateful lifecycle operations.

The change from &self to &mut self is appropriate for startup(), as implementations like OpenRaftController need to initialize and store mutable state (node, handle, shutdown_tx). This aligns with typical lifecycle patterns where startup involves state transitions.


200-200: LGTM: Shutdown signature correctly requires mutable access.

Changing shutdown() to take &mut self is correct, as implementations need to take ownership of resources (via Option::take()) to perform cleanup. This is consistent with the startup changes and standard Rust patterns for resource cleanup.


523-529: LGTM: MockController updated consistently.

The MockController implementation correctly reflects the trait signature changes. The mock remains simple and appropriate for testing purposes.


611-614: LGTM: Test updated for new API.

The test correctly uses a mutable binding to accommodate the updated trait methods. The test remains clear and validates the lifecycle operations properly.

rocketmq-controller/src/processor.rs (2)

68-68: LGTM: Field type updated to support mutable access.

Changing from Arc<RaftController> to ArcMut<RaftController> enables processors to obtain mutable access to the controller when needed for lifecycle operations. This aligns with the broader refactoring objective.


79-127: LGTM: Constructor and processor instantiations updated consistently.

The changes are applied uniformly across:

  • Constructor parameter (line 79)
  • All processor instantiations (lines 86, 90, 98, 110, 114, 118)

Each processor now receives ArcMut<RaftController> via raft.clone(), maintaining proper reference counting while enabling mutable access patterns.

rocketmq-controller/tests/raft_controller_test.rs (3)

25-29: LGTM: Test updated for mutable lifecycle methods.

The test correctly uses a mutable binding to accommodate the new startup(&mut self) and shutdown(&mut self) signatures. The test logic remains unchanged and properly validates the controller lifecycle.


38-42: LGTM: RaftRs controller test updated consistently.

The mutable binding is correctly applied, matching the pattern established in the OpenRaft test. Both test variants now properly handle the new trait requirements.


60-71: LGTM: Wrapper test updated for both variants.

Both controller variants (OpenRaft and RaftRs) are instantiated with mutable bindings, ensuring the test validates the refactored API consistently across implementations.

rocketmq-controller/src/controller/raft_controller.rs (1)

69-74: Same dereferencing concern for shutdown.

Similar to startup(), verify that the direct method call on ArcMut works as intended. See comment on lines 62-67.

rocketmq-controller/src/controller/raft_rs_controller.rs (1)

47-55: LGTM: Stub implementation updated consistently.

The signature changes align with the Controller trait requirements. The stub implementations with TODO comments are appropriate for this stage of development.

rocketmq-controller/src/processor/broker_processor.rs (4)

18-18: LGTM!

The ArcMut import aligns with the PR objective to enable mutable access to the RaftController across processors.


40-52: LGTM!

The RegisterBrokerProcessor correctly migrates to ArcMut<RaftController>. The is_leader() call works transparently through ArcMut's Deref implementation.


117-130: LGTM!

The UnregisterBrokerProcessor follows the same consistent refactoring pattern.


224-237: LGTM!

The ElectMasterProcessor completes the consistent migration pattern across all broker processors requiring raft access.

rocketmq-controller/src/processor/topic_processor.rs (4)

17-17: LGTM!

The ArcMut import is correctly added for the topic processors.


35-48: LGTM!

The CreateTopicProcessor correctly migrates to ArcMut<RaftController>, consistent with the broker processors.


115-128: LGTM!

The UpdateTopicProcessor follows the same consistent refactoring pattern.


186-199: LGTM!

The DeleteTopicProcessor completes the consistent migration pattern across all topic processors.

rocketmq-controller/src/controller/controller_manager.rs (5)

90-91: LGTM!

The field type change to ArcMut<RaftController> with updated documentation is consistent with the PR objective.


150-150: LGTM!

The ArcMut::new() initialization correctly wraps the RaftController for shared mutable access.


376-383: LGTM!

The mut_from_ref() call is appropriate here since start() takes mut self: ArcMut<Self>, providing legitimate mutable access to start the Raft controller.


620-622: LGTM!

The accessor correctly returns a reference to the ArcMut<RaftController>, allowing callers to clone for shared access.


498-502: The ArcMut::mut_from_ref() pattern is safe in this context.

While ArcMut explicitly requires callers to ensure no data races occur, the shutdown() method is protected by an atomic compare-exchange on the running field (line 459-462). This guarantees that only the first caller proceeds to invoke mut_from_ref() on the raft_controller; subsequent callers return immediately. This serialization pattern eliminates concurrent access and satisfies ArcMut's safety requirements.

rocketmq-controller/src/controller/open_raft_controller.rs (4)

55-63: Simplified state management by removing Arc<Mutex<>> wrappers.

The fields are now direct Option<...> types instead of Arc<Mutex<Option<...>>>. This is cleaner and shifts synchronization responsibility to the ArcMut wrapper at the ControllerManager level.


65-73: LGTM!

The constructor is now cleaner with direct None initialization instead of Arc::new(Mutex::new(None)) patterns.


77-112: LGTM!

The startup(&mut self) implementation is cleaner with direct field assignments. The mutable signature correctly reflects the semantic intent of modifying controller state during startup.


114-147: LGTM!

The shutdown(&mut self) implementation properly uses Option::take() for one-time resource consumption, maintaining the correct shutdown ordering: signal gRPC server → shutdown Raft node → await server task with timeout.

Comment on lines +62 to 67
async fn startup(&mut self) -> RocketMQResult<()> {
match self {
Self::OpenRaft(controller) => controller.startup().await,
Self::RaftRs(controller) => controller.startup().await,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for ArcMut implementation to verify DerefMut trait and mut_from_ref usage patterns
echo "=== Searching for DerefMut implementation for ArcMut ==="
rg -n "impl.*DerefMut.*for.*ArcMut" -A 5

echo ""
echo "=== Searching for mut_from_ref function definition ==="
rg -n "fn mut_from_ref" -A 3

echo ""
echo "=== Searching for mut_from_ref usage in codebase ==="
rg -n "\.mut_from_ref\(\)" -C 2

echo ""
echo "=== Searching for ArcMut definition and trait implementations ==="
rg -n "struct ArcMut|impl.*for ArcMut" -A 3

Repository: mxsm/rocketmq-rust

Length of output: 50374


Consider using mut_from_ref() for consistency with the same codebase pattern.

ArcMut correctly implements DerefMut, so the direct method call controller.startup().await on &mut ArcMut<T> works as intended. However, the same raft_controller field uses explicit mut_from_ref() calls elsewhere in controller_manager.rs (lines 376 and 498):

self.raft_controller.mut_from_ref().startup().await
self.raft_controller.mut_from_ref().shutdown().await

Align this code with the explicit pattern for consistency across the codebase.

🤖 Prompt for AI Agents
In @rocketmq-controller/src/controller/raft_controller.rs around lines 62 - 67,
The startup method should use the same ArcMut access pattern used elsewhere:
instead of calling controller.startup().await directly on the &mut ArcMut<T>
enum branches, call controller.mut_from_ref().startup().await for both
Self::OpenRaft(controller) and Self::RaftRs(controller) so it matches the
existing explicit mut_from_ref() usage (referencing ArcMut, mut_from_ref, and
the startup method on the inner controller).

@codecov
Copy link

codecov bot commented Jan 7, 2026

Codecov Report

❌ Patch coverage is 84.84848% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 38.43%. Comparing base (d2768cf) to head (e595a9b).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...mq-controller/src/controller/controller_manager.rs 25.00% 3 Missing ⚠️
...-controller/src/controller/open_raft_controller.rs 86.66% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5532      +/-   ##
==========================================
- Coverage   38.44%   38.43%   -0.01%     
==========================================
  Files         815      815              
  Lines      110567   110551      -16     
==========================================
- Hits        42508    42492      -16     
  Misses      68059    68059              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - All CI checks passed ✅

@rocketmq-rust-bot rocketmq-rust-bot merged commit a6f6cb2 into main Jan 7, 2026
20 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Jan 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge refactor♻️ refactor code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Refactor♻️] Refactor controllers to use ArcMut for mutable access to RaftController

4 participants