Skip to content

Conversation

@Lori-Shu
Copy link
Contributor

@Lori-Shu Lori-Shu commented Jan 7, 2026

Implementation is pretty easy cuz heartbeat_manager.on_broker_heartbeat has been implemented already.

Fixes #5535

Summary by CodeRabbit

  • New Features

    • Added a dedicated broker heartbeat processing path that decodes heartbeat requests and confirms successful exchanges.
  • Bug Fixes

    • Added explicit timeout handling and stronger heartbeat validation for improved stability.
    • Returns clear error responses when broker identifiers are missing to prevent ambiguous failures and simplify diagnostics.

✏️ Tip: You can customize this high-level summary in your review settings.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@Lori-Shu 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@rocketmq-rust-robot rocketmq-rust-robot added Difficulty level/Hard Hard ISSUE feature🚀 Suggest an idea for this project. labels Jan 7, 2026
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 7, 2026

Walkthrough

ControllerRequestProcessor now implements handle_broker_heartbeat: it decodes a BrokerHeartbeatRequestHeader, requires a non-empty brokerId, extracts heartbeat_timeout_millis, calls the BrokerHeartbeatManager with broker metadata and channel/epoch/offsets/election priority, and returns success or ControllerInvalidRequest responses.

Changes

Cohort / File(s) Change Summary
Broker Heartbeat Handler Implementation
rocketmq-controller/src/processor/controller_request_processor.rs
Implemented handle_broker_heartbeat (changed signature to accept Channel, ConnectionHandlerContext, &mut RemotingCommand); decode BrokerHeartbeatRequestHeader via decode_command_custom_header_fast; validate brokerId (return ControllerInvalidRequest if empty); extract heartbeat_timeout_mills (error on missing); invoke heartbeat_manager.on_broker_heartbeat(...) with cluster, brokerName, brokerAddr, brokerId, timeout, channel, epoch, maxOffset, confirmOffset, electionPriority; return success RemotingCommand. Added imports for BrokerHeartbeatManager, RocketMQError, and BrokerHeartbeatRequestHeader.

Sequence Diagram(s)

sequenceDiagram
    participant Broker
    participant Controller as ControllerRequestProcessor
    participant HBMgr as BrokerHeartbeatManager
    participant Channel as NettyChannel

    Broker->>Controller: RemotingCommand (BrokerHeartbeatRequestHeader + payload)
    note right of Controller: decode header\nvalidate brokerId\nextract timeout
    alt brokerId missing
        Controller-->>Broker: RemotingCommand (ControllerInvalidRequest)
    else brokerId present
        Controller->>HBMgr: on_broker_heartbeat(cluster, brokerName, brokerAddr, brokerId,\n  heartbeatTimeoutMillis, Channel, epoch, maxOffset, confirmOffset, electionPriority)
        HBMgr-->>Controller: ack/result
        Controller-->>Broker: RemotingCommand (Success)
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I hopped through code to catch the beat,
broker IDs checked where channels meet,
timeouts plucked and headers read,
heartbeat sent, the manager fed,
controller smiles — the cluster's neat.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title '[ISSUE#5535] Implement BrokerHeartbeat Request Handler' accurately describes the primary change: implementing the broker heartbeat request handler in the controller.
Linked Issues check ✅ Passed The implementation decodes BrokerHeartbeatRequestHeader, validates broker_id presence, calls heartbeat_manager.on_broker_heartbeat with required parameters, and returns appropriate responses, addressing all primary coding objectives from issue #5535.
Out of Scope Changes check ✅ Passed All changes are focused on implementing the handle_broker_heartbeat method signature and logic; no unrelated modifications or out-of-scope functionality is present.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @rocketmq-controller/src/processor/controller_request_processor.rs:
- Around line 386-398: The broker_id check returns a ControllerInvalidRequest
response while heartbeat_timeout_mills uses ok_or with RocketMQError::Internal
and the error string is a copy-paste referencing broker_id; update to use the
same validation style as broker_id: if
request_header.heartbeat_timeout_mills.is_none() return
Ok(Some(RemotingCommand::create_response_command_with_code_remark(ResponseCode::ControllerInvalidRequest,
"Heart beat with empty heartbeatTimeoutMills"))), then safely unwrap
heartbeat_timeout_mills into heartbeat_timeout_mills as u64; if you keep any
ok_or error messages, correct the string to
"request_header.heartbeat_timeout_mills is none".
🧹 Nitpick comments (1)
rocketmq-controller/src/processor/controller_request_processor.rs (1)

399-415: Remove unnecessary return keyword.

The heartbeat manager invocation and response handling are correct. However, the return keyword on line 411 is unnecessary in Rust when returning the final expression from a function.

♻️ Proposed refactor
         self.heartbeat_manager.on_broker_heartbeat(
             &request_header.cluster_name,
             &request_header.broker_name,
             &request_header.broker_addr,
             broker_id,
             Some(heartbeat_timeout_mills),
             ctx.channel().clone(),
             request_header.epoch,
             request_header.max_offset,
             request_header.confirm_offset,
             request_header.election_priority,
         );
-        return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
+        Ok(Some(RemotingCommand::create_response_command_with_code_remark(
             ResponseCode::Success,
             "Heart beat success",
-        )));
+        )))
     }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a6f6cb2 and e550faa.

📒 Files selected for processing (1)
  • rocketmq-controller/src/processor/controller_request_processor.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Check (fmt + clippy)
  • GitHub Check: auto-approve
🔇 Additional comments (2)
rocketmq-controller/src/processor/controller_request_processor.rs (2)

88-88: LGTM! Imports are necessary and correctly added.

The new imports support the broker heartbeat implementation and are all actively used in the handler.

Also applies to: 92-92, 97-97


379-385: LGTM! Function signature and header decoding look good.

The parameter names are correctly updated to reflect their usage, and decode_command_custom_header_fast is an appropriate choice for efficient header parsing.

Comment on lines 386 to 398
if request_header.broker_id.is_none() {
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty brokerId",
)));
}
let broker_id = request_header
.broker_id
.ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?;
let heartbeat_timeout_mills = request_header
.heartbeat_timeout_mills
.ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?
as u64;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix the copy-paste error in the error message and make error handling consistent.

Two issues found:

  1. Line 397: The error message incorrectly states "request_header.broker_id is none" when it should say "request_header.heartbeat_timeout_mills is none". This is a copy-paste error that would cause confusion during debugging.

  2. Lines 386-398: Inconsistent error handling pattern. The broker_id validation returns a ControllerInvalidRequest response (lines 387-390), while heartbeat_timeout_mills would return a generic RocketMQError::Internal (line 397). Both should use the same error handling approach for consistency.

🐛 Proposed fix for consistent error handling
         let request_header = request.decode_command_custom_header_fast::<BrokerHeartbeatRequestHeader>()?;
         if request_header.broker_id.is_none() {
             return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
                 ResponseCode::ControllerInvalidRequest,
                 "Heart beat with empty brokerId",
             )));
         }
+        if request_header.heartbeat_timeout_mills.is_none() {
+            return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
+                ResponseCode::ControllerInvalidRequest,
+                "Heart beat with empty heartbeat_timeout_mills",
+            )));
+        }
         let broker_id = request_header
             .broker_id
             .ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?;
         let heartbeat_timeout_mills = request_header
             .heartbeat_timeout_mills
-            .ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?
+            .ok_or(RocketMQError::Internal("request_header.heartbeat_timeout_mills is none".to_string()))?
             as u64;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if request_header.broker_id.is_none() {
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty brokerId",
)));
}
let broker_id = request_header
.broker_id
.ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?;
let heartbeat_timeout_mills = request_header
.heartbeat_timeout_mills
.ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?
as u64;
if request_header.broker_id.is_none() {
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty brokerId",
)));
}
if request_header.heartbeat_timeout_mills.is_none() {
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty heartbeat_timeout_mills",
)));
}
let broker_id = request_header
.broker_id
.ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?;
let heartbeat_timeout_mills = request_header
.heartbeat_timeout_mills
.ok_or(RocketMQError::Internal("request_header.heartbeat_timeout_mills is none".to_string()))?
as u64;
🤖 Prompt for AI Agents
In @rocketmq-controller/src/processor/controller_request_processor.rs around
lines 386 - 398, The broker_id check returns a ControllerInvalidRequest response
while heartbeat_timeout_mills uses ok_or with RocketMQError::Internal and the
error string is a copy-paste referencing broker_id; update to use the same
validation style as broker_id: if
request_header.heartbeat_timeout_mills.is_none() return
Ok(Some(RemotingCommand::create_response_command_with_code_remark(ResponseCode::ControllerInvalidRequest,
"Heart beat with empty heartbeatTimeoutMills"))), then safely unwrap
heartbeat_timeout_mills into heartbeat_timeout_mills as u64; if you keep any
ok_or error messages, correct the string to
"request_header.heartbeat_timeout_mills is none".

@rocketmq-rust-bot rocketmq-rust-bot self-requested a review January 7, 2026 12:14
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @rocketmq-controller/src/processor/controller_request_processor.rs:
- Around line 404-407: Remove the unnecessary `return` keywords in the response
branches that currently do `return
Ok(Some(RemotingCommand::create_response_command_with_code_remark(...)));`;
instead make the
`Ok(Some(RemotingCommand::create_response_command_with_code_remark(...)))` the
final expression of the block (no `return` and ensure no trailing semicolon).
Update both occurrences that use
RemotingCommand::create_response_command_with_code_remark with
ResponseCode::Success and the "Heart beat success" remark (the subsequent
similar block at lines ~409-412) so they rely on implicit return.
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e550faa and 753fbde.

📒 Files selected for processing (1)
  • rocketmq-controller/src/processor/controller_request_processor.rs
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-controller/src/processor/controller_request_processor.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (1)
  • controller (620-622)
rocketmq-controller/src/metadata/broker.rs (1)
  • heartbeat (145-158)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
  • request_code (710-712)
  • create_response_command_with_code_remark (187-192)
rocketmq-controller/src/heartbeat/broker_live_info.rs (2)
  • channel (108-110)
  • broker_id (80-82)
🪛 GitHub Actions: RocketMQ Rust CI
rocketmq-controller/src/processor/controller_request_processor.rs

[error] 404-407: unneeded return statement. Clippy: needlessly returning a value. Remove return and use expression directly in Ok(Some(...)).


[error] 409-412: unneeded return statement. Clippy: needlessly returning a value. Remove return and use expression directly in Ok(Some(...)).

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: auto-approve
🔇 Additional comments (2)
rocketmq-controller/src/processor/controller_request_processor.rs (2)

88-88: LGTM! Imports are necessary for broker heartbeat handling.

The added imports support the new heartbeat functionality correctly.

Also applies to: 92-92, 97-97


382-383: LGTM! Parameter names correctly updated.

The underscore prefixes were appropriately removed from ctx and request since they are now used in the implementation.

Comment on lines 387 to 413
if let Some(broker_id) = &request_header.broker_id {
let heartbeat_timeout_mills = request_header
.heartbeat_timeout_mills
.ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?
as u64;
self.heartbeat_manager.on_broker_heartbeat(
&request_header.cluster_name,
&request_header.broker_name,
&request_header.broker_addr,
*broker_id,
Some(heartbeat_timeout_mills),
ctx.channel().clone(),
request_header.epoch,
request_header.max_offset,
request_header.confirm_offset,
request_header.election_priority,
);
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::Success,
"Heart beat success",
)));
} else {
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty brokerId",
)));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Address the copy-paste error and inconsistent error handling pattern.

The issues previously flagged in the past review remain unresolved:

  1. Line 390: The error message incorrectly states "request_header.broker_id is none" when it should be "request_header.heartbeat_timeout_mills is none".

  2. Inconsistent validation: The broker_id validation uses a controlled error response with ControllerInvalidRequest, while heartbeat_timeout_mills uses ok_or with RocketMQError::Internal. Both should follow the same pattern for consistency.

🐛 Proposed fix for consistent error handling
     let request_header = request.decode_command_custom_header_fast::<BrokerHeartbeatRequestHeader>()?;

-    if let Some(broker_id) = &request_header.broker_id {
-        let heartbeat_timeout_mills = request_header
-            .heartbeat_timeout_mills
-            .ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?
-            as u64;
+    if request_header.broker_id.is_none() {
+        return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
+            ResponseCode::ControllerInvalidRequest,
+            "Heart beat with empty brokerId",
+        )));
+    }
+    if request_header.heartbeat_timeout_mills.is_none() {
+        return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
+            ResponseCode::ControllerInvalidRequest,
+            "Heart beat with empty heartbeat_timeout_mills",
+        )));
+    }
+
+    let broker_id = request_header.broker_id.unwrap();
+    let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.unwrap() as u64;
+
         self.heartbeat_manager.on_broker_heartbeat(
             &request_header.cluster_name,
             &request_header.broker_name,
             &request_header.broker_addr,
-            *broker_id,
+            broker_id,
             Some(heartbeat_timeout_mills),
             ctx.channel().clone(),
             request_header.epoch,
             request_header.max_offset,
             request_header.confirm_offset,
             request_header.election_priority,
         );
-        return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
+        Ok(Some(RemotingCommand::create_response_command_with_code_remark(
             ResponseCode::Success,
             "Heart beat success",
-        )));
-    } else {
-        return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
-            ResponseCode::ControllerInvalidRequest,
-            "Heart beat with empty brokerId",
-        )));
-    }
+        )))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if let Some(broker_id) = &request_header.broker_id {
let heartbeat_timeout_mills = request_header
.heartbeat_timeout_mills
.ok_or(RocketMQError::Internal("request_header.broker_id is none".to_string()))?
as u64;
self.heartbeat_manager.on_broker_heartbeat(
&request_header.cluster_name,
&request_header.broker_name,
&request_header.broker_addr,
*broker_id,
Some(heartbeat_timeout_mills),
ctx.channel().clone(),
request_header.epoch,
request_header.max_offset,
request_header.confirm_offset,
request_header.election_priority,
);
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::Success,
"Heart beat success",
)));
} else {
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty brokerId",
)));
}
if request_header.broker_id.is_none() {
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty brokerId",
)));
}
if request_header.heartbeat_timeout_mills.is_none() {
return Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::ControllerInvalidRequest,
"Heart beat with empty heartbeat_timeout_mills",
)));
}
let broker_id = request_header.broker_id.unwrap();
let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.unwrap() as u64;
self.heartbeat_manager.on_broker_heartbeat(
&request_header.cluster_name,
&request_header.broker_name,
&request_header.broker_addr,
broker_id,
Some(heartbeat_timeout_mills),
ctx.channel().clone(),
request_header.epoch,
request_header.max_offset,
request_header.confirm_offset,
request_header.election_priority,
);
Ok(Some(RemotingCommand::create_response_command_with_code_remark(
ResponseCode::Success,
"Heart beat success",
)))
🧰 Tools
🪛 GitHub Actions: RocketMQ Rust CI

[error] 404-407: unneeded return statement. Clippy: needlessly returning a value. Remove return and use expression directly in Ok(Some(...)).


[error] 409-412: unneeded return statement. Clippy: needlessly returning a value. Remove return and use expression directly in Ok(Some(...)).

@codecov
Copy link

codecov bot commented Jan 7, 2026

Codecov Report

❌ Patch coverage is 0% with 29 lines in your changes missing coverage. Please review.
✅ Project coverage is 38.50%. Comparing base (d0fae41) to head (a203083).
⚠️ Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
...ller/src/processor/controller_request_processor.rs 0.00% 29 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5547      +/-   ##
==========================================
- Coverage   38.52%   38.50%   -0.02%     
==========================================
  Files         816      816              
  Lines      110921   110948      +27     
==========================================
- Hits        42728    42726       -2     
- Misses      68193    68222      +29     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - All CI checks passed ✅

&request_header.broker_addr,
*broker_id,
Some(heartbeat_timeout_mills),
ctx.channel().clone(),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Channel directly from the method parameters

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @rocketmq-controller/src/processor/controller_request_processor.rs:
- Around line 388-392: The error message in handle_broker_heartbeat is split
across lines and the cast `as u64` on request_header.heartbeat_timeout_mills
(Option<i64>) unsafely converts negative values; change to unwrap the Option
with a single-line, clear error message and then convert safely using
u64::try_from(value).map_err(...) or explicitly check for value >= 0 and return
a RocketMQError::Internal containing the offending value; reference
request_header.heartbeat_timeout_mills and the handle_broker_heartbeat context
and ensure the resulting heartbeat_timeout_mills is a validated u64.
🧹 Nitpick comments (1)
rocketmq-controller/src/processor/controller_request_processor.rs (1)

405-414: Response handling is correct.

The success and error responses are appropriate for the heartbeat handler. The implementation correctly validates broker_id presence before processing.

Minor text consistency suggestion

Consider updating "Heart beat" to "Heartbeat" (one word) for consistency with the method name:

-                "Heart beat success",
+                "Heartbeat success",
-                "Heart beat with empty brokerId",
+                "Heartbeat with empty brokerId",
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b6693e3 and a203083.

📒 Files selected for processing (1)
  • rocketmq-controller/src/processor/controller_request_processor.rs
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-controller/src/processor/controller_request_processor.rs (1)
rocketmq-remoting/src/protocol/remoting_command.rs (1)
  • create_response_command_with_code_remark (187-192)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build & Test (ubuntu-latest)
  • GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-controller/src/processor/controller_request_processor.rs (3)

88-88: LGTM! Imports are necessary for the heartbeat implementation.

The new imports support the handle_broker_heartbeat implementation appropriately.

Also applies to: 92-92, 97-97


379-385: LGTM! Method signature and header decoding are correctly implemented.

The parameter names have been appropriately updated to reflect their usage, and the fast header decoding approach is suitable for this handler.


393-404: No changes needed. The on_broker_heartbeat method is synchronous and correctly called without await.

Likely an incorrect or invalid review comment.

Comment on lines +388 to +392
let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.ok_or(RocketMQError::Internal(
"in fn handle_broker_heartbeat, request_header
.heartbeat_timeout_mills is none"
.to_string(),
))? as u64;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash

# First, find the BrokerHeartbeatRequestHeader struct definition
echo "=== Searching for BrokerHeartbeatRequestHeader struct ==="
rg -A 10 "struct BrokerHeartbeatRequestHeader" --type rust

echo ""
echo "=== Searching for heartbeat_timeout_mills field ==="
rg "heartbeat_timeout_mills" --type rust -B 2 -A 2

echo ""
echo "=== Checking the actual code at the specified location ==="
sed -n '385,395p' rocketmq-controller/src/processor/controller_request_processor.rs

Repository: mxsm/rocketmq-rust

Length of output: 14983


🏁 Script executed:

#!/bin/bash

# Search more specifically for the field definition
echo "=== Searching for heartbeat_timeout_mills in struct context ==="
ast-grep --pattern 'struct BrokerHeartbeatRequestHeader { $$$ heartbeat_timeout_mills: $_ $$$ }'

Repository: mxsm/rocketmq-rust

Length of output: 125


🏁 Script executed:

#!/bin/bash

# Let's also check what types are commonly used in this codebase for similar fields
echo "=== Looking at request header struct definitions ==="
fd -e rs -x grep -l "RequestHeader" {} \; | head -5 | xargs grep -A 15 "struct.*RequestHeader"

Repository: mxsm/rocketmq-rust

Length of output: 2622


Fix error message formatting and use safer type conversion for heartbeat_timeout_mills.

Two confirmed issues:

  1. Error message formatting: The string literal spans multiple lines with an awkward line break in the middle, harming readability.

  2. Type cast safety: heartbeat_timeout_mills is Option<i64> (signed integer). The as u64 cast silently converts negative values to large unsigned numbers instead of erroring. Use TryFrom or validate non-negative values first.

Prefer u64::try_from() for a safe and idiomatic conversion:

-            let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.ok_or(RocketMQError::Internal(
-                "in fn handle_broker_heartbeat, request_header
-                .heartbeat_timeout_mills is none"
-                    .to_string(),
-            ))? as u64;
+            let heartbeat_timeout_mills = u64::try_from(
+                request_header
+                    .heartbeat_timeout_mills
+                    .ok_or(RocketMQError::Internal(
+                        "in fn handle_broker_heartbeat, request_header.heartbeat_timeout_mills is none".to_string(),
+                    ))?,
+            )
+            .map_err(|_| RocketMQError::Internal("heartbeat_timeout_mills must be non-negative".to_string()))?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let heartbeat_timeout_mills = request_header.heartbeat_timeout_mills.ok_or(RocketMQError::Internal(
"in fn handle_broker_heartbeat, request_header
.heartbeat_timeout_mills is none"
.to_string(),
))? as u64;
let heartbeat_timeout_mills = u64::try_from(
request_header
.heartbeat_timeout_mills
.ok_or(RocketMQError::Internal(
"in fn handle_broker_heartbeat, request_header.heartbeat_timeout_mills is none".to_string(),
))?,
)
.map_err(|_| RocketMQError::Internal("heartbeat_timeout_mills must be non-negative".to_string()))?;
🤖 Prompt for AI Agents
In @rocketmq-controller/src/processor/controller_request_processor.rs around
lines 388 - 392, The error message in handle_broker_heartbeat is split across
lines and the cast `as u64` on request_header.heartbeat_timeout_mills
(Option<i64>) unsafely converts negative values; change to unwrap the Option
with a single-line, clear error message and then convert safely using
u64::try_from(value).map_err(...) or explicitly check for value >= 0 and return
a RocketMQError::Internal containing the offending value; reference
request_header.heartbeat_timeout_mills and the handle_broker_heartbeat context
and ensure the resulting heartbeat_timeout_mills is a validated u64.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - All CI checks passed ✅

@mxsm mxsm merged commit 9f37d7c into mxsm:main Jan 9, 2026
8 of 14 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Jan 9, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge Difficulty level/Hard Hard ISSUE feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement BrokerHeartbeat Request Handler

4 participants