Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions rocketmq-common/src/common/controller/controller_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::env;
use std::fs::File;
use std::io::Write;
use std::net::SocketAddr;
use std::path::PathBuf;

Expand Down Expand Up @@ -453,6 +456,147 @@ impl ControllerConfig {

Ok(())
}

/// Update configuration from a map of properties
///
/// # Arguments
///
/// * `properties` - A map of configuration keys to values
///
/// # Returns
///
/// A Result indicating success or failure
pub fn update(&mut self, properties: &HashMap<String, String>) -> Result<(), String> {
for (key, value) in properties {
match key.as_str() {
"rocketmqHome" => self.rocketmq_home = value.clone(),
"controllerType" => self.controller_type = value.clone(),
"scanNotActiveBrokerInterval" => {
self.scan_not_active_broker_interval = value
.parse()
.map_err(|e| format!("Invalid scanNotActiveBrokerInterval: {}", e))?;
}
"controllerThreadPoolNums" => {
self.controller_thread_pool_nums = value
.parse()
.map_err(|e| format!("Invalid controllerThreadPoolNums: {}", e))?;
}
"controllerRequestThreadPoolQueueCapacity" => {
self.controller_request_thread_pool_queue_capacity = value
.parse()
.map_err(|e| format!("Invalid controllerRequestThreadPoolQueueCapacity: {}", e))?;
}
"mappedFileSize" => {
self.mapped_file_size = value.parse().map_err(|e| format!("Invalid mappedFileSize: {}", e))?;
}
"controllerStorePath" => {
self.controller_store_path = value.clone();
}
"electMasterMaxRetryCount" => {
self.elect_master_max_retry_count = value
.parse()
.map_err(|e| format!("Invalid electMasterMaxRetryCount: {}", e))?;
}
"enableElectUncleanMaster" => {
self.enable_elect_unclean_master = value
.parse()
.map_err(|e| format!("Invalid enableElectUncleanMaster: {}", e))?;
}
"isProcessReadEvent" => {
self.is_process_read_event = value
.parse()
.map_err(|e| format!("Invalid isProcessReadEvent: {}", e))?;
}
"notifyBrokerRoleChanged" => {
self.notify_broker_role_changed = value
.parse()
.map_err(|e| format!("Invalid notifyBrokerRoleChanged: {}", e))?;
}
"scanInactiveMasterInterval" => {
self.scan_inactive_master_interval = value
.parse()
.map_err(|e| format!("Invalid scanInactiveMasterInterval: {}", e))?;
}
"metricsExporterType" => {
let exporter_type: MetricsExporterType = value
.parse()
.map_err(|_| format!("Invalid metricsExporterType: {}", value))?;

self.metrics_exporter_type = exporter_type;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"metricsGrpcExporterTarget" => self.metrics_grpc_exporter_target = value.clone(),
"metricsGrpcExporterHeader" => self.metrics_grpc_exporter_header = value.clone(),
"metricGrpcExporterTimeOutInMills" => {
self.metric_grpc_exporter_time_out_in_mills = value
.parse()
.map_err(|e| format!("Invalid metricGrpcExporterTimeOutInMills: {}", e))?;
}
"metricGrpcExporterIntervalInMills" => {
self.metric_grpc_exporter_interval_in_mills = value
.parse()
.map_err(|e| format!("Invalid metricGrpcExporterIntervalInMills: {}", e))?;
}
"metricLoggingExporterIntervalInMills" => {
self.metric_logging_exporter_interval_in_mills = value
.parse()
.map_err(|e| format!("Invalid metricLoggingExporterIntervalInMills: {}", e))?;
}
"metricsPromExporterPort" => {
self.metrics_prom_exporter_port = value
.parse()
.map_err(|e| format!("Invalid metricsPromExporterPort: {}", e))?;
}
"metricsPromExporterHost" => self.metrics_prom_exporter_host = value.clone(),
"metricsLabel" => self.metrics_label = value.clone(),
"metricsInDelta" => {
self.metrics_in_delta = value.parse().map_err(|e| format!("Invalid metricsInDelta: {}", e))?;
}
"configBlackList" => self.config_black_list = value.clone(),
"electionTimeoutMs" => {
self.election_timeout_ms =
value.parse().map_err(|e| format!("Invalid electionTimeoutMs: {}", e))?;
}
"heartbeatIntervalMs" => {
self.heartbeat_interval_ms = value
.parse()
.map_err(|e| format!("Invalid heartbeatIntervalMs: {}", e))?;
}
"storagePath" => self.storage_path = value.clone(),
"enableElectUncleanMasterLocal" => {
self.enable_elect_unclean_master_local = value
.parse()
.map_err(|e| format!("Invalid enableElectUncleanMasterLocal: {}", e))?;
}
_ => return Err(format!("Unknown configuration key: {}", key)),
}
}

// Validate the updated configuration
self.validate()
}
Comment on lines +460 to +576
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add missing configuration fields to the update method.

The update() method is missing several configuration fields that are present in the struct:

  • nodeId
  • listenAddr
  • raftPeers
  • configStorePath
  • storageBackend

These fields should be updateable if they're not in the blacklist, or explicitly excluded with a comment explaining why.

📝 Suggested additions

Add these cases to the match statement before the default case (line 570):

                "enableElectUncleanMasterLocal" => {
                    self.enable_elect_unclean_master_local = value
                        .parse()
                        .map_err(|e| format!("Invalid enableElectUncleanMasterLocal: {}", e))?;
                }
+               "nodeId" => {
+                   self.node_id = value.parse().map_err(|e| format!("Invalid nodeId: {}", e))?;
+               }
+               "listenAddr" => {
+                   self.listen_addr = value.parse().map_err(|e| format!("Invalid listenAddr: {}", e))?;
+               }
+               "configStorePath" => {
+                   self.config_store_path = PathBuf::from(value);
+               }
+               "storageBackend" => {
+                   self.storage_backend = match value.as_str() {
+                       "RocksDB" => StorageBackendType::RocksDB,
+                       "File" => StorageBackendType::File,
+                       "Memory" => StorageBackendType::Memory,
+                       _ => return Err(format!("Invalid storageBackend: {}", value)),
+                   };
+               }
                _ => return Err(format!("Unknown configuration key: {}", key)),

Note: raftPeers (Vec) requires more complex parsing and may be better handled separately or marked as non-updatable.

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: RocketMQ Rust CI

[error] 521-521: Cargo fmt --check failed: formatting issue detected. Run 'cargo fmt' to fix formatting (diff shows removed/added blank line around exporter's assignment).

🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 460
- 576, The update() method in controller_config.rs omits several struct fields
(nodeId, listenAddr, raftPeers, configStorePath, storageBackend), so add match
arms in the update() function to set self.node_id, self.listen_addr,
self.config_store_path, and self.storage_backend from the incoming properties
(parsing types as needed), and decide how to handle raftPeers (either
parse/deserialize into Vec<RaftPeer> or explicitly skip it with a clear
comment); ensure unknown-key handling remains the same and call self.validate()
afterward.


/// Persist the configuration to disk
///
/// # Returns
///
/// A Result indicating success or failure
pub fn persist(&self) -> Result<(), String> {
// Ensure the directory exists
if let Some(parent) = self.config_store_path.parent() {
std::fs::create_dir_all(parent).map_err(|e| format!("Failed to create config directory: {}", e))?;
}

// Serialize the configuration to JSON
let json = serde_json::to_string_pretty(self).map_err(|e| format!("Failed to serialize config: {}", e))?;

// Write to file
let mut file =
File::create(&self.config_store_path).map_err(|e| format!("Failed to create config file: {}", e))?;
file.write_all(json.as_bytes())
.map_err(|e| format!("Failed to write config file: {}", e))?;

Ok(())
}
Comment on lines +578 to +599
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, let's look at the structure of controller_config.rs
wc -l rocketmq-common/src/common/controller/controller_config.rs

Repository: mxsm/rocketmq-rust

Length of output: 124


🏁 Script executed:

#!/bin/bash
# Search for load/read methods in the controller_config.rs file
rg -n "fn.*load|fn.*from|fn.*read" rocketmq-common/src/common/controller/controller_config.rs -A 10

Repository: mxsm/rocketmq-rust

Length of output: 856


🏁 Script executed:

#!/bin/bash
# Search for any mentions of "properties" or format specifications
rg -n "properties|format.*string" rocketmq-common/src/common/controller/controller_config.rs -i

Repository: mxsm/rocketmq-rust

Length of output: 1196


🏁 Script executed:

#!/bin/bash
# Look at imports to understand what serialization is being used
head -50 rocketmq-common/src/common/controller/controller_config.rs

Repository: mxsm/rocketmq-rust

Length of output: 1565


🏁 Script executed:

#!/bin/bash
# Look for any methods that load or deserialize config
rg -n "fn.*load|fn.*from_str|fn.*from_file|impl.*From|fn new" rocketmq-common/src/common/controller/controller_config.rs -A 8 | head -80

Repository: mxsm/rocketmq-rust

Length of output: 650


🏁 Script executed:

#!/bin/bash
# Check the actual persist method and see if there's a corresponding load method
sed -n '500,650p' rocketmq-common/src/common/controller/controller_config.rs

Repository: mxsm/rocketmq-rust

Length of output: 6618


🏁 Script executed:

#!/bin/bash
# Look for any deserialization or from_json patterns
rg -n "from_json|from|load_config|serde_json::from" rocketmq-common/src/common/controller/controller_config.rs -B 3 -A 10

Repository: mxsm/rocketmq-rust

Length of output: 2159


Add a corresponding deserialization method to load persisted JSON configuration.

The persist() method writes configuration as JSON, but there is no corresponding method to load the configuration back from the persisted JSON file. The only configuration loading mechanism is the update() method which expects a HashMap<String, String> (properties format). This creates a one-way serialization where configuration can be persisted but not restored. Additionally, the default file path is named controller.properties (line 223) which misleadingly suggests properties format when JSON is actually written.

Either:

  1. Add a load() or from_file() method using serde_json::from_str to deserialize the persisted JSON, or
  2. Change persist() to write properties format matching the update() mechanism, and rename the file to reflect the actual format.

}

#[cfg(test)]
Expand Down Expand Up @@ -527,4 +671,55 @@ mod tests {
assert_eq!(config.metrics_prom_exporter_port, 9090);
assert_eq!(config.metrics_label, "instance_id:test,uid:123");
}

#[test]
fn test_config_update() {
let mut config = ControllerConfig::default();

let mut properties = HashMap::new();
properties.insert("scanNotActiveBrokerInterval".to_string(), "10000".to_string());
properties.insert("controllerThreadPoolNums".to_string(), "32".to_string());
properties.insert("enableElectUncleanMaster".to_string(), "true".to_string());

let result = config.update(&properties);
assert!(result.is_ok());
assert_eq!(config.scan_not_active_broker_interval, 10000);
assert_eq!(config.controller_thread_pool_nums, 32);
assert!(config.enable_elect_unclean_master);
}

#[test]
fn test_config_update_invalid() {
let mut config = ControllerConfig::default();

let mut properties = HashMap::new();
properties.insert("scanNotActiveBrokerInterval".to_string(), "invalid".to_string());

let result = config.update(&properties);
assert!(result.is_err());
}

#[test]
fn test_config_update_unknown_key() {
let mut config = ControllerConfig::default();

let mut properties = HashMap::new();
properties.insert("unknownKey".to_string(), "value".to_string());

let result = config.update(&properties);
assert!(result.is_err());
}

#[test]
fn test_config_persist() {
let config = ControllerConfig {
config_store_path: PathBuf::from("/tmp/test_controller_config.json"),
..Default::default()
};

let result = config.persist();
assert!(result.is_ok());

std::fs::remove_file("/tmp/test_controller_config.json").ok();
}
Comment on lines +713 to +724
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix Clippy warning: use struct initialization instead of field reassignment.

The test has a field assignment after Default::default(). Use struct initialization syntax to avoid the Clippy warning.

♻️ Proposed fix
     #[test]
     fn test_config_persist() {
-        let mut config = ControllerConfig::default();
-        config.config_store_path = PathBuf::from("/tmp/test_controller_config.json");
+        let config = ControllerConfig {
+            config_store_path: PathBuf::from("/tmp/test_controller_config.json"),
+            ..Default::default()
+        };

         let result = config.persist();
         assert!(result.is_ok());

         std::fs::remove_file("/tmp/test_controller_config.json").ok();
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[test]
fn test_config_persist() {
let mut config = ControllerConfig::default();
config.config_store_path = PathBuf::from("/tmp/test_controller_config.json");
let result = config.persist();
assert!(result.is_ok());
std::fs::remove_file("/tmp/test_controller_config.json").ok();
}
#[test]
fn test_config_persist() {
let config = ControllerConfig {
config_store_path: PathBuf::from("/tmp/test_controller_config.json"),
..Default::default()
};
let result = config.persist();
assert!(result.is_ok());
std::fs::remove_file("/tmp/test_controller_config.json").ok();
}
🧰 Tools
🪛 GitHub Actions: RocketMQ Rust CI

[error] 714-714: field assignment outside of initializer for an instance created with Default::default(). Consider initializing with ControllerConfig { config_store_path: PathBuf::from("/tmp/test_controller_config.json"), ..Default::default() } to avoid reassignments. Clippy warns about field_reassign_with_default.

🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 711
- 720, The test test_config_persist currently creates a mutable ControllerConfig
via Default::default() then reassigns config.config_store_path, which triggers a
Clippy warning; replace that pattern by constructing the value with struct
initialization using the update syntax (e.g. build a ControllerConfig with
config_store_path set to PathBuf::from("/tmp/test_controller_config.json") and
..Default::default()), remove the needless mutability, and keep the rest of the
test (calling persist and removing the file) unchanged.

✅ Addressed in commit 7c4edd5

}
160 changes: 158 additions & 2 deletions rocketmq-controller/src/processor/controller_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,94 @@ impl ControllerRequestProcessor {
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request: &mut RemotingCommand,
request: &mut RemotingCommand,
) -> RocketMQResult<Option<RemotingCommand>> {
unimplemented!("unimplemented handle_update_controller_config")
use rocketmq_error::RocketMQError;
use rocketmq_remoting::code::response_code::ResponseCode;

// Parse request body as properties map
let properties = if let Some(body) = request.body() {
self.parse_properties_from_string(body).map_err(|e| {
RocketMQError::broker_operation_failed(
"UPDATE_CONTROLLER_CONFIG",
ResponseCode::ControllerInvalidRequest.to_i32(),
format!("Failed to parse config properties: {:?}", e),
)
})?
} else {
return Err(RocketMQError::broker_operation_failed(
"UPDATE_CONTROLLER_CONFIG",
ResponseCode::ControllerInvalidRequest.to_i32(),
"Request body is empty".to_string(),
));
};

// Validate blacklist configs
if self.validate_blacklist_config_exist(&properties) {
let error_msg = "Cannot update blacklisted configurations".to_string();
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::NoPermission,
error_msg,
)));
}

// Check if this controller is the leader
if !self.controller_manager.controller().is_leader() {
let error_msg = "Only leader controller can handle configuration updates".to_string();
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerNotLeader,
error_msg,
)));
}

let mut controller_config = self.controller_manager.controller_config().clone();

// Update configuration
match controller_config.update(&properties) {
Ok(_) => {
// Persist the updated configuration
if let Err(e) = controller_config.persist() {
let error_msg = format!("Failed to persist configuration: {}", e);
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemError,
error_msg,
)));
}
Ok(Some(RemotingCommand::create_response_command()))
}
Err(e) => {
let error_msg = format!("Failed to update configuration: {}", e);
Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
error_msg,
)))
}
}
Comment on lines +464 to +486
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: Updated configuration is not applied to the running controller.

Line 464 clones the configuration, and the updates are made to this clone. The clone is persisted to disk, but the updated configuration is never applied back to controller_manager. This means:

  1. The configuration changes are saved to disk
  2. BUT the running controller continues using the old configuration
  3. Changes only take effect after a restart

This defeats the purpose of runtime configuration updates mentioned in the PR objectives.

🔧 Proposed fix

After successful persistence, apply the updated configuration back to the controller manager:

        // Update configuration
        match controller_config.update(&properties) {
            Ok(_) => {
                // Persist the updated configuration
                if let Err(e) = controller_config.persist() {
                    let error_msg = format!("Failed to persist configuration: {}", e);
                    return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
                        ResponseCode::SystemError,
                        error_msg,
                    )));
                }
+               
+               // Apply the updated configuration to the controller manager
+               // Note: This assumes controller_manager has a method to update its config
+               // The exact API depends on ControllerManager implementation
+               self.controller_manager.update_controller_config(controller_config);
+               
                Ok(Some(RemotingCommand::create_response_command()))
            }

Alternative: If ControllerManager doesn't support runtime config updates, the controller needs to be designed to reload configuration or accept individual property updates through specific methods.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In @rocketmq-controller/src/processor/controller_request_processor.rs around
lines 464 - 486, You clone and update controller_config but never apply it to
the running controller, so persistence only updates disk; after a successful
persist call, pass the updated controller_config back into the controller
manager (e.g. call a method such as
self.controller_manager.set_controller_config(controller_config) or
self.controller_manager.apply_config(controller_config)); if such a setter does
not exist, implement a method on ControllerManager (e.g.
update_controller_config or reload_config) that atomically replaces the
in-memory config and notifies/refreshes components that depend on it, then
invoke that method immediately after controller_config.persist() succeeds.

}

/// Helper function to parse properties from string format
///
/// # Arguments
///
/// * `body` - Request body containing properties in "key=value" format
///
/// # Returns
///
/// Result containing parsed properties map
fn parse_properties_from_string(&self, body: &[u8]) -> RocketMQResult<std::collections::HashMap<String, String>> {
let content = String::from_utf8_lossy(body);
let mut properties = std::collections::HashMap::new();

for line in content.lines() {
if let Some((key, value)) = line.split_once('=') {
let trimmed_key = key.trim();
if !trimmed_key.is_empty() {
properties.insert(trimmed_key.to_string(), value.trim().to_string());
}
}
}

Ok(properties)
}

/// Handle GET_CONTROLLER_CONFIG request
Expand Down Expand Up @@ -573,6 +658,8 @@ impl RequestProcessor for ControllerRequestProcessor {
#[cfg(test)]
mod tests {
use super::*;
use rocketmq_common::common::controller::ControllerConfig;
use std::collections::HashMap;

#[test]
fn test_config_blacklist() {
Expand All @@ -587,4 +674,73 @@ mod tests {
assert!(blacklist.contains("configStorePath"));
assert!(blacklist.contains("rocketmqHome"));
}

#[test]
fn test_parse_properties_from_string() {
// Test parse_properties_from_string method
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
let controller_manager = rt.block_on(async {
crate::controller::controller_manager::ControllerManager::new(ControllerConfig::default())
.await
.unwrap()
});
let processor = ControllerRequestProcessor::new(rocketmq_rust::ArcMut::new(controller_manager));

// Test valid properties
let body =
"scanNotActiveBrokerInterval=10000\ncontrollerThreadPoolNums=32\nenableElectUncleanMaster=true".as_bytes();
let properties = processor.parse_properties_from_string(body).unwrap();

assert_eq!(
properties.get("scanNotActiveBrokerInterval"),
Some(&"10000".to_string())
);
assert_eq!(properties.get("controllerThreadPoolNums"), Some(&"32".to_string()));
assert_eq!(properties.get("enableElectUncleanMaster"), Some(&"true".to_string()));

// Test empty body
let empty_body = "".as_bytes();
let empty_properties = processor.parse_properties_from_string(empty_body).unwrap();
assert!(empty_properties.is_empty());

// Test body with whitespace
let body_with_whitespace =
" scanNotActiveBrokerInterval = 10000 \n controllerThreadPoolNums = 32 ".as_bytes();
let properties_with_whitespace = processor.parse_properties_from_string(body_with_whitespace).unwrap();

assert_eq!(
properties_with_whitespace.get("scanNotActiveBrokerInterval"),
Some(&"10000".to_string())
);
assert_eq!(
properties_with_whitespace.get("controllerThreadPoolNums"),
Some(&"32".to_string())
);
}

#[test]
fn test_validate_blacklist_config_exist() {
// Test validate_blacklist_config_exist method
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
let controller_manager = rt.block_on(async {
crate::controller::controller_manager::ControllerManager::new(ControllerConfig::default())
.await
.unwrap()
});
let processor = ControllerRequestProcessor::new(rocketmq_rust::ArcMut::new(controller_manager));

// Test with blacklisted config
let mut properties_with_blacklist = HashMap::new();
properties_with_blacklist.insert("configBlackList".to_string(), "newBlacklist".to_string());
assert!(processor.validate_blacklist_config_exist(&properties_with_blacklist));

// Test with non-blacklisted config
let mut properties_without_blacklist = HashMap::new();
properties_without_blacklist.insert("scanNotActiveBrokerInterval".to_string(), "10000".to_string());
assert!(!processor.validate_blacklist_config_exist(&properties_without_blacklist));

// Test with empty properties
let empty_properties = HashMap::new();
assert!(!processor.validate_blacklist_config_exist(&empty_properties));
}
}
Loading