diff --git a/rocketmq-common/src/common/controller/controller_config.rs b/rocketmq-common/src/common/controller/controller_config.rs index c42cdb43a..29e90633a 100644 --- a/rocketmq-common/src/common/controller/controller_config.rs +++ b/rocketmq-common/src/common/controller/controller_config.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::env; +use std::fmt; use std::net::SocketAddr; use std::path::PathBuf; @@ -53,6 +54,16 @@ pub enum StorageBackendType { Memory, } +impl fmt::Display for StorageBackendType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::RocksDB => write!(f, "rocks_db"), + Self::File => write!(f, "file"), + Self::Memory => write!(f, "memory"), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ControllerConfig { @@ -453,6 +464,102 @@ impl ControllerConfig { Ok(()) } + + /// Convert configuration to properties string format + /// + /// Returns a string in properties format (key=value\n) using camelCase naming + pub fn to_properties_string(&self) -> String { + use std::fmt::Write; + + let mut result = String::with_capacity(2048); + + writeln!(result, "rocketmqHome={}", self.rocketmq_home).unwrap(); + writeln!(result, "configStorePath={}", self.config_store_path.display()).unwrap(); + writeln!(result, "controllerType={}", self.controller_type).unwrap(); + writeln!( + result, + "scanNotActiveBrokerInterval={}", + self.scan_not_active_broker_interval + ) + .unwrap(); + writeln!(result, "controllerThreadPoolNums={}", self.controller_thread_pool_nums).unwrap(); + writeln!( + result, + "controllerRequestThreadPoolQueueCapacity={}", + self.controller_request_thread_pool_queue_capacity + ) + .unwrap(); + writeln!(result, "mappedFileSize={}", self.mapped_file_size).unwrap(); + writeln!(result, "controllerStorePath={}", self.controller_store_path).unwrap(); + writeln!(result, "electMasterMaxRetryCount={}", self.elect_master_max_retry_count).unwrap(); + writeln!(result, "enableElectUncleanMaster={}", self.enable_elect_unclean_master).unwrap(); + writeln!(result, "isProcessReadEvent={}", self.is_process_read_event).unwrap(); + writeln!(result, "notifyBrokerRoleChanged={}", self.notify_broker_role_changed).unwrap(); + writeln!( + result, + "scanInactiveMasterInterval={}", + self.scan_inactive_master_interval + ) + .unwrap(); + writeln!(result, "metricsExporterType={}", self.metrics_exporter_type).unwrap(); + writeln!( + result, + "metricsGrpcExporterTarget={}", + self.metrics_grpc_exporter_target + ) + .unwrap(); + writeln!( + result, + "metricsGrpcExporterHeader={}", + self.metrics_grpc_exporter_header + ) + .unwrap(); + writeln!( + result, + "metricGrpcExporterTimeOutInMills={}", + self.metric_grpc_exporter_time_out_in_mills + ) + .unwrap(); + writeln!( + result, + "metricGrpcExporterIntervalInMills={}", + self.metric_grpc_exporter_interval_in_mills + ) + .unwrap(); + writeln!( + result, + "metricLoggingExporterIntervalInMills={}", + self.metric_logging_exporter_interval_in_mills + ) + .unwrap(); + writeln!(result, "metricsPromExporterPort={}", self.metrics_prom_exporter_port).unwrap(); + writeln!(result, "metricsPromExporterHost={}", self.metrics_prom_exporter_host).unwrap(); + writeln!(result, "metricsLabel={}", self.metrics_label).unwrap(); + writeln!(result, "metricsInDelta={}", self.metrics_in_delta).unwrap(); + writeln!(result, "configBlackList={}", self.config_black_list).unwrap(); + writeln!(result, "nodeId={}", self.node_id).unwrap(); + writeln!(result, "listenAddr={}", self.listen_addr).unwrap(); + + let peers: Vec = self + .raft_peers + .iter() + .map(|peer| format!("{}-{}", peer.id, peer.addr)) + .collect(); + writeln!(result, "raftPeers={}", peers.join(";")).unwrap(); + + writeln!(result, "electionTimeoutMs={}", self.election_timeout_ms).unwrap(); + writeln!(result, "heartbeatIntervalMs={}", self.heartbeat_interval_ms).unwrap(); + writeln!(result, "storagePath={}", self.storage_path).unwrap(); + writeln!(result, "storageBackend={}", self.storage_backend).unwrap(); + writeln!( + result, + "enableElectUncleanMasterLocal={}", + self.enable_elect_unclean_master_local + ) + .unwrap(); + + result + } } #[cfg(test)] @@ -527,4 +634,68 @@ mod tests { assert_eq!(config.metrics_prom_exporter_port, 9090); assert_eq!(config.metrics_label, "instance_id:test,uid:123"); } + + #[test] + fn test_to_properties_string_basic() { + let config = ControllerConfig::default(); + let result = config.to_properties_string(); + + assert!(result.contains("rocketmqHome=")); + assert!(result.contains("controllerType=Raft")); + assert!(result.contains("scanNotActiveBrokerInterval=5000")); + assert!(result.contains("controllerThreadPoolNums=16")); + assert!(result.contains("notifyBrokerRoleChanged=true")); + assert!(result.contains("enableElectUncleanMaster=false")); + } + + #[test] + fn test_to_properties_string_format() { + let config = ControllerConfig::default(); + let result = config.to_properties_string(); + + let lines: Vec<&str> = result.lines().collect(); + assert!(!lines.is_empty()); + + for line in lines { + assert!(line.contains('='), "Each line should contain '=': {}", line); + let parts: Vec<&str> = line.split('=').collect(); + assert_eq!(parts.len(), 2, "Each line should have exactly one '=': {}", line); + } + } + + #[test] + fn test_to_properties_string_with_custom_config() { + let config = ControllerConfig::new() + .with_controller_type(RAFT_CONTROLLER) + .with_scan_not_active_broker_interval(10000) + .with_controller_thread_pool_nums(32) + .with_enable_elect_unclean_master(true) + .with_storage_backend(StorageBackendType::RocksDB); + + let result = config.to_properties_string(); + + assert!(result.contains("controllerType=Raft")); + assert!(result.contains("scanNotActiveBrokerInterval=10000")); + assert!(result.contains("controllerThreadPoolNums=32")); + assert!(result.contains("enableElectUncleanMaster=true")); + assert!(result.contains("storageBackend=rocks_db")); + } + + #[test] + fn test_to_properties_string_with_raft_peers() { + let config = ControllerConfig::new().with_raft_peers(vec![ + RaftPeer { + id: 1, + addr: "127.0.0.1:9877".parse().unwrap(), + }, + RaftPeer { + id: 2, + addr: "127.0.0.1:9878".parse().unwrap(), + }, + ]); + + let result = config.to_properties_string(); + + assert!(result.contains("raftPeers=1-127.0.0.1:9877;2-127.0.0.1:9878")); + } } diff --git a/rocketmq-controller/src/processor/controller_request_processor.rs b/rocketmq-controller/src/processor/controller_request_processor.rs index 8eef35e5d..d196fdafd 100644 --- a/rocketmq-controller/src/processor/controller_request_processor.rs +++ b/rocketmq-controller/src/processor/controller_request_processor.rs @@ -445,7 +445,11 @@ impl ControllerRequestProcessor { _ctx: ConnectionHandlerContext, _request: &mut RemotingCommand, ) -> RocketMQResult> { - unimplemented!("unimplemented handle_get_controller_config") + let controller_config = self.controller_manager.controller_config(); + let config_string = controller_config.to_properties_string(); + + let response = RemotingCommand::create_response_command().set_body(config_string.into_bytes()); + Ok(Some(response)) } /// Handle CLEAN_BROKER_DATA request