[ISSUE #5528]🚀Implement graceful shutdown and thread-safe state management for OpenRaft controller #5529
Walkthrough

Implemented graceful shutdown and thread-safe state management for the OpenRaft controller. Added Arc-wrapped lifecycle components (node, handle, shutdown_tx). Expanded startup to initialize RaftNodeManager and spawn the gRPC server task with a shutdown channel. Enhanced shutdown to signal the server, shut down the Raft node, and await task completion with a timeout.
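The "Arc-wrapped lifecycle components" pattern above can be sketched with std-only types (the real controller uses tokio tasks and `parking_lot`; `Lifecycle`, `start`, and `stop` here are illustrative names, not the PR's actual API):

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for the controller's lifecycle state: each field is
// an Arc<Mutex<Option<...>>> so it can be shared across clones and consumed
// exactly once during shutdown via Option::take().
struct Lifecycle {
    handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}

impl Lifecycle {
    fn new() -> Self {
        Lifecycle {
            handle: Arc::new(Mutex::new(None)),
        }
    }

    fn start(&self) {
        let handle = thread::spawn(|| {
            // the real controller would run the gRPC server task here
        });
        *self.handle.lock().unwrap() = Some(handle);
    }

    fn stop(&self) {
        // take() moves the handle out under a brief lock, so a second
        // stop() finds None and is a harmless no-op.
        let handle = self.handle.lock().unwrap().take();
        if let Some(h) = handle {
            h.join().expect("server task panicked");
        }
    }
}
```

Because `stop()` takes the `Option` rather than cloning it, repeated shutdown calls are idempotent, which is the thread-safety property the walkthrough describes.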
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant Controller as OpenRaftController
    participant NodeMgr as RaftNodeManager
    participant Server as gRPC Server
    autonumber
    rect rgb(230, 245, 255)
        Note over Caller,Server: Startup Flow
        Caller->>Controller: start()
        Controller->>NodeMgr: Create RaftNodeManager
        Controller->>Server: Create gRPC service
        Controller->>Server: Spawn serve_with_shutdown task
        Server-->>Controller: Return JoinHandle
        Controller->>Controller: Store node, handle, shutdown_tx<br/>(Arc<Mutex>)
        Controller-->>Caller: Startup complete
    end
    rect rgb(255, 240, 240)
        Note over Caller,Server: Shutdown Flow
        Caller->>Controller: stop()
        Controller->>Server: Signal via oneshot channel
        Server-->>Server: Graceful shutdown initiated
        Controller->>NodeMgr: Shutdown Raft node
        NodeMgr-->>Controller: Node shutdown complete
        Controller->>Server: Await JoinHandle (10s timeout)
        Server-->>Controller: Server task finished
        Controller-->>Caller: Shutdown complete
    end
```
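The shutdown flow in the diagram (signal via channel, then a bounded wait for the server task) can be sketched with std channels and threads. This is a minimal sketch, not the PR's code: the real controller uses `tokio::sync::oneshot` and awaits the `JoinHandle` under `tokio::time::timeout`; `spawn_server` and `shutdown_gracefully` are illustrative names.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Spawn a "server" that blocks until the shutdown signal arrives, then
// reports completion. Returns (shutdown sender, completion receiver).
fn spawn_server() -> (mpsc::Sender<()>, mpsc::Receiver<()>) {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();
    let (done_tx, done_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        // Wait for the shutdown signal (or for the sender to be dropped).
        let _ = shutdown_rx.recv();
        // ... a real server would drain in-flight work here ...
        let _ = done_tx.send(());
    });
    (shutdown_tx, done_rx)
}

/// Signal shutdown, then wait up to `timeout` for the server to finish.
/// Returns true if the server stopped within the deadline.
fn shutdown_gracefully(
    shutdown_tx: mpsc::Sender<()>,
    done_rx: mpsc::Receiver<()>,
    timeout: Duration,
) -> bool {
    let _ = shutdown_tx.send(()); // 1. signal
    done_rx.recv_timeout(timeout).is_ok() // 2. bounded wait
}
```

The bounded wait mirrors the diagram's "Await JoinHandle (10s timeout)" step: shutdown never hangs forever on a stuck server task.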
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @rocketmq-controller/src/controller/open_raft_controller.rs:
- Around line 146-173: The shutdown sequence currently shuts down the Raft node
via self.node.lock() and node.shutdown().await before awaiting the server task
stored in self.handle, which can abort in-flight gRPC requests; change the order
to first take and signal/await the server task (take self.handle via
self.handle.lock(), await the handle with tokio::time::timeout and log results),
and only after the server task completes (or times out) then take self.node via
self.node.lock() and call node.shutdown().await; ensure you preserve the
existing logging paths (info/eprintln) and handle Option semantics for both the
handle and node.
🧹 Nitpick comments (2)
rocketmq-controller/src/controller/open_raft_controller.rs (2)
61-71: Consider `tokio::sync::Mutex` for consistency in async context.

Using `parking_lot::Mutex` is acceptable here since locks are held briefly without crossing await points. However, if future changes require holding locks across async operations, `tokio::sync::Mutex` would be safer. The current implementation works correctly as-is.
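The "locks are held briefly without crossing await points" discipline comes down to scoping the guard so it is dropped before any long-running work. A minimal std-only sketch (the controller uses `parking_lot::Mutex`, which has the same guard-scoping behavior; `take_node` and the `String` payload are illustrative):

```rust
use std::sync::{Arc, Mutex};

// Take the value out of the shared slot under a brief lock. The guard lives
// only inside the inner block, so it is released before the caller does any
// slow (or, in async code, awaited) work on the extracted value.
fn take_node(slot: &Arc<Mutex<Option<String>>>) -> Option<String> {
    let node = {
        let mut guard = slot.lock().unwrap();
        guard.take()
    }; // guard dropped here
    node
}
```

This is exactly why a synchronous mutex is safe in the PR's `shutdown()`: the lock is never held across an `.await`, only long enough to `take()` the value.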
96-112: Use `tracing::error!` instead of `eprintln!` for consistent logging.

The spawned task uses `eprintln!` for error output (line 108), which bypasses structured logging. This inconsistency makes log aggregation and filtering harder.

♻️ Proposed fix

```diff
+use tracing::error;
+
 let handle = tokio::spawn(async move {
     info!("gRPC server starting for node {} on {}", node_id, addr);
     let result = Server::builder()
         .add_service(OpenRaftServiceServer::new(service))
         .serve_with_shutdown(addr, async {
             shutdown_rx.await.ok();
             info!("Shutdown signal received for node {}, stopping gRPC server", node_id);
         })
         .await;
     if let Err(e) = result {
-        eprintln!("gRPC server error for node {}: {}", node_id, e);
+        error!("gRPC server error for node {}: {}", node_id, e);
     } else {
         info!("gRPC server for node {} stopped gracefully", node_id);
     }
 });
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-controller/src/controller/open_raft_controller.rs
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-controller/src/controller/open_raft_controller.rs (2)
rocketmq-controller/src/controller.rs (4)
`startup` (183-183), `startup` (523-525), `shutdown` (200-200), `shutdown` (527-529)
rocketmq-controller/src/controller/raft_controller.rs (2)
`startup` (62-67), `shutdown` (69-74)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build & Test (macos-latest)
- GitHub Check: Build & Test (ubuntu-latest)
- GitHub Check: Code Coverage
- GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-controller/src/controller/open_raft_controller.rs (3)
15-21: LGTM! Clear module-level documentation explaining the key features and design goals.
73-82: LGTM! Constructor properly initializes all lifecycle fields to `None`, ready for population during `startup()`.
179-259: Stub methods acknowledged. These methods are appropriately marked with TODO comments. The PR scope is focused on graceful shutdown and thread-safe state management, which is correctly implemented in `startup()` and `shutdown()`.
```rust
// Shutdown Raft node
let node = {
    let mut node_guard = self.node.lock();
    node_guard.take()
};

if let Some(node) = node {
    if let Err(e) = node.shutdown().await {
        eprintln!("Error shutting down Raft node: {}", e);
    } else {
        info!("Raft node shutdown successfully");
    }
}

// Wait for server task to complete (with timeout)
let handle = {
    let mut handle_guard = self.handle.lock();
    handle_guard.take()
};

if let Some(handle) = handle {
    let timeout = tokio::time::Duration::from_secs(10);
    match tokio::time::timeout(timeout, handle).await {
        Ok(Ok(_)) => info!("Server task completed successfully"),
        Ok(Err(e)) => eprintln!("Server task panicked: {}", e),
        Err(_) => eprintln!("Timeout waiting for server task to complete"),
    }
}
```
Shutdown ordering may cause in-flight requests to fail.
The current order shuts down the Raft node (lines 152-158) before waiting for the gRPC server task to complete (lines 166-173). If the server is still processing in-flight requests when the Raft node shuts down, those requests will fail.
Consider reordering: signal server → wait for server to finish → then shutdown Raft node.
🔧 Proposed fix

```diff
 // Take and send shutdown signal to gRPC server
 {
     let mut shutdown_tx_guard = self.shutdown_tx.lock();
     if let Some(tx) = shutdown_tx_guard.take() {
         if tx.send(()).is_err() {
-            eprintln!("Failed to send shutdown signal to gRPC server (receiver dropped)");
+            error!("Failed to send shutdown signal to gRPC server (receiver dropped)");
         } else {
             info!("Shutdown signal sent to gRPC server");
         }
     }
 }

-// Shutdown Raft node
-let node = {
-    let mut node_guard = self.node.lock();
-    node_guard.take()
-};
-
-if let Some(node) = node {
-    if let Err(e) = node.shutdown().await {
-        eprintln!("Error shutting down Raft node: {}", e);
-    } else {
-        info!("Raft node shutdown successfully");
-    }
-}
-
 // Wait for server task to complete (with timeout)
 let handle = {
     let mut handle_guard = self.handle.lock();
     handle_guard.take()
 };

 if let Some(handle) = handle {
     let timeout = tokio::time::Duration::from_secs(10);
     match tokio::time::timeout(timeout, handle).await {
         Ok(Ok(_)) => info!("Server task completed successfully"),
-        Ok(Err(e)) => eprintln!("Server task panicked: {}", e),
-        Err(_) => eprintln!("Timeout waiting for server task to complete"),
+        Ok(Err(e)) => error!("Server task panicked: {}", e),
+        Err(_) => warn!("Timeout waiting for server task to complete"),
     }
 }

+// Shutdown Raft node after server has stopped accepting requests
+let node = {
+    let mut node_guard = self.node.lock();
+    node_guard.take()
+};
+
+if let Some(node) = node {
+    if let Err(e) = node.shutdown().await {
+        error!("Error shutting down Raft node: {}", e);
+    } else {
+        info!("Raft node shutdown successfully");
+    }
+}
+
 info!("OpenRaft controller shutdown completed");
 Ok(())
```

Committable suggestion skipped: line range outside the PR's diff.
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff            @@
##             main    #5529      +/-   ##
==========================================
+ Coverage   38.41%   38.44%    +0.03%
==========================================
  Files         815      815
  Lines      110512   110567       +55
==========================================
+ Hits        42452    42508       +56
+ Misses      68060    68059        -1
```

☔ View full report in Codecov by Sentry.
rocketmq-rust-bot left a comment:
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes (Closes)
Fixes #5528
Brief Description
How Did You Test This Change?
Summary by CodeRabbit