diff --git a/rocketmq-controller/src/processor/controller_request_processor.rs b/rocketmq-controller/src/processor/controller_request_processor.rs index 8eef35e5d..3ed605b52 100644 --- a/rocketmq-controller/src/processor/controller_request_processor.rs +++ b/rocketmq-controller/src/processor/controller_request_processor.rs @@ -85,13 +85,16 @@ use std::collections::HashSet; use std::sync::Arc; +use crate::controller::broker_heartbeat_manager::BrokerHeartbeatManager; use crate::heartbeat::default_broker_heartbeat_manager::DefaultBrokerHeartbeatManager; use crate::manager::ControllerManager; use crate::Controller; +use rocketmq_error::RocketMQError; use rocketmq_error::RocketMQResult; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::net::channel::Channel; +use rocketmq_remoting::protocol::header::namesrv::broker_request::BrokerHeartbeatRequestHeader; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::protocol::RemotingDeserializable; use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext; @@ -375,11 +378,40 @@ impl ControllerRequestProcessor { /// Result containing acknowledgment async fn handle_broker_heartbeat( &mut self, - _channel: Channel, + channel: Channel, _ctx: ConnectionHandlerContext, - _request: &mut RemotingCommand, + request: &mut RemotingCommand, ) -> RocketMQResult> { - unimplemented!("unimplemented handle_broker_heartbeat") + let request_header = request.decode_command_custom_header_fast::()?; + + if let Some(broker_id) = &request_header.broker_id { + let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.ok_or(RocketMQError::Internal( + "in fn handle_broker_heartbeat, request_header + .heartbeat_timeout_mills is none" + .to_string(), + ))? as u64; + self.heartbeat_manager.on_broker_heartbeat( + &request_header.cluster_name, + &request_header.broker_name, + &request_header.broker_addr, + *broker_id, + Some(heartbeat_timeout_mills), + channel, + request_header.epoch, + request_header.max_offset, + request_header.confirm_offset, + request_header.election_priority, + ); + Ok(Some(RemotingCommand::create_response_command_with_code_remark( + ResponseCode::Success, + "Heart beat success", + ))) + } else { + Ok(Some(RemotingCommand::create_response_command_with_code_remark( + ResponseCode::ControllerInvalidRequest, + "Heart beat with empty brokerId", + ))) + } } /// Handle GET_SYNC_STATE_DATA request