-
Notifications
You must be signed in to change notification settings - Fork 218
[Feature🚀] Implement UpdateControllerConfig Request Handler #5568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
🔊@onenewcode 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
|
Warning Rate limit exceeded@onenewcode has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 0 minutes and 44 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (2)
WalkthroughAdded dynamic configuration update capability to RocketMQ controller: new public methods Changes
Sequence DiagramsequenceDiagram
participant Client
participant Processor as ControllerRequestProcessor
participant Manager as ControllerManager
participant Config as ControllerConfig
participant FileSystem
Client->>Processor: UpdateControllerConfig Request
Processor->>Processor: Parse request body<br/>→ HashMap<String, String>
alt Empty Body
Processor-->>Client: Error: Body is empty
end
Processor->>Processor: Validate blacklist<br/>(check against forbidden keys)
alt Blacklisted Keys Found
Processor-->>Client: Error: NoPermission
end
Processor->>Manager: Check if leader?
Manager-->>Processor: Leader status
alt Not Leader
Processor-->>Client: Error: ControllerNotLeader
end
Processor->>Manager: Get configuration
Manager-->>Processor: ControllerConfig instance
Processor->>Config: update(properties)
Config->>Config: Apply per-key updates<br/>+ validate()
Config-->>Processor: Result (success/error)
alt Update Failed
Processor-->>Client: Error: ControllerInvalidRequest
end
Processor->>Config: persist()
Config->>FileSystem: Create parent directories
Config->>FileSystem: Serialize to JSON<br/>+ write to disk
FileSystem-->>Config: Result (success/error)
alt Persistence Failed
Processor-->>Client: Error: SystemError
end
Processor-->>Client: Success Response
Estimated Code Review Effort🎯 3 (Moderate) | ⏱️ ~22 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rocketmq-controller/src/processor/controller_request_processor.rs (1)
420-487: CRITICAL: Multiple issues with configuration update handler.1. Configuration not applied to runtime (critical)
Line 464 clones the config, updates it, and persists it, but the updated config is never applied back tocontroller_manager. This is the same critical issue flagged in theupdate()method review—the runtime configuration remains unchanged.2. Missing authentication/authorization (major)
The PR objectives (#5545) explicitly require "enforce admin permissions," but there's no authentication or authorization check. Any client can update the controller configuration.3. Missing audit logging (major)
The PR objectives require "audit changes," but there's no logging of who made what changes. Security best practices require audit trails for configuration modifications.4. Missing Raft consensus (major)
The PR objectives mention "cluster consistency (persistence/raft)," but this implementation only writes to the local disk without Raft consensus. For a Raft-based controller, configuration changes should be replicated through the consensus protocol to ensure all nodes have consistent configuration.Recommended fixes:
- Apply the updated config to the controller manager atomically
- Add authentication/authorization checks (verify admin permissions)
- Add structured audit logging with principal, timestamp, and changed properties
- Route config changes through Raft consensus for cluster consistency
🤖 Fix all issues with AI agents
In @rocketmq-common/src/common/controller/controller_config.rs:
- Line 576: Update the doc comment that currently reads "/// Persist the
configuration to dis" to "/// Persist the configuration to disk" by editing the
comment text (search for the exact comment string to locate it).
- Around line 711-720: The test test_config_persist currently creates a mutable
ControllerConfig via Default::default() then reassigns config.config_store_path,
which triggers a Clippy warning; replace that pattern by constructing the value
with struct initialization using the update syntax (e.g. build a
ControllerConfig with config_store_path set to
PathBuf::from("/tmp/test_controller_config.json") and ..Default::default()),
remove the needless mutability, and keep the rest of the test (calling persist
and removing the file) unchanged.
- Around line 460-574: update() currently mutates and persists a cloned
ControllerConfig but ControllerManager exposes config as Arc<ControllerConfig>
(via ControllerManager::controller_config()), so the running controller never
sees the changes; fix by making the in-memory config replaceable and atomically
swapped: either change the stored type to Arc<RwLock<ControllerConfig>> or
Arc<ArcSwap<ControllerConfig>> and add a ControllerManager::update_config(new:
Arc<ControllerConfig>) method that swaps the active config atomically; update
controller_request_processor to call ControllerManager::update_config(...)
instead of modifying a clone and persisting only, and ensure all config readers
access the new locking/swap mechanism; finally, add a doc note to
ControllerConfig::update() clarifying that node_id, listen_addr, raft_peers, and
storage_backend are intentionally not updatable at runtime.
In @rocketmq-controller/src/processor/controller_request_processor.rs:
- Around line 678-741: Remove the duplicated unit tests in
controller_request_processor.rs by deleting the redundant test functions
test_controller_config_update, test_controller_config_update_invalid,
test_controller_config_update_unknown_key, test_controller_config_persist, and
test_controller_config_blacklist (these are duplicates of tests in
ControllerConfig in rocketmq-common). Rely on the existing tests in
ControllerConfig and, separately, add an integration test exercising
handle_update_controller_config to validate the end-to-end application of
configuration to a running controller (verify request handling and that
ControllerConfig fields are updated).
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-common/src/common/controller/controller_config.rsrocketmq-controller/src/processor/controller_request_processor.rs
🧰 Additional context used
🧬 Code graph analysis (2)
rocketmq-common/src/common/controller/controller_config.rs (6)
rocketmq-controller/src/controller/controller_manager.rs (2)
config(575-577)new(139-211)rocketmq-controller/src/processor/controller_request_processor.rs (1)
new(140-149)rocketmq-common/src/common.rs (4)
new(162-164)from(88-94)from(98-100)from(104-110)rocketmq-controller/src/metadata/replica.rs (2)
new(161-173)new(231-237)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(107-113)rocketmq-common/src/common/namesrv/namesrv_config.rs (1)
scan_not_active_broker_interval(78-80)
rocketmq-controller/src/processor/controller_request_processor.rs (4)
rocketmq-error/src/unified.rs (1)
broker_operation_failed(404-411)rocketmq-remoting/src/protocol/remoting_command.rs (2)
create_response_command_with_code_remark(187-192)create_response_command(194-198)rocketmq-controller/src/controller/controller_manager.rs (4)
controller_config(596-598)new(139-211)controller(620-622)config(575-577)rocketmq-common/src/common/controller/controller_config.rs (3)
new(260-262)default(206-255)is_config_in_blacklist(428-430)
🪛 GitHub Actions: RocketMQ Rust CI
rocketmq-common/src/common/controller/controller_config.rs
[error] 714-714: field assignment outside of initializer for an instance created with Default::default(). Consider initializing with ControllerConfig { config_store_path: PathBuf::from("/tmp/test_controller_config.json"), ..Default::default() } to avoid reassignments. Clippy warns about field_reassign_with_default.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: auto-approve
🔇 Additional comments (7)
rocketmq-common/src/common/controller/controller_config.rs (3)
15-18: LGTM!The new imports (HashMap, File, Write) appropriately support the dynamic configuration update and persistence functionality.
581-597: LGTM!The persist implementation correctly handles directory creation, JSON serialization, and file writing with appropriate error handling.
673-687: LGTM!The test coverage for
update()is comprehensive, testing valid updates, invalid values, and unknown keys with appropriate assertions.rocketmq-controller/src/processor/controller_request_processor.rs (4)
455-462: LGTM!The leader check correctly prevents non-leader nodes from handling configuration updates, which is appropriate for a Raft-based controller.
489-512: LGTM!The property parsing logic correctly handles the key=value format with appropriate whitespace handling and empty key filtering.
744-784: LGTM!Excellent test coverage for property parsing, including edge cases like empty body and whitespace handling.
786-810: LGTM!The test thoroughly validates blacklist checking logic with all relevant scenarios.
| #[test] | ||
| fn test_config_persist() { | ||
| let mut config = ControllerConfig::default(); | ||
| config.config_store_path = PathBuf::from("/tmp/test_controller_config.json"); | ||
|
|
||
| let result = config.persist(); | ||
| assert!(result.is_ok()); | ||
|
|
||
| std::fs::remove_file("/tmp/test_controller_config.json").ok(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix Clippy warning: use struct initialization instead of field reassignment.
The test has a field assignment after Default::default(). Use struct initialization syntax to avoid the Clippy warning.
♻️ Proposed fix
#[test]
fn test_config_persist() {
- let mut config = ControllerConfig::default();
- config.config_store_path = PathBuf::from("/tmp/test_controller_config.json");
+ let config = ControllerConfig {
+ config_store_path: PathBuf::from("/tmp/test_controller_config.json"),
+ ..Default::default()
+ };
let result = config.persist();
assert!(result.is_ok());
std::fs::remove_file("/tmp/test_controller_config.json").ok();
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #[test] | |
| fn test_config_persist() { | |
| let mut config = ControllerConfig::default(); | |
| config.config_store_path = PathBuf::from("/tmp/test_controller_config.json"); | |
| let result = config.persist(); | |
| assert!(result.is_ok()); | |
| std::fs::remove_file("/tmp/test_controller_config.json").ok(); | |
| } | |
| #[test] | |
| fn test_config_persist() { | |
| let config = ControllerConfig { | |
| config_store_path: PathBuf::from("/tmp/test_controller_config.json"), | |
| ..Default::default() | |
| }; | |
| let result = config.persist(); | |
| assert!(result.is_ok()); | |
| std::fs::remove_file("/tmp/test_controller_config.json").ok(); | |
| } |
🧰 Tools
🪛 GitHub Actions: RocketMQ Rust CI
[error] 714-714: field assignment outside of initializer for an instance created with Default::default(). Consider initializing with ControllerConfig { config_store_path: PathBuf::from("/tmp/test_controller_config.json"), ..Default::default() } to avoid reassignments. Clippy warns about field_reassign_with_default.
🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 711
- 720, The test test_config_persist currently creates a mutable ControllerConfig
via Default::default() then reassigns config.config_store_path, which triggers a
Clippy warning; replace that pattern by constructing the value with struct
initialization using the update syntax (e.g. build a ControllerConfig with
config_store_path set to PathBuf::from("/tmp/test_controller_config.json") and
..Default::default()), remove the needless mutability, and keep the rest of the
test (calling persist and removing the file) unchanged.
✅ Addressed in commit 7c4edd5
rocketmq-controller/src/processor/controller_request_processor.rs
Outdated
Show resolved
Hide resolved
5a86607 to
7c4edd5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @rocketmq-common/src/common/controller/controller_config.rs:
- Line 576: Fix the typo in the doc comment that currently reads "Persist the
configuration to dis" by changing it to "Persist the configuration to disk" on
the doc comment above the configuration persistence method in
controller_config.rs (the doc comment associated with the configuration persist
function/method).
- Around line 520-524: The parsing for the "metricsExporterType" property uses
serde_json::from_str which expects JSON-quoted input; change it to use
value.parse::<MetricsExporterType>() (leveraging MetricsExporterType's FromStr
impl) and propagate errors with map_err to a clear message (e.g., "Invalid
metricsExporterType: ..."); update the block that assigns
self.metrics_exporter_type accordingly so it matches how other properties are
parsed in this module.
🧹 Nitpick comments (1)
rocketmq-common/src/common/controller/controller_config.rs (1)
711-722: Improve test portability with cross-platform temp directory.The test uses a hardcoded
/tmp/path (line 714), which may not exist on all platforms (e.g., Windows). Usestd::env::temp_dir()for better cross-platform compatibility.♻️ Proposed fix
#[test] fn test_config_persist() { + let temp_path = std::env::temp_dir().join("test_controller_config.json"); let config = ControllerConfig { - config_store_path: PathBuf::from("/tmp/test_controller_config.json"), + config_store_path: temp_path.clone(), ..Default::default() }; let result = config.persist(); assert!(result.is_ok()); - std::fs::remove_file("/tmp/test_controller_config.json").ok(); + std::fs::remove_file(&temp_path).ok(); }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-common/src/common/controller/controller_config.rsrocketmq-controller/src/processor/controller_request_processor.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rocketmq-controller/src/processor/controller_request_processor.rs
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-common/src/common/controller/controller_config.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (2)
config(575-577)new(139-211)rocketmq-controller/src/processor/controller_request_processor.rs (1)
new(140-149)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(107-113)rocketmq-common/src/common/namesrv/namesrv_config.rs (1)
scan_not_active_broker_interval(78-80)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Build & Test (windows-latest)
- GitHub Check: Build & Test (macos-latest)
- GitHub Check: Code Coverage
- GitHub Check: Build & Test (ubuntu-latest)
- GitHub Check: auto-approve
🔇 Additional comments (6)
rocketmq-common/src/common/controller/controller_config.rs (6)
15-15: LGTM!The new imports are appropriate for the update and persist functionality.
Also applies to: 17-18
581-597: LGTM!The persist implementation correctly handles directory creation, serialization, and file writing with appropriate error handling.
673-687: LGTM!The update test validates that valid properties are correctly applied to the configuration.
689-698: LGTM!The test correctly verifies that invalid values trigger parse errors.
700-709: LGTM!The test confirms that unknown configuration keys are properly rejected.
469-574: Verify update method covers all configurable fields.The
updatemethod currently omits handling for theseControllerConfigkeys:
nodeId(u64)listenAddr(SocketAddr)raftPeers(Vec)storageBackend(StorageBackendType)Should these remain immutable at runtime, or must
updatesupport the"nodeId","listenAddr","raftPeers", and"storageBackend"keys?
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5568 +/- ##
==========================================
+ Coverage 38.43% 38.60% +0.16%
==========================================
Files 815 816 +1
Lines 110551 111146 +595
==========================================
+ Hits 42492 42908 +416
- Misses 68059 68238 +179 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - All CI checks passed ✅
7c4edd5 to
40aa02f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In @rocketmq-common/src/common/controller/controller_config.rs:
- Around line 520-526: The CI formatting error is due to an extra blank line in
the match arm that parses metricsExporterType; edit the block that uses
MetricsExporterType and self.metrics_exporter_type to remove the stray blank
line so the arm reads compactly (keep the parse/map_err flow and assignment
unchanged), then run cargo fmt to apply consistent formatting across
controller_config.rs.
- Around line 460-576: The update() method in controller_config.rs omits several
struct fields (nodeId, listenAddr, raftPeers, configStorePath, storageBackend),
so add match arms in the update() function to set self.node_id,
self.listen_addr, self.config_store_path, and self.storage_backend from the
incoming properties (parsing types as needed), and decide how to handle
raftPeers (either parse/deserialize into Vec<RaftPeer> or explicitly skip it
with a clear comment); ensure unknown-key handling remains the same and call
self.validate() afterward.
In @rocketmq-controller/src/processor/controller_request_processor.rs:
- Around line 674-678: CI found extra blank lines around the assertions; run
rustfmt (cargo fmt) to reformat the file and remove the extra blank lines around
the block that contains assert!(blacklist.contains("configStorePath")) and
assert!(blacklist.contains("rocketmqHome")) so the closing brace and assertions
are properly formatted.
- Around line 464-486: You clone and update controller_config but never apply it
to the running controller, so persistence only updates disk; after a successful
persist call, pass the updated controller_config back into the controller
manager (e.g. call a method such as
self.controller_manager.set_controller_config(controller_config) or
self.controller_manager.apply_config(controller_config)); if such a setter does
not exist, implement a method on ControllerManager (e.g.
update_controller_config or reload_config) that atomically replaces the
in-memory config and notifies/refreshes components that depend on it, then
invoke that method immediately after controller_config.persist() succeeds.
🧹 Nitpick comments (3)
rocketmq-common/src/common/controller/controller_config.rs (1)
713-724: Use tempfile for cross-platform test compatibility.The test uses a hardcoded
/tmppath which will fail on Windows. Use thetempfilecrate (already available per library context) for cross-platform temporary file handling.🔧 Suggested fix
#[test] fn test_config_persist() { + use tempfile::NamedTempFile; + + let temp_file = NamedTempFile::new().unwrap(); let config = ControllerConfig { - config_store_path: PathBuf::from("/tmp/test_controller_config.json"), + config_store_path: temp_file.path().to_path_buf(), ..Default::default() }; let result = config.persist(); assert!(result.is_ok()); - - std::fs::remove_file("/tmp/test_controller_config.json").ok(); + // tempfile automatically cleans up on drop }rocketmq-controller/src/processor/controller_request_processor.rs (2)
489-512: Consider making this a static function.The
parse_properties_from_stringmethod takes&selfbut doesn't use any instance data. It could be a static associated function for clarity.♻️ Optional refactor
- fn parse_properties_from_string(&self, body: &[u8]) -> RocketMQResult<std::collections::HashMap<String, String>> { + fn parse_properties_from_string(body: &[u8]) -> RocketMQResult<std::collections::HashMap<String, String>> { let content = String::from_utf8_lossy(body); let mut properties = std::collections::HashMap::new(); for line in content.lines() { if let Some((key, value)) = line.split_once('=') { let trimmed_key = key.trim(); if !trimmed_key.is_empty() { properties.insert(trimmed_key.to_string(), value.trim().to_string()); } } } Ok(properties) }Then update the call site at line 431:
- self.parse_properties_from_string(body).map_err(|e| { + Self::parse_properties_from_string(body).map_err(|e| {
679-746: Consider adding integration test for handle_update_controller_config.The current tests cover individual helper methods well, but there's no test for the complete
handle_update_controller_configflow including:
- End-to-end update with valid properties
- Blacklist rejection
- Non-leader rejection
- Persistence failures
This would provide better confidence that the entire request handling path works correctly.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-common/src/common/controller/controller_config.rsrocketmq-controller/src/processor/controller_request_processor.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-12-01T10:26:22.233Z
Learnt from: PanGan21
Repo: mxsm/rocketmq-rust PR: 1485
File: rocketmq-common/src/common/namesrv/namesrv_config.rs:151-245
Timestamp: 2024-12-01T10:26:22.233Z
Learning: In `NamesrvConfig`, replacing `serde(alias = "...")` with `serde(rename = "...")` could cause breaking changes, so manual serialization is preferred to avoid disrupting existing functionality.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
🧬 Code graph analysis (2)
rocketmq-controller/src/processor/controller_request_processor.rs (2)
rocketmq-error/src/unified.rs (1)
broker_operation_failed(404-411)rocketmq-remoting/src/protocol/remoting_command.rs (2)
create_response_command_with_code_remark(187-192)create_response_command(194-198)
rocketmq-common/src/common/controller/controller_config.rs (1)
rocketmq-common/src/common/namesrv/namesrv_config.rs (1)
scan_not_active_broker_interval(78-80)
🪛 GitHub Actions: RocketMQ Rust CI
rocketmq-controller/src/processor/controller_request_processor.rs
[error] 675-675: Cargo fmt --check failed: formatting issue detected in test region. Run 'cargo fmt' to fix formatting (inferred from diff around #[test]).
rocketmq-common/src/common/controller/controller_config.rs
[error] 521-521: Cargo fmt --check failed: formatting issue detected. Run 'cargo fmt' to fix formatting (diff shows removed/added blank line around exporter's assignment).
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-common/src/common/controller/controller_config.rs (1)
15-18: LGTM!The new imports are correctly added to support the
update()andpersist()methods.rocketmq-controller/src/processor/controller_request_processor.rs (2)
661-663: LGTM!Test imports are correctly added to support the new test cases.
429-486: LGTM on error handling!The error handling is comprehensive and uses appropriate response codes for each failure scenario:
- Empty/invalid body →
ControllerInvalidRequest- Blacklist violations →
NoPermission- Non-leader →
ControllerNotLeader- Persistence failures →
SystemError- Update failures →
ControllerInvalidRequestEach error includes descriptive messages for troubleshooting.
| /// Update configuration from a map of properties | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `properties` - A map of configuration keys to values | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// A Result indicating success or failure | ||
| pub fn update(&mut self, properties: &HashMap<String, String>) -> Result<(), String> { | ||
| for (key, value) in properties { | ||
| match key.as_str() { | ||
| "rocketmqHome" => self.rocketmq_home = value.clone(), | ||
| "controllerType" => self.controller_type = value.clone(), | ||
| "scanNotActiveBrokerInterval" => { | ||
| self.scan_not_active_broker_interval = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid scanNotActiveBrokerInterval: {}", e))?; | ||
| } | ||
| "controllerThreadPoolNums" => { | ||
| self.controller_thread_pool_nums = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid controllerThreadPoolNums: {}", e))?; | ||
| } | ||
| "controllerRequestThreadPoolQueueCapacity" => { | ||
| self.controller_request_thread_pool_queue_capacity = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid controllerRequestThreadPoolQueueCapacity: {}", e))?; | ||
| } | ||
| "mappedFileSize" => { | ||
| self.mapped_file_size = value.parse().map_err(|e| format!("Invalid mappedFileSize: {}", e))?; | ||
| } | ||
| "controllerStorePath" => { | ||
| self.controller_store_path = value.clone(); | ||
| } | ||
| "electMasterMaxRetryCount" => { | ||
| self.elect_master_max_retry_count = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid electMasterMaxRetryCount: {}", e))?; | ||
| } | ||
| "enableElectUncleanMaster" => { | ||
| self.enable_elect_unclean_master = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid enableElectUncleanMaster: {}", e))?; | ||
| } | ||
| "isProcessReadEvent" => { | ||
| self.is_process_read_event = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid isProcessReadEvent: {}", e))?; | ||
| } | ||
| "notifyBrokerRoleChanged" => { | ||
| self.notify_broker_role_changed = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid notifyBrokerRoleChanged: {}", e))?; | ||
| } | ||
| "scanInactiveMasterInterval" => { | ||
| self.scan_inactive_master_interval = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid scanInactiveMasterInterval: {}", e))?; | ||
| } | ||
| "metricsExporterType" => { | ||
| let exporter_type: MetricsExporterType = value | ||
| .parse() | ||
| .map_err(|_| format!("Invalid metricsExporterType: {}", value))?; | ||
|
|
||
| self.metrics_exporter_type = exporter_type; | ||
| } | ||
| "metricsGrpcExporterTarget" => self.metrics_grpc_exporter_target = value.clone(), | ||
| "metricsGrpcExporterHeader" => self.metrics_grpc_exporter_header = value.clone(), | ||
| "metricGrpcExporterTimeOutInMills" => { | ||
| self.metric_grpc_exporter_time_out_in_mills = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid metricGrpcExporterTimeOutInMills: {}", e))?; | ||
| } | ||
| "metricGrpcExporterIntervalInMills" => { | ||
| self.metric_grpc_exporter_interval_in_mills = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid metricGrpcExporterIntervalInMills: {}", e))?; | ||
| } | ||
| "metricLoggingExporterIntervalInMills" => { | ||
| self.metric_logging_exporter_interval_in_mills = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid metricLoggingExporterIntervalInMills: {}", e))?; | ||
| } | ||
| "metricsPromExporterPort" => { | ||
| self.metrics_prom_exporter_port = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid metricsPromExporterPort: {}", e))?; | ||
| } | ||
| "metricsPromExporterHost" => self.metrics_prom_exporter_host = value.clone(), | ||
| "metricsLabel" => self.metrics_label = value.clone(), | ||
| "metricsInDelta" => { | ||
| self.metrics_in_delta = value.parse().map_err(|e| format!("Invalid metricsInDelta: {}", e))?; | ||
| } | ||
| "configBlackList" => self.config_black_list = value.clone(), | ||
| "electionTimeoutMs" => { | ||
| self.election_timeout_ms = | ||
| value.parse().map_err(|e| format!("Invalid electionTimeoutMs: {}", e))?; | ||
| } | ||
| "heartbeatIntervalMs" => { | ||
| self.heartbeat_interval_ms = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid heartbeatIntervalMs: {}", e))?; | ||
| } | ||
| "storagePath" => self.storage_path = value.clone(), | ||
| "enableElectUncleanMasterLocal" => { | ||
| self.enable_elect_unclean_master_local = value | ||
| .parse() | ||
| .map_err(|e| format!("Invalid enableElectUncleanMasterLocal: {}", e))?; | ||
| } | ||
| _ => return Err(format!("Unknown configuration key: {}", key)), | ||
| } | ||
| } | ||
|
|
||
| // Validate the updated configuration | ||
| self.validate() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Add missing configuration fields to the update method.
The update() method is missing several configuration fields that are present in the struct:
nodeIdlistenAddrraftPeersconfigStorePathstorageBackend
These fields should be updateable if they're not in the blacklist, or explicitly excluded with a comment explaining why.
📝 Suggested additions
Add these cases to the match statement before the default case (line 570):
"enableElectUncleanMasterLocal" => {
self.enable_elect_unclean_master_local = value
.parse()
.map_err(|e| format!("Invalid enableElectUncleanMasterLocal: {}", e))?;
}
+ "nodeId" => {
+ self.node_id = value.parse().map_err(|e| format!("Invalid nodeId: {}", e))?;
+ }
+ "listenAddr" => {
+ self.listen_addr = value.parse().map_err(|e| format!("Invalid listenAddr: {}", e))?;
+ }
+ "configStorePath" => {
+ self.config_store_path = PathBuf::from(value);
+ }
+ "storageBackend" => {
+ self.storage_backend = match value.as_str() {
+ "RocksDB" => StorageBackendType::RocksDB,
+ "File" => StorageBackendType::File,
+ "Memory" => StorageBackendType::Memory,
+ _ => return Err(format!("Invalid storageBackend: {}", value)),
+ };
+ }
_ => return Err(format!("Unknown configuration key: {}", key)),Note: raftPeers (Vec) requires more complex parsing and may be better handled separately or marked as non-updatable.
Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 GitHub Actions: RocketMQ Rust CI
[error] 521-521: Cargo fmt --check failed: formatting issue detected. Run 'cargo fmt' to fix formatting (diff shows removed/added blank line around exporter's assignment).
🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 460
- 576, The update() method in controller_config.rs omits several struct fields
(nodeId, listenAddr, raftPeers, configStorePath, storageBackend), so add match
arms in the update() function to set self.node_id, self.listen_addr,
self.config_store_path, and self.storage_backend from the incoming properties
(parsing types as needed), and decide how to handle raftPeers (either
parse/deserialize into Vec<RaftPeer> or explicitly skip it with a clear
comment); ensure unknown-key handling remains the same and call self.validate()
afterward.
| /// Persist the configuration to disk | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// A Result indicating success or failure | ||
| pub fn persist(&self) -> Result<(), String> { | ||
| // Ensure the directory exists | ||
| if let Some(parent) = self.config_store_path.parent() { | ||
| std::fs::create_dir_all(parent).map_err(|e| format!("Failed to create config directory: {}", e))?; | ||
| } | ||
|
|
||
| // Serialize the configuration to JSON | ||
| let json = serde_json::to_string_pretty(self).map_err(|e| format!("Failed to serialize config: {}", e))?; | ||
|
|
||
| // Write to file | ||
| let mut file = | ||
| File::create(&self.config_store_path).map_err(|e| format!("Failed to create config file: {}", e))?; | ||
| file.write_all(json.as_bytes()) | ||
| .map_err(|e| format!("Failed to write config file: {}", e))?; | ||
|
|
||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's look at the structure of controller_config.rs
wc -l rocketmq-common/src/common/controller/controller_config.rsRepository: mxsm/rocketmq-rust
Length of output: 124
🏁 Script executed:
#!/bin/bash
# Search for load/read methods in the controller_config.rs file
rg -n "fn.*load|fn.*from|fn.*read" rocketmq-common/src/common/controller/controller_config.rs -A 10Repository: mxsm/rocketmq-rust
Length of output: 856
🏁 Script executed:
#!/bin/bash
# Search for any mentions of "properties" or format specifications
rg -n "properties|format.*string" rocketmq-common/src/common/controller/controller_config.rs -iRepository: mxsm/rocketmq-rust
Length of output: 1196
🏁 Script executed:
#!/bin/bash
# Look at imports to understand what serialization is being used
head -50 rocketmq-common/src/common/controller/controller_config.rsRepository: mxsm/rocketmq-rust
Length of output: 1565
🏁 Script executed:
#!/bin/bash
# Look for any methods that load or deserialize config
rg -n "fn.*load|fn.*from_str|fn.*from_file|impl.*From|fn new" rocketmq-common/src/common/controller/controller_config.rs -A 8 | head -80Repository: mxsm/rocketmq-rust
Length of output: 650
🏁 Script executed:
#!/bin/bash
# Check the actual persist method and see if there's a corresponding load method
sed -n '500,650p' rocketmq-common/src/common/controller/controller_config.rsRepository: mxsm/rocketmq-rust
Length of output: 6618
🏁 Script executed:
#!/bin/bash
# Look for any deserialization or from_json patterns
rg -n "from_json|from|load_config|serde_json::from" rocketmq-common/src/common/controller/controller_config.rs -B 3 -A 10Repository: mxsm/rocketmq-rust
Length of output: 2159
Add a corresponding deserialization method to load persisted JSON configuration.
The persist() method writes configuration as JSON, but there is no corresponding method to load the configuration back from the persisted JSON file. The only configuration loading mechanism is the update() method which expects a HashMap<String, String> (properties format). This creates a one-way serialization where configuration can be persisted but not restored. Additionally, the default file path is named controller.properties (line 223) which misleadingly suggests properties format when JSON is actually written.
Either:
- Add a
load()orfrom_file()method usingserde_json::from_strto deserialize the persisted JSON, or - Change
persist()to write properties format matching theupdate()mechanism, and rename the file to reflect the actual format.
| let mut controller_config = self.controller_manager.controller_config().clone(); | ||
|
|
||
| // Update configuration | ||
| match controller_config.update(&properties) { | ||
| Ok(_) => { | ||
| // Persist the updated configuration | ||
| if let Err(e) = controller_config.persist() { | ||
| let error_msg = format!("Failed to persist configuration: {}", e); | ||
| return Ok(Some(RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::SystemError, | ||
| error_msg, | ||
| ))); | ||
| } | ||
| Ok(Some(RemotingCommand::create_response_command())) | ||
| } | ||
| Err(e) => { | ||
| let error_msg = format!("Failed to update configuration: {}", e); | ||
| Ok(Some(RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::ControllerInvalidRequest, | ||
| error_msg, | ||
| ))) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Updated configuration is not applied to the running controller.
Line 464 clones the configuration, and the updates are made to this clone. The clone is persisted to disk, but the updated configuration is never applied back to controller_manager. This means:
- The configuration changes are saved to disk
- BUT the running controller continues using the old configuration
- Changes only take effect after a restart
This defeats the purpose of runtime configuration updates mentioned in the PR objectives.
🔧 Proposed fix
After successful persistence, apply the updated configuration back to the controller manager:
// Update configuration
match controller_config.update(&properties) {
Ok(_) => {
// Persist the updated configuration
if let Err(e) = controller_config.persist() {
let error_msg = format!("Failed to persist configuration: {}", e);
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemError,
error_msg,
)));
}
+
+ // Apply the updated configuration to the controller manager
+ // Note: This assumes controller_manager has a method to update its config
+ // The exact API depends on ControllerManager implementation
+ self.controller_manager.update_controller_config(controller_config);
+
Ok(Some(RemotingCommand::create_response_command()))
}Alternative: If ControllerManager doesn't support runtime config updates, the controller needs to be designed to reload configuration or accept individual property updates through specific methods.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In @rocketmq-controller/src/processor/controller_request_processor.rs around
lines 464 - 486, You clone and update controller_config but never apply it to
the running controller, so persistence only updates disk; after a successful
persist call, pass the updated controller_config back into the controller
manager (e.g. call a method such as
self.controller_manager.set_controller_config(controller_config) or
self.controller_manager.apply_config(controller_config)); if such a setter does
not exist, implement a method on ControllerManager (e.g.
update_controller_config or reload_config) that atomically replaces the
in-memory config and notifies/refreshes components that depend on it, then
invoke that method immediately after controller_config.persist() succeeds.
rocketmq-controller/src/processor/controller_request_processor.rs
Outdated
Show resolved
Hide resolved
e077fb7 to
67d2879
Compare
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
[Feature🚀] Implement UpdateControllerConfig Request Handler Single Node
How Did You Test This Change?
add test cases
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.