Skip to content
38 changes: 35 additions & 3 deletions rocketmq-controller/src/processor/controller_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,16 @@
use std::collections::HashSet;
use std::sync::Arc;

use crate::controller::broker_heartbeat_manager::BrokerHeartbeatManager;
use crate::heartbeat::default_broker_heartbeat_manager::DefaultBrokerHeartbeatManager;
use crate::manager::ControllerManager;
use crate::Controller;
use rocketmq_error::RocketMQError;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::namesrv::broker_request::BrokerHeartbeatRequestHeader;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::protocol::RemotingDeserializable;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
Expand Down Expand Up @@ -375,11 +378,40 @@ impl ControllerRequestProcessor {
/// Result containing acknowledgment
async fn handle_broker_heartbeat(
&mut self,
_channel: Channel,
channel: Channel,
_ctx: ConnectionHandlerContext,
_request: &mut RemotingCommand,
request: &mut RemotingCommand,
) -> RocketMQResult<Option<RemotingCommand>> {
unimplemented!("unimplemented handle_broker_heartbeat")
let request_header = request.decode_command_custom_header_fast::<BrokerHeartbeatRequestHeader>()?;

if let Some(broker_id) = &request_header.broker_id {
let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.ok_or(RocketMQError::Internal(
"in fn handle_broker_heartbeat, request_header
.heartbeat_timeout_mills is none"
.to_string(),
))? as u64;
Comment on lines +388 to +392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash

# First, find the BrokerHeartbeatRequestHeader struct definition
echo "=== Searching for BrokerHeartbeatRequestHeader struct ==="
rg -A 10 "struct BrokerHeartbeatRequestHeader" --type rust

echo ""
echo "=== Searching for heartbeat_timeout_mills field ==="
rg "heartbeat_timeout_mills" --type rust -B 2 -A 2

echo ""
echo "=== Checking the actual code at the specified location ==="
sed -n '385,395p' rocketmq-controller/src/processor/controller_request_processor.rs

Repository: mxsm/rocketmq-rust

Length of output: 14983


🏁 Script executed:

#!/bin/bash

# Search more specifically for the field definition
echo "=== Searching for heartbeat_timeout_mills in struct context ==="
ast-grep --pattern 'struct BrokerHeartbeatRequestHeader { $$$ heartbeat_timeout_mills: $_ $$$ }'

Repository: mxsm/rocketmq-rust

Length of output: 125


🏁 Script executed:

#!/bin/bash

# Let's also check what types are commonly used in this codebase for similar fields
echo "=== Looking at request header struct definitions ==="
fd -e rs -x grep -l "RequestHeader" {} \; | head -5 | xargs grep -A 15 "struct.*RequestHeader"

Repository: mxsm/rocketmq-rust

Length of output: 2622


Fix error message formatting and use safer type conversion for heartbeat_timeout_mills.

Two confirmed issues:

  1. Error message formatting: The string literal spans multiple lines with an awkward line break in the middle, harming readability.

  2. Type cast safety: heartbeat_timeout_mills is Option<i64> (signed integer). The as u64 cast silently converts negative values to large unsigned numbers instead of erroring. Use TryFrom or validate non-negative values first.

Prefer u64::try_from() for a safe and idiomatic conversion:

-            let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.ok_or(RocketMQError::Internal(
-                "in fn handle_broker_heartbeat, request_header
-                .heartbeat_timeout_mills is none"
-                    .to_string(),
-            ))? as u64;
+            let heartbeat_timeout_mills = u64::try_from(
+                request_header
+                    .heartbeat_timeout_mills
+                    .ok_or(RocketMQError::Internal(
+                        "in fn handle_broker_heartbeat, request_header.heartbeat_timeout_mills is none".to_string(),
+                    ))?,
+            )
+            .map_err(|_| RocketMQError::Internal("heartbeat_timeout_mills must be non-negative".to_string()))?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.ok_or(RocketMQError::Internal(
"in fn handle_broker_heartbeat, request_header
.heartbeat_timeout_mills is none"
.to_string(),
))? as u64;
let heartbeat_timeout_mills = u64::try_from(
request_header
.heartbeat_timeout_mills
.ok_or(RocketMQError::Internal(
"in fn handle_broker_heartbeat, request_header.heartbeat_timeout_mills is none".to_string(),
))?,
)
.map_err(|_| RocketMQError::Internal("heartbeat_timeout_mills must be non-negative".to_string()))?;
🤖 Prompt for AI Agents
In @rocketmq-controller/src/processor/controller_request_processor.rs around
lines 388 - 392, The error message in handle_broker_heartbeat is split across
lines and the cast `as u64` on request_header.heartbeat_timeout_mills
(Option<i64>) unsafely converts negative values; change to unwrap the Option
with a single-line, clear error message and then convert safely using
u64::try_from(value).map_err(...) or explicitly check for value >= 0 and return
a RocketMQError::Internal containing the offending value; reference
request_header.heartbeat_timeout_mills and the handle_broker_heartbeat context
and ensure the resulting heartbeat_timeout_mills is a validated u64.

self.heartbeat_manager.on_broker_heartbeat(
&request_header.cluster_name,
&request_header.broker_name,
&request_header.broker_addr,
*broker_id,
Some(heartbeat_timeout_mills),
channel,
request_header.epoch,
request_header.max_offset,
request_header.confirm_offset,
request_header.election_priority,
);
Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::Success,
"Heart beat success",
)))
} else {
Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty brokerId",
)))
}
}

/// Handle GET_SYNC_STATE_DATA request
Expand Down
Loading