diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 4514f94f9..5a477033c 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -60,7 +60,7 @@ use crate::error::QuorumNotEnough; use crate::error::RPCError; use crate::error::StorageIOResult; use crate::error::Timeout; -use crate::impls::OneshotResponder; +use crate::impls::ProgressResponder; use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt; use crate::metrics::HeartbeatMetrics; use crate::metrics::RaftDataMetrics; @@ -463,19 +463,19 @@ where &mut self, changes: ChangeMembers, retain: bool, - tx: OneshotResponder>, + tx: ProgressResponder>, ) { let res = self.engine.state.membership_state.change_handler().apply(changes, retain); let new_membership = match res { Ok(x) => x, Err(e) => { - tx.send(Err(ClientWriteError::ChangeMembershipError(e))); + tx.on_complete(Err(ClientWriteError::ChangeMembershipError(e))); return; } }; let ent = C::Entry::new_membership(LogIdOf::::default(), new_membership); - self.write_entry(ent, Some(CoreResponder::Oneshot(tx))); + self.write_entry(ent, Some(CoreResponder::Progress(tx))); } /// Write a log entry to the cluster through raft protocol. @@ -487,7 +487,7 @@ where /// The calling side may not receive a result from `resp_tx`, if raft is shut down. /// /// The responder `resp_tx` is either Responder type of - /// [`RaftTypeConfig::Responder`] (application-defined) or [`OneshotResponder`] + /// [`RaftTypeConfig::Responder`] (application-defined) or [`ProgressResponder`] /// (general-purpose); the former is for application-defined entries like user data, the /// latter is for membership configuration changes. #[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))] @@ -502,7 +502,7 @@ where if let Some(to) = lh.leader.get_transfer_to() { if let Some(tx) = tx { let err = lh.state.new_forward_to_leader(to.clone()); - tx.send(Err(ClientWriteError::ForwardToLeader(err))); + tx.on_complete(Err(ClientWriteError::ForwardToLeader(err))); } return; } @@ -818,6 +818,11 @@ where let entry_count = last.index() + 1 - first.index(); self.runtime_stats.apply_batch.record(entry_count); + for (index, responder) in responders.iter_mut() { + let log_id = self.engine.state.get_log_id(*index).unwrap(); + responder.on_commit(log_id); + } + let cmd = sm::Command::apply(first, last.clone(), responders); self.sm_handle.send(cmd).map_err(|e| StorageError::apply(last, AnyError::error(e)))?; @@ -1829,7 +1834,7 @@ where #[allow(clippy::let_underscore_future)] let _ = C::spawn(async move { for (log_index, tx) in removed.into_iter() { - tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader { + tx.on_complete(Err(ClientWriteError::ForwardToLeader(ForwardToLeader { leader_id: leader_id.clone(), leader_node: leader_node.clone(), }))); diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index c657ded00..12d3c916f 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -10,7 +10,7 @@ use crate::display_ext::DisplayBTreeMapDebugValueExt; use crate::error::CheckIsLeaderError; use crate::error::Infallible; use crate::error::InitializeError; -use crate::impls::OneshotResponder; +use crate::impls::ProgressResponder; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::ClientWriteResult; @@ -93,7 +93,7 @@ where C: RaftTypeConfig /// config will be converted into learners, otherwise they will be removed. retain: bool, - tx: OneshotResponder>, + tx: ProgressResponder>, }, ExternalCoreRequest { diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index d56781b80..059bfc6ad 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -262,7 +262,7 @@ where C: RaftTypeConfig tx: Option, ) -> Option<(LeaderHandler<'_, C>, Option)> where - R: Responder>, + R: Responder>, { let res = self.leader_handler(); let forward_err = match res { @@ -274,7 +274,7 @@ where C: RaftTypeConfig }; if let Some(tx) = tx { - tx.send(Err(forward_err.into())); + tx.on_complete(Err(forward_err.into())); } None diff --git a/openraft/src/impls/mod.rs b/openraft/src/impls/mod.rs index 508bd5fda..5a3553dd8 100644 --- a/openraft/src/impls/mod.rs +++ b/openraft/src/impls/mod.rs @@ -27,6 +27,7 @@ pub use crate::entry::Entry; pub use crate::node::BasicNode; pub use crate::node::EmptyNode; pub use crate::raft::responder::impls::OneshotResponder; +pub use crate::raft::responder::impls::ProgressResponder; #[cfg(feature = "tokio-rt")] pub use crate::type_config::async_runtime::tokio_impls::TokioRuntime; diff --git a/openraft/src/raft/api/app.rs b/openraft/src/raft/api/app.rs index 098c07571..4280a10d0 100644 --- a/openraft/src/raft/api/app.rs +++ b/openraft/src/raft/api/app.rs @@ -6,7 +6,7 @@ use crate::core::raft_msg::RaftMsg; use crate::error::CheckIsLeaderError; use crate::error::ClientWriteError; use crate::error::Fatal; -use crate::impls::OneshotResponder; +use crate::impls::ProgressResponder; use crate::raft::ClientWriteResponse; use crate::raft::ClientWriteResult; use crate::raft::linearizable_read::Linearizer; @@ -50,12 +50,11 @@ where C: RaftTypeConfig app_data: C::D, // TODO: ClientWriteError can only be ForwardToLeader Error ) -> Result, ClientWriteError>, Fatal> { - let (tx, rx) = C::oneshot(); - let responder = OneshotResponder::new(tx); + let (responder, _commit_rx, complete_rx) = ProgressResponder::new(); - self.do_client_write_ff(app_data, Some(CoreResponder::Oneshot(responder))).await?; + self.do_client_write_ff(app_data, Some(CoreResponder::Progress(responder))).await?; - let res: ClientWriteResult = self.inner.recv_msg(rx).await?; + let res: ClientWriteResult = self.inner.recv_msg(complete_rx).await?; Ok(res) } diff --git a/openraft/src/raft/api/management.rs b/openraft/src/raft/api/management.rs index f6726fe83..a72feeb00 100644 --- a/openraft/src/raft/api/management.rs +++ b/openraft/src/raft/api/management.rs @@ -14,7 +14,7 @@ use crate::display_ext::DisplayResult; use crate::display_ext::DisplayResultExt; use crate::error::Fatal; use crate::error::InitializeError; -use crate::impls::OneshotResponder; +use crate::impls::ProgressResponder; use crate::membership::IntoNodes; use crate::raft::ClientWriteResult; use crate::raft::raft_inner::RaftInner; @@ -71,7 +71,7 @@ where C: RaftTypeConfig "change_membership: start to commit joint config" ); - let (tx, rx) = oneshot_channel::(); + let (tx, rx) = new_responder_pair::(); // res is error if membership cannot be changed. // If no error, it will enter a joint state @@ -106,7 +106,7 @@ where C: RaftTypeConfig tracing::debug!("committed a joint config: {} {:?}", log_id, joint); tracing::debug!("the second step is to change to uniform config: {:?}", changes); - let (tx, rx) = oneshot_channel::(); + let (tx, rx) = new_responder_pair::(); // The second step, send a NOOP change to flatten the joint config. let changes = ChangeMembers::AddVoterIds(Default::default()); @@ -132,7 +132,7 @@ where C: RaftTypeConfig node: C::Node, blocking: bool, ) -> Result, Fatal> { - let (tx, rx) = oneshot_channel::(); + let (tx, rx) = new_responder_pair::(); let msg = RaftMsg::ChangeMembership { changes: ChangeMembers::AddNodes(btreemap! {id.clone()=>node}), @@ -229,14 +229,12 @@ where C: RaftTypeConfig } } -fn oneshot_channel() -> (OneshotResponder, OneshotReceiverOf) +fn new_responder_pair() -> (ProgressResponder, OneshotReceiverOf) where C: RaftTypeConfig, T: OptionalSend, { - let (tx, rx) = C::oneshot(); + let (tx, _commit_rx, complete_rx) = ProgressResponder::new(); - let tx = OneshotResponder::new(tx); - - (tx, rx) + (tx, complete_rx) } diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index b8fd755bb..f1317eea3 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -190,7 +190,7 @@ macro_rules! declare_raft_types { (Vote , , $crate::impls::Vote ), (Entry , , $crate::impls::Entry ), (SnapshotData , , std::io::Cursor> ), - (Responder , , $crate::impls::OneshotResponder where T: $crate::OptionalSend + 'static ), + (Responder , , $crate::impls::ProgressResponder where T: $crate::OptionalSend + 'static ), (AsyncRuntime , , $crate::impls::TokioRuntime ), ); diff --git a/openraft/src/raft/responder/core_responder.rs b/openraft/src/raft/responder/core_responder.rs index 042b8ae1f..b7617e2f9 100644 --- a/openraft/src/raft/responder/core_responder.rs +++ b/openraft/src/raft/responder/core_responder.rs @@ -1,5 +1,6 @@ +use crate::LogId; use crate::RaftTypeConfig; -use crate::impls::OneshotResponder; +use crate::impls::ProgressResponder; use crate::raft::ClientWriteResult; use crate::raft::responder::Responder; use crate::type_config::alias::WriteResponderOf; @@ -7,21 +8,28 @@ use crate::type_config::alias::WriteResponderOf; /// The responder used in RaftCore. /// /// RaftCore use this responder to send response to the caller. -/// It is either an oneshot responder or a user-defined responder. +/// It is either a progress responder or a user-defined responder. pub(crate) enum CoreResponder where C: RaftTypeConfig { - Oneshot(OneshotResponder>), + Progress(ProgressResponder>), UserDefined(WriteResponderOf), } -impl Responder> for CoreResponder +impl Responder> for CoreResponder where C: RaftTypeConfig { - fn send(self, res: ClientWriteResult) { + fn on_commit(&mut self, log_id: LogId) { match self { - Self::Oneshot(responder) => responder.send(res), - Self::UserDefined(responder) => responder.send(res), + Self::Progress(responder) => responder.on_commit(log_id), + Self::UserDefined(responder) => responder.on_commit(log_id), + } + } + + fn on_complete(self, res: ClientWriteResult) { + match self { + Self::Progress(responder) => responder.on_complete(res), + Self::UserDefined(responder) => responder.on_complete(res), } } } diff --git a/openraft/src/raft/responder/impls/mod.rs b/openraft/src/raft/responder/impls/mod.rs new file mode 100644 index 000000000..c83da2fc2 --- /dev/null +++ b/openraft/src/raft/responder/impls/mod.rs @@ -0,0 +1,5 @@ +mod oneshot_responder; +mod progress_responder; + +pub use oneshot_responder::OneshotResponder; +pub use progress_responder::ProgressResponder; diff --git a/openraft/src/raft/responder/impls.rs b/openraft/src/raft/responder/impls/oneshot_responder.rs similarity index 95% rename from openraft/src/raft/responder/impls.rs rename to openraft/src/raft/responder/impls/oneshot_responder.rs index eb97f25ca..7216ff3c4 100644 --- a/openraft/src/raft/responder/impls.rs +++ b/openraft/src/raft/responder/impls/oneshot_responder.rs @@ -47,12 +47,12 @@ where } } -impl Responder for OneshotResponder +impl Responder for OneshotResponder where C: RaftTypeConfig, T: OptionalSend + 'static, { - fn send(self, res: T) { + fn on_complete(self, res: T) { let res = self.tx.send(res); if res.is_ok() { diff --git a/openraft/src/raft/responder/impls/progress_responder.rs b/openraft/src/raft/responder/impls/progress_responder.rs new file mode 100644 index 000000000..0c3cc8396 --- /dev/null +++ b/openraft/src/raft/responder/impls/progress_responder.rs @@ -0,0 +1,236 @@ +use crate::LogId; +use crate::OptionalSend; +use crate::RaftTypeConfig; +use crate::async_runtime::OneshotSender; +use crate::raft::responder::Responder; +use crate::type_config::TypeConfigExt; +use crate::type_config::alias::OneshotReceiverOf; +use crate::type_config::alias::OneshotSenderOf; + +/// A [`Responder`] implementation that sends notifications via two oneshot channels. +/// +/// This responder provides both commit and completion notifications: +/// - **Commit channel**: Notifies when the log entry is committed (replicated to a quorum) +/// - **Complete channel**: Sends the final result when the request completes +/// +/// Use this when the caller wants to be notified at both stages: +/// 1. When the entry is committed and safe to read +/// 2. When the entry is applied and the result is available +/// +/// # Example +/// +/// ```ignore +/// let (responder, commit_rx, complete_rx) = ProgressResponder::new(); +/// +/// // Send write request with the responder +/// raft.client_write_ff(request, responder).await; +/// +/// // Wait for commit notification +/// let commit_log_id = commit_rx.await.unwrap(); +/// // Now safe to read the committed data +/// +/// // Wait for completion +/// let result = complete_rx.await.unwrap(); +/// // Now have the final result +/// ``` +pub struct ProgressResponder +where + C: RaftTypeConfig, + T: OptionalSend, +{ + commit_tx: Option>>, + complete_tx: OneshotSenderOf, +} + +impl ProgressResponder +where + C: RaftTypeConfig, + T: OptionalSend, +{ + /// Create a new responder with commit and complete receivers. + /// + /// This is a convenience method that creates two oneshot channels and returns + /// a [`ProgressResponder`] wrapping both senders, along with both receivers. + /// + /// # Returns + /// + /// A tuple containing: + /// - The [`ProgressResponder`] that can send both commit and complete notifications + /// - The commit receiver for receiving the committed log ID + /// - The complete receiver for receiving the final result + pub fn new() -> (Self, OneshotReceiverOf>, OneshotReceiverOf) { + let (commit_tx, commit_rx) = C::oneshot(); + let (complete_tx, complete_rx) = C::oneshot(); + + let responder = Self { + commit_tx: Some(commit_tx), + complete_tx, + }; + + (responder, commit_rx, complete_rx) + } +} + +impl Responder for ProgressResponder +where + C: RaftTypeConfig, + T: OptionalSend + 'static, +{ + fn on_commit(&mut self, log_id: LogId) { + if let Some(tx) = self.commit_tx.take() { + let res = tx.send(log_id); + + if res.is_ok() { + tracing::debug!("ProgressResponder.commit_tx.send: is_ok: {}", res.is_ok()); + } else { + tracing::warn!("ProgressResponder.commit_tx.send: is_ok: {}", res.is_ok()); + } + } + } + + fn on_complete(self, res: T) { + let res = self.complete_tx.send(res); + + if res.is_ok() { + tracing::debug!("ProgressResponder.complete_tx.send: is_ok: {}", res.is_ok()); + } else { + tracing::warn!("ProgressResponder.complete_tx.send: is_ok: {}", res.is_ok()); + } + } +} + +#[cfg(test)] +mod tests { + use crate::engine::testing::UTConfig; + use crate::engine::testing::log_id; + use crate::raft::responder::ProgressResponder; + use crate::raft::responder::Responder; + + #[tokio::test] + async fn test_twoshot_responder_new() { + let (_responder, mut commit_rx, mut complete_rx): (ProgressResponder, _, _) = + ProgressResponder::new(); + + // Receivers should be created but not yet have values + assert!(commit_rx.try_recv().is_err()); + assert!(complete_rx.try_recv().is_err()); + } + + #[tokio::test] + async fn test_twoshot_responder_on_commit() { + let (mut responder, commit_rx, _complete_rx): (ProgressResponder, _, _) = + ProgressResponder::new(); + + let test_log_id = log_id(1, 2, 3); + + // Send commit notification + responder.on_commit(test_log_id); + + // Commit receiver should receive the log_id + let received_log_id = commit_rx.await.unwrap(); + assert_eq!(test_log_id, received_log_id); + } + + #[tokio::test] + async fn test_twoshot_responder_on_commit_multiple_calls() { + let (mut responder, commit_rx, _complete_rx): (ProgressResponder, _, _) = + ProgressResponder::new(); + + let test_log_id_1 = log_id(1, 2, 3); + let test_log_id_2 = log_id(2, 3, 4); + + // Send first commit notification + responder.on_commit(test_log_id_1); + + // Second call should be ignored (tx is taken on first call) + responder.on_commit(test_log_id_2); + + // Commit receiver should only receive the first log_id + let received_log_id = commit_rx.await.unwrap(); + assert_eq!(test_log_id_1, received_log_id); + } + + #[tokio::test] + async fn test_twoshot_responder_send() { + let (responder, _commit_rx, complete_rx): (ProgressResponder, _, _) = + ProgressResponder::new(); + + let test_result = "test_result".to_string(); + + // Send completion result + responder.on_complete(test_result.clone()); + + // Complete receiver should receive the result + let received_result = complete_rx.await.unwrap(); + assert_eq!(test_result, received_result); + } + + #[tokio::test] + async fn test_twoshot_responder_both_channels() { + let (mut responder, commit_rx, complete_rx): (ProgressResponder, _, _) = + ProgressResponder::new(); + + let test_log_id = log_id(1, 2, 3); + let test_result = "test_result".to_string(); + + // Send commit notification + responder.on_commit(test_log_id); + + // Verify commit was received + let received_log_id = commit_rx.await.unwrap(); + assert_eq!(test_log_id, received_log_id); + + // Send completion result + responder.on_complete(test_result.clone()); + + // Verify completion was received + let received_result = complete_rx.await.unwrap(); + assert_eq!(test_result, received_result); + } + + #[tokio::test] + async fn test_twoshot_responder_send_without_commit() { + let (responder, mut commit_rx, complete_rx): (ProgressResponder, _, _) = + ProgressResponder::new(); + + let test_result = "test_result".to_string(); + + // Send completion without calling on_commit + responder.on_complete(test_result.clone()); + + // Complete receiver should still receive the result + let received_result = complete_rx.await.unwrap(); + assert_eq!(test_result, received_result); + + // Commit receiver should not have received anything + assert!(commit_rx.try_recv().is_err()); + } + + #[tokio::test] + async fn test_twoshot_responder_ordering() { + let (mut responder, commit_rx, complete_rx): (ProgressResponder, _, _) = + ProgressResponder::new(); + + let test_log_id = log_id(5, 10, 15); + let test_result = 42; + + // Create tasks to receive in parallel + let commit_task = tokio::spawn(async move { commit_rx.await.unwrap() }); + + let complete_task = tokio::spawn(async move { complete_rx.await.unwrap() }); + + // Small delay to ensure receivers are waiting + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + + // Send in order: commit first, then complete + responder.on_commit(test_log_id); + responder.on_complete(test_result); + + // Both should complete successfully + let received_log_id = commit_task.await.unwrap(); + let received_result = complete_task.await.unwrap(); + + assert_eq!(test_log_id, received_log_id); + assert_eq!(test_result, received_result); + } +} diff --git a/openraft/src/raft/responder/mod.rs b/openraft/src/raft/responder/mod.rs index b479b595c..0b686f019 100644 --- a/openraft/src/raft/responder/mod.rs +++ b/openraft/src/raft/responder/mod.rs @@ -1,29 +1,52 @@ -//! API to consumer a response when a client write request is completed. +//! API to consume a response when a client write request is completed. pub(crate) mod core_responder; pub(crate) mod impls; pub use impls::OneshotResponder; +pub use impls::ProgressResponder; +use openraft_macros::since; +use crate::LogId; use crate::OptionalSend; +use crate::RaftTypeConfig; /// A trait that lets `RaftCore` send a result back to the client or to somewhere else. /// /// This is a generic abstraction for sending results of any type `T`. -/// It is created for each request and is sent to `RaftCore`. -/// Once the request is completed, the `RaftCore` sends the result via it. -/// The implementation of the trait then forwards the response to the application. -/// /// Usually an implementation of [`Responder`] is a oneshot channel Sender. /// -/// Responders are typically created by the application and passed to Raft APIs -/// like [`Raft::client_write_ff`](crate::raft::Raft::client_write_ff). +/// ## Lifecycle Callbacks +/// +/// - [`on_commit()`](Self::on_commit): Called when locally committed (optional) +/// - [`on_complete()`](Self::on_complete): Sends the final result /// /// # Type Parameters /// /// - `T`: The type of value to send through this responder -pub trait Responder: OptionalSend + 'static { - /// Send result when the request has been completed. +pub trait Responder +where + Self: OptionalSend + Sized + 'static, + C: RaftTypeConfig, +{ + /// Called when the log entry is locally committed (safe to read). + /// + /// Invoked when the log has been replicated to a quorum. At this point, the log is guaranteed + /// to be visible to all future leaders and can be read immediately. + /// + /// # Parameters + /// + /// - `log_id`: The log ID assigned by the proposing leader. + /// + /// Default implementation does nothing. + #[since(version = "0.10.0")] + fn on_commit(&mut self, _log_id: LogId) {} + + /// Called when the request completes (applied; previously it is `send`). + /// Send the final result to the client. /// - /// This method is called by the `RaftCore` once the request has been processed. - fn send(self, result: T); + /// Invoked in two scenarios: + /// - **Normal**: Log entry applied to the state machine + /// - **Early termination**: Request failed (e.g., `ForwardToLeader` error) + #[since(version = "0.10.0")] + fn on_complete(self, result: T); } diff --git a/openraft/src/storage/v2/apply_responder_inner.rs b/openraft/src/storage/v2/apply_responder_inner.rs index 454f6ba96..eba623658 100644 --- a/openraft/src/storage/v2/apply_responder_inner.rs +++ b/openraft/src/storage/v2/apply_responder_inner.rs @@ -32,7 +32,7 @@ impl ApplyResponderInner { data: response, membership: None, }); - responder.send(res); + responder.on_complete(res); } ApplyResponderInner::Membership { log_id, @@ -44,7 +44,7 @@ impl ApplyResponderInner { data: response, membership: Some(membership), }); - responder.send(res); + responder.on_complete(res); } } } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 16d4caa3f..ec47d6949 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -137,7 +137,7 @@ pub trait RaftTypeConfig: /// to Raft APIs that need to send asynchronous responses. /// /// [`Raft::client_write`]: `crate::raft::Raft::client_write` - type Responder: Responder + type Responder: Responder where T: OptionalSend + 'static; } diff --git a/tests/tests/client_api/main.rs b/tests/tests/client_api/main.rs index 826fac62d..00d757181 100644 --- a/tests/tests/client_api/main.rs +++ b/tests/tests/client_api/main.rs @@ -15,6 +15,7 @@ mod t13_get_snapshot; mod t13_install_full_snapshot; mod t13_trigger_snapshot; mod t14_transfer_leader; +mod t15_client_write_with_twoshot; mod t16_with_raft_state; mod t16_with_state_machine; mod t50_lagging_network_write; diff --git a/tests/tests/client_api/t10_client_writes.rs b/tests/tests/client_api/t10_client_writes.rs index 4497bd492..dcfc3f164 100644 --- a/tests/tests/client_api/t10_client_writes.rs +++ b/tests/tests/client_api/t10_client_writes.rs @@ -5,7 +5,7 @@ use futures::prelude::*; use maplit::btreeset; use openraft::Config; use openraft::SnapshotPolicy; -use openraft::impls::OneshotResponder; +use openraft::impls::ProgressResponder; use openraft::raft::ClientWriteResponse; use openraft_memstore::ClientRequest; use openraft_memstore::IntoMemClientRequest; @@ -89,17 +89,17 @@ async fn client_write_ff() -> Result<()> { let n0 = router.get_raft_handle(&0)?; - let (responder, rx) = OneshotResponder::new_pair(); + let (responder, _commit_rx, complete_rx) = ProgressResponder::new(); n0.client_write_ff(ClientRequest::make_request("foo", 2), Some(responder)).await?; - let got: ClientWriteResponse = rx.await??; + let got: ClientWriteResponse = complete_rx.await??; assert_eq!(None, got.response().0.as_deref()); // Deliberately set the responder to None and do not wait for the result. n0.client_write_ff(ClientRequest::make_request("foo", 3), None).await?; - let (responder, rx) = OneshotResponder::new_pair(); + let (responder, _commit_rx, complete_rx) = ProgressResponder::new(); n0.client_write_ff(ClientRequest::make_request("foo", 4), Some(responder)).await?; - let got: ClientWriteResponse = rx.await??; + let got: ClientWriteResponse = complete_rx.await??; assert_eq!(Some("request-3"), got.response().0.as_deref()); Ok(()) diff --git a/tests/tests/client_api/t15_client_write_with_twoshot.rs b/tests/tests/client_api/t15_client_write_with_twoshot.rs new file mode 100644 index 000000000..a39e5dba6 --- /dev/null +++ b/tests/tests/client_api/t15_client_write_with_twoshot.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; + +use anyhow::Result; +use maplit::btreeset; +use openraft::Config; +use openraft::impls::ProgressResponder; +use openraft::raft::ClientWriteResult; +use openraft_memstore::ClientRequest; +use openraft_memstore::IntoMemClientRequest; +use openraft_memstore::TypeConfig; + +use crate::fixtures::RaftRouter; +use crate::fixtures::log_id; +use crate::fixtures::ut_harness; + +/// Test Raft::client_write_ff with ProgressResponder +/// +/// Verify that on_commit() is called when a log entry is committed, +/// and on_complete() is called when the entry is applied. +#[tracing::instrument] +#[test_harness::test(harness = ut_harness)] +async fn client_write_ff_with_progress_responder() -> Result<()> { + let config = Arc::new( + Config { + enable_tick: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; + + let n0 = router.get_raft_handle(&0)?; + + // Test with ProgressResponder - first write + let (responder, commit_rx, complete_rx) = ProgressResponder::new(); + n0.client_write_ff(ClientRequest::make_request("foo", 10), Some(responder)).await?; + + log_index += 1; + + // Wait for commit notification + let commit_log_id = commit_rx.await?; + tracing::info!("Received commit notification for log_id: {:?}", commit_log_id); + assert_eq!(log_id(1, 0, log_index), commit_log_id); + + // Wait for completion + let result: ClientWriteResult = complete_rx.await?; + tracing::info!("Received completion response: {:?}", result); + let response = result?; + assert_eq!(log_id(1, 0, log_index), response.log_id); + // First write returns None (no previous value) + assert_eq!(None, response.response().0.as_deref()); + + // Test another write - pattern follows t10: write("foo", 11) with no responder, then write("foo", + // 12) + n0.client_write_ff(ClientRequest::make_request("foo", 11), None).await?; + log_index += 1; + + let (responder, commit_rx, complete_rx) = ProgressResponder::new(); + n0.client_write_ff(ClientRequest::make_request("foo", 12), Some(responder)).await?; + log_index += 1; + + // Wait for commit notification on second write + let commit_log_id_2 = commit_rx.await?; + tracing::info!("Received second commit notification for log_id: {:?}", commit_log_id_2); + assert_eq!(log_id(1, 0, log_index), commit_log_id_2); + + let result: ClientWriteResult = complete_rx.await?; + let response = result?; + assert_eq!(log_id(1, 0, log_index), response.log_id); + // Should return the value from the previous write (11) + assert_eq!(Some("request-11"), response.response().0.as_deref()); + + Ok(()) +}