Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions rocketmq-controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ pub trait Controller: Send + Sync {
/// # Example
///
/// ```rust,ignore
/// let controller = RaftController::new(config).await?;
/// let mut controller = RaftController::new(config).await?;
/// controller.startup().await?;
/// ```
async fn startup(&self) -> RocketMQResult<()>;
async fn startup(&mut self) -> RocketMQResult<()>;

/// Shutdown the controller gracefully
///
Expand All @@ -197,7 +197,7 @@ pub trait Controller: Send + Sync {
///
/// Returns error if graceful shutdown fails. The controller may still be
/// in an inconsistent state, requiring external intervention.
async fn shutdown(&self) -> RocketMQResult<()>;
async fn shutdown(&mut self) -> RocketMQResult<()>;

/// Start scheduling controller events
///
Expand Down Expand Up @@ -520,11 +520,11 @@ impl Default for MockController {
}

impl Controller for MockController {
async fn startup(&self) -> RocketMQResult<()> {
async fn startup(&mut self) -> RocketMQResult<()> {
Ok(())
}

async fn shutdown(&self) -> RocketMQResult<()> {
async fn shutdown(&mut self) -> RocketMQResult<()> {
Ok(())
}

Expand Down Expand Up @@ -608,7 +608,7 @@ mod tests {

#[tokio::test]
async fn test_mock_controller_lifecycle() {
let controller = MockController::new();
let mut controller = MockController::new();
assert!(controller.startup().await.is_ok());
assert!(controller.is_leader());
assert!(controller.shutdown().await.is_ok());
Expand Down
12 changes: 6 additions & 6 deletions rocketmq-controller/src/controller/controller_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ pub struct ControllerManager {
config: Arc<ControllerConfig>,

/// Raft controller for consensus and leader election
/// Note: Stored as Arc because ProcessorManager also needs it
raft_controller: Arc<RaftController>,
/// Note: Uses ArcMut to allow mutable access via &self
raft_controller: ArcMut<RaftController>,

/// Metadata store for broker and topic information
metadata: Arc<MetadataStore>,
Expand Down Expand Up @@ -147,7 +147,7 @@ impl ControllerManager {
// Initialize Raft controller for leader election
// This MUST succeed before proceeding
// Using OpenRaft implementation by default
let raft_arc = Arc::new(RaftController::new_open_raft(Arc::clone(&config)));
let raft_arc = ArcMut::new(RaftController::new_open_raft(Arc::clone(&config)));

// Initialize metadata store
// This MUST succeed before proceeding
Expand Down Expand Up @@ -373,7 +373,7 @@ impl ControllerManager {
info!("Starting controller manager...");

// Start Raft controller first (critical for leader election)
if let Err(e) = self.raft_controller.startup().await {
if let Err(e) = self.raft_controller.mut_from_ref().startup().await {
self.running.store(false, Ordering::SeqCst);
return Err(ControllerError::Internal(format!(
"Failed to start Raft controller: {}",
Expand Down Expand Up @@ -495,7 +495,7 @@ impl ControllerManager {
}

// Shutdown Raft controller last (it coordinates distributed operations)
if let Err(e) = self.raft_controller.shutdown().await {
if let Err(e) = self.raft_controller.mut_from_ref().shutdown().await {
error!("Failed to shutdown Raft: {}", e);
} else {
info!("Raft controller shut down");
Expand Down Expand Up @@ -617,7 +617,7 @@ impl ControllerManager {
self.remoting_client.clone()
}

pub fn controller(&self) -> &Arc<RaftController> {
pub fn controller(&self) -> &ArcMut<RaftController> {
&self.raft_controller
}
}
Expand Down
68 changes: 19 additions & 49 deletions rocketmq-controller/src/controller/open_raft_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
use std::sync::Arc;

use cheetah_string::CheetahString;
use parking_lot::Mutex;
use rocketmq_common::common::controller::ControllerConfig;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::protocol::body::sync_state_set_body::SyncStateSet;
Expand Down Expand Up @@ -53,36 +52,29 @@ use crate::protobuf::openraft::open_raft_service_server::OpenRaftServiceServer;
/// 1. Sends shutdown signal to gRPC server via oneshot channel
/// 2. Waits for Raft node to shutdown cleanly
/// 3. Waits for gRPC server task to complete (with 10s timeout)
///
/// # Thread Safety
///
/// Internal state is protected by `Arc<Mutex<>>` to allow `&self` access
/// while maintaining mutability for lifecycle operations.
pub struct OpenRaftController {
config: Arc<ControllerConfig>,
/// Raft node manager (protected for thread-safe access)
node: Arc<Mutex<Option<Arc<RaftNodeManager>>>>,

/// Raft node manager
node: Option<Arc<RaftNodeManager>>,
/// gRPC server task handle
handle: Arc<Mutex<Option<JoinHandle<()>>>>,

handle: Option<JoinHandle<()>>,
/// Shutdown signal sender for gRPC server
shutdown_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}

impl OpenRaftController {
pub fn new(config: Arc<ControllerConfig>) -> Self {
Self {
config,
node: Arc::new(Mutex::new(None)),
handle: Arc::new(Mutex::new(None)),
shutdown_tx: Arc::new(Mutex::new(None)),
node: None,
handle: None,
shutdown_tx: None,
}
}
}

impl Controller for OpenRaftController {
async fn startup(&self) -> RocketMQResult<()> {
async fn startup(&mut self) -> RocketMQResult<()> {
info!("Starting OpenRaft controller on {}", self.config.listen_addr);

let node = Arc::new(RaftNodeManager::new(Arc::clone(&self.config)).await?);
Expand Down Expand Up @@ -111,45 +103,28 @@ impl Controller for OpenRaftController {
}
});

{
let mut node_guard = self.node.lock();
*node_guard = Some(node);
}
{
let mut handle_guard = self.handle.lock();
*handle_guard = Some(handle);
}
{
let mut shutdown_tx_guard = self.shutdown_tx.lock();
*shutdown_tx_guard = Some(shutdown_tx);
}
self.node = Some(node);
self.handle = Some(handle);
self.shutdown_tx = Some(shutdown_tx);

info!("OpenRaft controller started successfully");
Ok(())
}

async fn shutdown(&self) -> RocketMQResult<()> {
async fn shutdown(&mut self) -> RocketMQResult<()> {
info!("Shutting down OpenRaft controller");

// Take and send shutdown signal to gRPC server
{
let mut shutdown_tx_guard = self.shutdown_tx.lock();
if let Some(tx) = shutdown_tx_guard.take() {
if tx.send(()).is_err() {
eprintln!("Failed to send shutdown signal to gRPC server (receiver dropped)");
} else {
info!("Shutdown signal sent to gRPC server");
}
if let Some(tx) = self.shutdown_tx.take() {
if tx.send(()).is_err() {
eprintln!("Failed to send shutdown signal to gRPC server (receiver dropped)");
} else {
info!("Shutdown signal sent to gRPC server");
}
}

// Shutdown Raft node
let node = {
let mut node_guard = self.node.lock();
node_guard.take()
};

if let Some(node) = node {
if let Some(node) = self.node.take() {
if let Err(e) = node.shutdown().await {
eprintln!("Error shutting down Raft node: {}", e);
} else {
Expand All @@ -158,12 +133,7 @@ impl Controller for OpenRaftController {
}

// Wait for server task to complete (with timeout)
let handle = {
let mut handle_guard = self.handle.lock();
handle_guard.take()
};

if let Some(handle) = handle {
if let Some(handle) = self.handle.take() {
let timeout = tokio::time::Duration::from_secs(10);
match tokio::time::timeout(timeout, handle).await {
Ok(Ok(_)) => info!("Server task completed successfully"),
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-controller/src/controller/raft_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ impl RaftController {
}

impl Controller for RaftController {
async fn startup(&self) -> RocketMQResult<()> {
async fn startup(&mut self) -> RocketMQResult<()> {
match self {
Self::OpenRaft(controller) => controller.startup().await,
Self::RaftRs(controller) => controller.startup().await,
}
}
Comment on lines +62 to 67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for ArcMut implementation to verify DerefMut trait and mut_from_ref usage patterns
echo "=== Searching for DerefMut implementation for ArcMut ==="
rg -n "impl.*DerefMut.*for.*ArcMut" -A 5

echo ""
echo "=== Searching for mut_from_ref function definition ==="
rg -n "fn mut_from_ref" -A 3

echo ""
echo "=== Searching for mut_from_ref usage in codebase ==="
rg -n "\.mut_from_ref\(\)" -C 2

echo ""
echo "=== Searching for ArcMut definition and trait implementations ==="
rg -n "struct ArcMut|impl.*for ArcMut" -A 3

Repository: mxsm/rocketmq-rust

Length of output: 50374


Consider using mut_from_ref() for consistency with the same codebase pattern.

ArcMut correctly implements DerefMut, so the direct method call controller.startup().await on &mut ArcMut<T> works as intended. However, the same raft_controller field uses explicit mut_from_ref() calls elsewhere in controller_manager.rs (lines 376 and 498):

self.raft_controller.mut_from_ref().startup().await
self.raft_controller.mut_from_ref().shutdown().await

Align this code with the explicit pattern for consistency across the codebase.

🤖 Prompt for AI Agents
In @rocketmq-controller/src/controller/raft_controller.rs around lines 62 - 67,
The startup method should use the same ArcMut access pattern used elsewhere:
instead of calling controller.startup().await directly on the &mut ArcMut<T>
enum branches, call controller.mut_from_ref().startup().await for both
Self::OpenRaft(controller) and Self::RaftRs(controller) so it matches the
existing explicit mut_from_ref() usage (referencing ArcMut, mut_from_ref, and
the startup method on the inner controller).


async fn shutdown(&self) -> RocketMQResult<()> {
async fn shutdown(&mut self) -> RocketMQResult<()> {
match self {
Self::OpenRaft(controller) => controller.shutdown().await,
Self::RaftRs(controller) => controller.shutdown().await,
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-controller/src/controller/raft_rs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ impl RaftRsController {
}

impl Controller for RaftRsController {
async fn startup(&self) -> RocketMQResult<()> {
async fn startup(&mut self) -> RocketMQResult<()> {
// TODO: Initialize raft-rs node
Ok(())
}

async fn shutdown(&self) -> RocketMQResult<()> {
async fn shutdown(&mut self) -> RocketMQResult<()> {
// TODO: Shutdown raft-rs node
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-controller/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub struct ProcessorManager {
config: Arc<ControllerConfig>,

/// Raft controller
raft: Arc<RaftController>,
raft: ArcMut<RaftController>,

/// Metadata store
metadata: Arc<MetadataStore>,
Expand All @@ -76,7 +76,7 @@ pub struct ProcessorManager {

impl ProcessorManager {
/// Create a new processor manager
pub fn new(config: Arc<ControllerConfig>, raft: Arc<RaftController>, metadata: Arc<MetadataStore>) -> Self {
pub fn new(config: Arc<ControllerConfig>, raft: ArcMut<RaftController>, metadata: Arc<MetadataStore>) -> Self {
// Initialize processors
let mut processors: HashMap<RequestType, Arc<dyn RequestProcessor>> = HashMap::new();

Expand Down
13 changes: 7 additions & 6 deletions rocketmq-controller/src/processor/broker_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::SystemTime;

use rocketmq_rust::ArcMut;
use tracing::debug;
use tracing::error;
use tracing::info;
Expand All @@ -41,12 +42,12 @@ pub struct RegisterBrokerProcessor {
metadata: Arc<MetadataStore>,

/// Raft controller
raft: Arc<RaftController>,
raft: ArcMut<RaftController>,
}

impl RegisterBrokerProcessor {
/// Create a new register broker processor
pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
pub fn new(metadata: Arc<MetadataStore>, raft: ArcMut<RaftController>) -> Self {
Self { metadata, raft }
}

Expand Down Expand Up @@ -119,12 +120,12 @@ pub struct UnregisterBrokerProcessor {
metadata: Arc<MetadataStore>,

/// Raft controller
raft: Arc<RaftController>,
raft: ArcMut<RaftController>,
}

impl UnregisterBrokerProcessor {
/// Create a new unregister broker processor
pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
pub fn new(metadata: Arc<MetadataStore>, raft: ArcMut<RaftController>) -> Self {
Self { metadata, raft }
}

Expand Down Expand Up @@ -226,12 +227,12 @@ pub struct ElectMasterProcessor {
metadata: Arc<MetadataStore>,

/// Raft controller
raft: Arc<RaftController>,
raft: ArcMut<RaftController>,
}

impl ElectMasterProcessor {
/// Create a new elect master processor
pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
pub fn new(metadata: Arc<MetadataStore>, raft: ArcMut<RaftController>) -> Self {
Self { metadata, raft }
}

Expand Down
13 changes: 7 additions & 6 deletions rocketmq-controller/src/processor/topic_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use rocketmq_rust::ArcMut;
use tracing::error;
use tracing::info;

Expand All @@ -37,12 +38,12 @@ pub struct CreateTopicProcessor {
metadata: Arc<MetadataStore>,

/// Raft controller
raft: Arc<RaftController>,
raft: ArcMut<RaftController>,
}

impl CreateTopicProcessor {
/// Create a new create topic processor
pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
pub fn new(metadata: Arc<MetadataStore>, raft: ArcMut<RaftController>) -> Self {
Self { metadata, raft }
}

Expand Down Expand Up @@ -117,12 +118,12 @@ pub struct UpdateTopicProcessor {
metadata: Arc<MetadataStore>,

/// Raft controller
raft: Arc<RaftController>,
raft: ArcMut<RaftController>,
}

impl UpdateTopicProcessor {
/// Create a new update topic processor
pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
pub fn new(metadata: Arc<MetadataStore>, raft: ArcMut<RaftController>) -> Self {
Self { metadata, raft }
}

Expand Down Expand Up @@ -188,12 +189,12 @@ pub struct DeleteTopicProcessor {
metadata: Arc<MetadataStore>,

/// Raft controller
raft: Arc<RaftController>,
raft: ArcMut<RaftController>,
}

impl DeleteTopicProcessor {
/// Create a new delete topic processor
pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
pub fn new(metadata: Arc<MetadataStore>, raft: ArcMut<RaftController>) -> Self {
Self { metadata, raft }
}

Expand Down
Loading
Loading