-
Notifications
You must be signed in to change notification settings - Fork 218
[ISSUE #5525]♻️Refactor controller to remove RocketMQRuntime dependency and use ControllerConfig instead #5526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…cy and use ControllerConfig instead
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThis pull request refactors the controller architecture to replace the Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom Pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @rocketmq-controller/src/controller/open_raft_controller.rs:
- Around line 50-64: The startup() function creates a RaftNodeManager and spawns
the gRPC server but never stores the RaftNodeManager or a shutdown handle,
preventing later access and graceful shutdown; modify the struct backing this
controller to hold an Option<Arc<RaftNodeManager>> (or a field like node_mgr)
and a oneshot or watch sender for shutdown (e.g., shutdown_tx), then in
startup() assign the created Arc<RaftNodeManager> into that field instead of
letting it drop, and replace Server::builder().serve(...) with
Server::builder().serve_with_shutdown(self.config.listen_addr, async move {
shutdown_rx.await.ok(); }) (or store the JoinHandle/abort handle) so shutdown()
can signal the stored sender (or abort the handle) to stop the server and
preserve access to RaftNodeManager via the stored node_mgr.
🧹 Nitpick comments (3)
rocketmq-controller/src/controller/controller_manager.rs (1)
145-150: Consider removing the commented-out runtime initialization.The commented-out runtime line is dead code after the refactoring. Leaving it in place adds noise and may cause confusion for future maintainers. Since the refactoring is intentional, the commented code should be removed entirely.
🔎 Proposed fix
- // Initialize RocketMQ runtime for Raft controller - //let runtime = Arc::new(RocketMQRuntime::new_multi(2, "controller-runtime")); - // Initialize Raft controller for leader election // This MUST succeed before proceeding // Using OpenRaft implementation by default let raft_arc = Arc::new(RaftController::new_open_raft(Arc::clone(&config)));rocketmq-controller/src/controller.rs (1)
600-603: Remove commented-outget_runtimemethod.This commented-out code is now obsolete after the refactoring. Remove it to keep the codebase clean.
🔎 Proposed fix
fn register_broker_lifecycle_listener(&self, _listener: Arc<dyn BrokerLifecycleListener>) { // No-op } - - /* fn get_runtime(&self) -> Arc<RocketMQRuntime> { - unimplemented!("MockController does not provide runtime in tests") - }*/ }rocketmq-controller/src/controller/open_raft_controller.rs (1)
58-62: Usetracing::error!instead ofeprintln!for consistency.The rest of the codebase uses
tracingfor logging. Usingeprintln!here is inconsistent and won't integrate with the logging infrastructure.🔎 Proposed fix
+use tracing::error; + tokio::spawn(async move { if let Err(e) = server.await { - eprintln!("gRPC server error for node {}: {}", node_id_for_error, e); + error!("gRPC server error for node {}: {}", node_id_for_error, e); } });
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
rocketmq-controller/src/controller.rsrocketmq-controller/src/controller/controller_manager.rsrocketmq-controller/src/controller/open_raft_controller.rsrocketmq-controller/src/controller/raft_controller.rsrocketmq-controller/src/controller/raft_rs_controller.rsrocketmq-controller/tests/raft_controller_test.rs
💤 Files with no reviewable changes (1)
- rocketmq-controller/src/controller/raft_rs_controller.rs
🧰 Additional context used
🧬 Code graph analysis (3)
rocketmq-controller/src/controller/controller_manager.rs (4)
rocketmq-controller/src/controller/open_raft_controller.rs (1)
new(44-46)rocketmq-controller/src/controller/raft_rs_controller.rs (1)
new(41-43)rocketmq-common/src/common/controller/controller_config.rs (1)
new(257-259)rocketmq-controller/src/controller/raft_controller.rs (1)
new_open_raft(51-53)
rocketmq-controller/src/controller/open_raft_controller.rs (2)
rocketmq-controller/src/controller/controller_manager.rs (2)
controller(620-622)config(575-577)rocketmq-controller/src/controller/raft_controller.rs (1)
startup(62-67)
rocketmq-controller/src/controller/raft_controller.rs (3)
rocketmq-controller/src/controller/controller_manager.rs (3)
controller(620-622)config(575-577)new(139-211)rocketmq-controller/src/controller/open_raft_controller.rs (1)
new(44-46)rocketmq-controller/src/controller/raft_rs_controller.rs (1)
new(41-43)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Build & Test (ubuntu-latest)
- GitHub Check: Build & Test (macos-latest)
- GitHub Check: Build & Test (windows-latest)
- GitHub Check: Code Coverage
- GitHub Check: auto-approve
🔇 Additional comments (4)
rocketmq-controller/tests/raft_controller_test.rs (2)
22-30: LGTM!The test correctly uses
ControllerConfig::test_config()for OpenRaft initialization, aligning with the config-driven approach. The lifecycle test properly verifies startup, leadership status, and shutdown.
52-78: LGTM!The wrapper test correctly differentiates between OpenRaft (config-based) and RaftRs (runtime-based) initialization, properly testing both controller variants with their respective dependencies.
rocketmq-controller/src/controller/raft_controller.rs (1)
49-58: LGTM!The
new_open_raftconstructor signature change fromArc<RocketMQRuntime>toArc<ControllerConfig>properly implements the config-driven initialization. The asymmetric design is correct: OpenRaft uses config while RaftRs continues to use runtime, allowing flexibility in implementation approaches.rocketmq-controller/src/controller/open_raft_controller.rs (1)
38-47: LGTM on the config-driven initialization.The struct and constructor correctly implement the config-based approach as intended by the refactoring. The
Arc<ControllerConfig>is properly stored and cloned where needed.
| async fn startup(&self) -> RocketMQResult<()> { | ||
| // TODO: Initialize OpenRaft node | ||
| let node = Arc::new(RaftNodeManager::new(Arc::clone(&self.config)).await?); | ||
| let service = GrpcRaftService::new(node.raft()); | ||
| let server = Server::builder() | ||
| .add_service(OpenRaftServiceServer::new(service)) | ||
| .serve(self.config.listen_addr); | ||
|
|
||
| let node_id_for_error = self.config.node_id; | ||
| tokio::spawn(async move { | ||
| if let Err(e) = server.await { | ||
| eprintln!("gRPC server error for node {}: {}", node_id_for_error, e); | ||
| } | ||
| }); | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing storage of RaftNodeManager and shutdown handle.
The node and gRPC server handle are not stored, which creates two issues:
-
RaftNodeManagerlifecycle: Thenodeis created but only passed to the service. If the manager holds important state beyond the Raft handle, it may not be accessible later. -
No graceful shutdown: The spawned gRPC server has no shutdown mechanism. The
shutdown()method (line 66-68) cannot stop the server because there's no handle stored.
Consider storing the RaftNodeManager and using tonic's Server::serve_with_shutdown or storing a shutdown signal sender.
🔎 Proposed approach
pub struct OpenRaftController {
config: Arc<ControllerConfig>,
+ node: Option<Arc<RaftNodeManager>>,
+ shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl OpenRaftController {
pub fn new(config: Arc<ControllerConfig>) -> Self {
- Self { config }
+ Self { config, node: None, shutdown_tx: None }
}
}Then in startup(), store the node and use serve_with_shutdown for graceful termination.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In @rocketmq-controller/src/controller/open_raft_controller.rs around lines 50 -
64, The startup() function creates a RaftNodeManager and spawns the gRPC server
but never stores the RaftNodeManager or a shutdown handle, preventing later
access and graceful shutdown; modify the struct backing this controller to hold
an Option<Arc<RaftNodeManager>> (or a field like node_mgr) and a oneshot or
watch sender for shutdown (e.g., shutdown_tx), then in startup() assign the
created Arc<RaftNodeManager> into that field instead of letting it drop, and
replace Server::builder().serve(...) with
Server::builder().serve_with_shutdown(self.config.listen_addr, async move {
shutdown_rx.await.ok(); }) (or store the JoinHandle/abort handle) so shutdown()
can signal the stored sender (or abort the handle) to stop the server and
preserve access to RaftNodeManager via the stored node_mgr.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5526 +/- ##
=======================================
Coverage 38.40% 38.41%
=======================================
Files 815 815
Lines 110515 110512 -3
=======================================
+ Hits 42443 42452 +9
+ Misses 68072 68060 -12 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Fixes #5525
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.