diff --git a/rocketmq-controller/src/controller.rs b/rocketmq-controller/src/controller.rs index bd3d522ec..b5d17cf01 100644 --- a/rocketmq-controller/src/controller.rs +++ b/rocketmq-controller/src/controller.rs @@ -177,10 +177,10 @@ pub trait Controller: Send + Sync { /// # Example /// /// ```rust,ignore - /// let controller = RaftController::new(config).await?; + /// let mut controller = RaftController::new(config).await?; /// controller.startup().await?; /// ``` - async fn startup(&self) -> RocketMQResult<()>; + async fn startup(&mut self) -> RocketMQResult<()>; /// Shutdown the controller gracefully /// @@ -197,7 +197,7 @@ pub trait Controller: Send + Sync { /// /// Returns error if graceful shutdown fails. The controller may still be /// in an inconsistent state, requiring external intervention. - async fn shutdown(&self) -> RocketMQResult<()>; + async fn shutdown(&mut self) -> RocketMQResult<()>; /// Start scheduling controller events /// @@ -520,11 +520,11 @@ impl Default for MockController { } impl Controller for MockController { - async fn startup(&self) -> RocketMQResult<()> { + async fn startup(&mut self) -> RocketMQResult<()> { Ok(()) } - async fn shutdown(&self) -> RocketMQResult<()> { + async fn shutdown(&mut self) -> RocketMQResult<()> { Ok(()) } @@ -608,7 +608,7 @@ mod tests { #[tokio::test] async fn test_mock_controller_lifecycle() { - let controller = MockController::new(); + let mut controller = MockController::new(); assert!(controller.startup().await.is_ok()); assert!(controller.is_leader()); assert!(controller.shutdown().await.is_ok()); diff --git a/rocketmq-controller/src/controller/controller_manager.rs b/rocketmq-controller/src/controller/controller_manager.rs index 70fed125f..36e8f4884 100644 --- a/rocketmq-controller/src/controller/controller_manager.rs +++ b/rocketmq-controller/src/controller/controller_manager.rs @@ -87,8 +87,8 @@ pub struct ControllerManager { config: Arc, /// Raft controller for consensus and leader election - /// Note: Stored as Arc because ProcessorManager also needs it - raft_controller: Arc, + /// Note: Uses ArcMut to allow mutable access via &self + raft_controller: ArcMut, /// Metadata store for broker and topic information metadata: Arc, @@ -147,7 +147,7 @@ impl ControllerManager { // Initialize Raft controller for leader election // This MUST succeed before proceeding // Using OpenRaft implementation by default - let raft_arc = Arc::new(RaftController::new_open_raft(Arc::clone(&config))); + let raft_arc = ArcMut::new(RaftController::new_open_raft(Arc::clone(&config))); // Initialize metadata store // This MUST succeed before proceeding @@ -373,7 +373,7 @@ impl ControllerManager { info!("Starting controller manager..."); // Start Raft controller first (critical for leader election) - if let Err(e) = self.raft_controller.startup().await { + if let Err(e) = self.raft_controller.mut_from_ref().startup().await { self.running.store(false, Ordering::SeqCst); return Err(ControllerError::Internal(format!( "Failed to start Raft controller: {}", @@ -495,7 +495,7 @@ impl ControllerManager { } // Shutdown Raft controller last (it coordinates distributed operations) - if let Err(e) = self.raft_controller.shutdown().await { + if let Err(e) = self.raft_controller.mut_from_ref().shutdown().await { error!("Failed to shutdown Raft: {}", e); } else { info!("Raft controller shut down"); @@ -617,7 +617,7 @@ impl ControllerManager { self.remoting_client.clone() } - pub fn controller(&self) -> &Arc { + pub fn controller(&self) -> &ArcMut { &self.raft_controller } } diff --git a/rocketmq-controller/src/controller/open_raft_controller.rs b/rocketmq-controller/src/controller/open_raft_controller.rs index 1cc81a9b8..d953fa2ac 100644 --- a/rocketmq-controller/src/controller/open_raft_controller.rs +++ b/rocketmq-controller/src/controller/open_raft_controller.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use cheetah_string::CheetahString; -use parking_lot::Mutex; use rocketmq_common::common::controller::ControllerConfig; use rocketmq_error::RocketMQResult; use rocketmq_remoting::protocol::body::sync_state_set_body::SyncStateSet; @@ -53,36 +52,29 @@ use crate::protobuf::openraft::open_raft_service_server::OpenRaftServiceServer; /// 1. Sends shutdown signal to gRPC server via oneshot channel /// 2. Waits for Raft node to shutdown cleanly /// 3. Waits for gRPC server task to complete (with 10s timeout) -/// -/// # Thread Safety -/// -/// Internal state is protected by `Arc>` to allow `&self` access -/// while maintaining mutability for lifecycle operations. pub struct OpenRaftController { config: Arc, - /// Raft node manager (protected for thread-safe access) - node: Arc>>>, - + /// Raft node manager + node: Option>, /// gRPC server task handle - handle: Arc>>>, - + handle: Option>, /// Shutdown signal sender for gRPC server - shutdown_tx: Arc>>>, + shutdown_tx: Option>, } impl OpenRaftController { pub fn new(config: Arc) -> Self { Self { config, - node: Arc::new(Mutex::new(None)), - handle: Arc::new(Mutex::new(None)), - shutdown_tx: Arc::new(Mutex::new(None)), + node: None, + handle: None, + shutdown_tx: None, } } } impl Controller for OpenRaftController { - async fn startup(&self) -> RocketMQResult<()> { + async fn startup(&mut self) -> RocketMQResult<()> { info!("Starting OpenRaft controller on {}", self.config.listen_addr); let node = Arc::new(RaftNodeManager::new(Arc::clone(&self.config)).await?); @@ -111,45 +103,28 @@ impl Controller for OpenRaftController { } }); - { - let mut node_guard = self.node.lock(); - *node_guard = Some(node); - } - { - let mut handle_guard = self.handle.lock(); - *handle_guard = Some(handle); - } - { - let mut shutdown_tx_guard = self.shutdown_tx.lock(); - *shutdown_tx_guard = Some(shutdown_tx); - } + self.node = Some(node); + self.handle = Some(handle); + self.shutdown_tx = Some(shutdown_tx); info!("OpenRaft controller started successfully"); Ok(()) } - async fn shutdown(&self) -> RocketMQResult<()> { + async fn shutdown(&mut self) -> RocketMQResult<()> { info!("Shutting down OpenRaft controller"); // Take and send shutdown signal to gRPC server - { - let mut shutdown_tx_guard = self.shutdown_tx.lock(); - if let Some(tx) = shutdown_tx_guard.take() { - if tx.send(()).is_err() { - eprintln!("Failed to send shutdown signal to gRPC server (receiver dropped)"); - } else { - info!("Shutdown signal sent to gRPC server"); - } + if let Some(tx) = self.shutdown_tx.take() { + if tx.send(()).is_err() { + eprintln!("Failed to send shutdown signal to gRPC server (receiver dropped)"); + } else { + info!("Shutdown signal sent to gRPC server"); } } // Shutdown Raft node - let node = { - let mut node_guard = self.node.lock(); - node_guard.take() - }; - - if let Some(node) = node { + if let Some(node) = self.node.take() { if let Err(e) = node.shutdown().await { eprintln!("Error shutting down Raft node: {}", e); } else { @@ -158,12 +133,7 @@ impl Controller for OpenRaftController { } // Wait for server task to complete (with timeout) - let handle = { - let mut handle_guard = self.handle.lock(); - handle_guard.take() - }; - - if let Some(handle) = handle { + if let Some(handle) = self.handle.take() { let timeout = tokio::time::Duration::from_secs(10); match tokio::time::timeout(timeout, handle).await { Ok(Ok(_)) => info!("Server task completed successfully"), diff --git a/rocketmq-controller/src/controller/raft_controller.rs b/rocketmq-controller/src/controller/raft_controller.rs index 3496f4f61..1cbd0bef9 100644 --- a/rocketmq-controller/src/controller/raft_controller.rs +++ b/rocketmq-controller/src/controller/raft_controller.rs @@ -59,14 +59,14 @@ impl RaftController { } impl Controller for RaftController { - async fn startup(&self) -> RocketMQResult<()> { + async fn startup(&mut self) -> RocketMQResult<()> { match self { Self::OpenRaft(controller) => controller.startup().await, Self::RaftRs(controller) => controller.startup().await, } } - async fn shutdown(&self) -> RocketMQResult<()> { + async fn shutdown(&mut self) -> RocketMQResult<()> { match self { Self::OpenRaft(controller) => controller.shutdown().await, Self::RaftRs(controller) => controller.shutdown().await, diff --git a/rocketmq-controller/src/controller/raft_rs_controller.rs b/rocketmq-controller/src/controller/raft_rs_controller.rs index 242a6a8aa..fde9d8173 100644 --- a/rocketmq-controller/src/controller/raft_rs_controller.rs +++ b/rocketmq-controller/src/controller/raft_rs_controller.rs @@ -44,12 +44,12 @@ impl RaftRsController { } impl Controller for RaftRsController { - async fn startup(&self) -> RocketMQResult<()> { + async fn startup(&mut self) -> RocketMQResult<()> { // TODO: Initialize raft-rs node Ok(()) } - async fn shutdown(&self) -> RocketMQResult<()> { + async fn shutdown(&mut self) -> RocketMQResult<()> { // TODO: Shutdown raft-rs node Ok(()) } diff --git a/rocketmq-controller/src/processor.rs b/rocketmq-controller/src/processor.rs index f7c0b91ab..2f950de92 100644 --- a/rocketmq-controller/src/processor.rs +++ b/rocketmq-controller/src/processor.rs @@ -65,7 +65,7 @@ pub struct ProcessorManager { config: Arc, /// Raft controller - raft: Arc, + raft: ArcMut, /// Metadata store metadata: Arc, @@ -76,7 +76,7 @@ pub struct ProcessorManager { impl ProcessorManager { /// Create a new processor manager - pub fn new(config: Arc, raft: Arc, metadata: Arc) -> Self { + pub fn new(config: Arc, raft: ArcMut, metadata: Arc) -> Self { // Initialize processors let mut processors: HashMap> = HashMap::new(); diff --git a/rocketmq-controller/src/processor/broker_processor.rs b/rocketmq-controller/src/processor/broker_processor.rs index 3df2777d8..f641a2f5e 100644 --- a/rocketmq-controller/src/processor/broker_processor.rs +++ b/rocketmq-controller/src/processor/broker_processor.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::SystemTime; +use rocketmq_rust::ArcMut; use tracing::debug; use tracing::error; use tracing::info; @@ -41,12 +42,12 @@ pub struct RegisterBrokerProcessor { metadata: Arc, /// Raft controller - raft: Arc, + raft: ArcMut, } impl RegisterBrokerProcessor { /// Create a new register broker processor - pub fn new(metadata: Arc, raft: Arc) -> Self { + pub fn new(metadata: Arc, raft: ArcMut) -> Self { Self { metadata, raft } } @@ -119,12 +120,12 @@ pub struct UnregisterBrokerProcessor { metadata: Arc, /// Raft controller - raft: Arc, + raft: ArcMut, } impl UnregisterBrokerProcessor { /// Create a new unregister broker processor - pub fn new(metadata: Arc, raft: Arc) -> Self { + pub fn new(metadata: Arc, raft: ArcMut) -> Self { Self { metadata, raft } } @@ -226,12 +227,12 @@ pub struct ElectMasterProcessor { metadata: Arc, /// Raft controller - raft: Arc, + raft: ArcMut, } impl ElectMasterProcessor { /// Create a new elect master processor - pub fn new(metadata: Arc, raft: Arc) -> Self { + pub fn new(metadata: Arc, raft: ArcMut) -> Self { Self { metadata, raft } } diff --git a/rocketmq-controller/src/processor/topic_processor.rs b/rocketmq-controller/src/processor/topic_processor.rs index 34d6fd384..ee573a3e5 100644 --- a/rocketmq-controller/src/processor/topic_processor.rs +++ b/rocketmq-controller/src/processor/topic_processor.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use rocketmq_rust::ArcMut; use tracing::error; use tracing::info; @@ -37,12 +38,12 @@ pub struct CreateTopicProcessor { metadata: Arc, /// Raft controller - raft: Arc, + raft: ArcMut, } impl CreateTopicProcessor { /// Create a new create topic processor - pub fn new(metadata: Arc, raft: Arc) -> Self { + pub fn new(metadata: Arc, raft: ArcMut) -> Self { Self { metadata, raft } } @@ -117,12 +118,12 @@ pub struct UpdateTopicProcessor { metadata: Arc, /// Raft controller - raft: Arc, + raft: ArcMut, } impl UpdateTopicProcessor { /// Create a new update topic processor - pub fn new(metadata: Arc, raft: Arc) -> Self { + pub fn new(metadata: Arc, raft: ArcMut) -> Self { Self { metadata, raft } } @@ -188,12 +189,12 @@ pub struct DeleteTopicProcessor { metadata: Arc, /// Raft controller - raft: Arc, + raft: ArcMut, } impl DeleteTopicProcessor { /// Create a new delete topic processor - pub fn new(metadata: Arc, raft: Arc) -> Self { + pub fn new(metadata: Arc, raft: ArcMut) -> Self { Self { metadata, raft } } diff --git a/rocketmq-controller/tests/raft_controller_test.rs b/rocketmq-controller/tests/raft_controller_test.rs index 60a8716ab..919f34757 100644 --- a/rocketmq-controller/tests/raft_controller_test.rs +++ b/rocketmq-controller/tests/raft_controller_test.rs @@ -22,7 +22,7 @@ use rocketmq_runtime::RocketMQRuntime; #[tokio::test] async fn test_open_raft_controller_lifecycle() { let config = Arc::new(ControllerConfig::test_config()); - let controller = RaftController::new_open_raft(config); + let mut controller = RaftController::new_open_raft(config); assert!(controller.startup().await.is_ok()); assert!(!controller.is_leader()); // Default is false @@ -35,7 +35,7 @@ async fn test_raft_rs_controller_lifecycle() { .await .unwrap(); - let controller = RaftController::new_raft_rs(runtime.clone()); + let mut controller = RaftController::new_raft_rs(runtime.clone()); assert!(controller.startup().await.is_ok()); assert!(!controller.is_leader()); // Default is false @@ -57,14 +57,14 @@ async fn test_raft_controller_wrapper() { let config = Arc::new(ControllerConfig::test_config()); // Test OpenRaft variant - let open_raft_controller = RaftController::new_open_raft(config.clone()); + let mut open_raft_controller = RaftController::new_open_raft(config.clone()); assert!(open_raft_controller.startup().await.is_ok()); assert!(!open_raft_controller.is_leader()); assert!(open_raft_controller.shutdown().await.is_ok()); drop(open_raft_controller); // Test RaftRs variant - let raft_rs_controller = RaftController::new_raft_rs(runtime.clone()); + let mut raft_rs_controller = RaftController::new_raft_rs(runtime.clone()); assert!(raft_rs_controller.startup().await.is_ok()); assert!(!raft_rs_controller.is_leader()); assert!(raft_rs_controller.shutdown().await.is_ok());