Skip to content

Commit aa54527

Browse files
committed
change: Add on_commit on_complete to Responder trait
Add `ProgressResponder` and use it as default `Responder` that receives `on_commit` and `on_complete` events. - `on_commit` is called when a log entry is committed; - `on_complete` is called when the log entry is applied to state machine or an error occur. Replace OneshotResponder with ProgressResponder throughout the codebase to provide both commit and completion(apply or error) notifications by default. - Fix: #1460 Upgrade tip: When not using application defined `Responder`, nothing todo. Otherwise, update the implementation of `Responder` by adding `on_commit` if needed, and replacing `send` with `on_complete`.
1 parent ecfe9e8 commit aa54527

File tree

17 files changed

+402
-48
lines changed

17 files changed

+402
-48
lines changed

openraft/src/core/raft_core.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -463,19 +463,19 @@ where
463463
&mut self,
464464
changes: ChangeMembers<C>,
465465
retain: bool,
466-
tx: OneshotResponder<C, ClientWriteResult<C>>,
466+
tx: ProgressResponder<C, ClientWriteResult<C>>,
467467
) {
468468
let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
469469
let new_membership = match res {
470470
Ok(x) => x,
471471
Err(e) => {
472-
tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
472+
tx.on_complete(Err(ClientWriteError::ChangeMembershipError(e)));
473473
return;
474474
}
475475
};
476476

477477
let ent = C::Entry::new_membership(LogIdOf::<C>::default(), new_membership);
478-
self.write_entry(ent, Some(CoreResponder::Oneshot(tx)));
478+
self.write_entry(ent, Some(CoreResponder::Progress(tx)));
479479
}
480480

481481
/// Write a log entry to the cluster through raft protocol.
@@ -487,7 +487,7 @@ where
487487
/// The calling side may not receive a result from `resp_tx`, if raft is shut down.
488488
///
489489
/// The responder `resp_tx` is either Responder type of
490-
/// [`RaftTypeConfig::Responder`] (application-defined) or [`OneshotResponder`]
490+
/// [`RaftTypeConfig::Responder`] (application-defined) or [`ProgressResponder`]
491491
/// (general-purpose); the former is for application-defined entries like user data, the
492492
/// latter is for membership configuration changes.
493493
#[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))]
@@ -502,7 +502,7 @@ where
502502
if let Some(to) = lh.leader.get_transfer_to() {
503503
if let Some(tx) = tx {
504504
let err = lh.state.new_forward_to_leader(to.clone());
505-
tx.send(Err(ClientWriteError::ForwardToLeader(err)));
505+
tx.on_complete(Err(ClientWriteError::ForwardToLeader(err)));
506506
}
507507
return;
508508
}
@@ -818,6 +818,11 @@ where
818818
let entry_count = last.index() + 1 - first.index();
819819
self.runtime_stats.apply_batch.record(entry_count);
820820

821+
for (index, responder) in responders.iter_mut() {
822+
let log_id = self.engine.state.get_log_id(*index).unwrap();
823+
responder.on_commit(log_id);
824+
}
825+
821826
let cmd = sm::Command::apply(first, last.clone(), responders);
822827
self.sm_handle.send(cmd).map_err(|e| StorageError::apply(last, AnyError::error(e)))?;
823828

@@ -1829,7 +1834,7 @@ where
18291834
#[allow(clippy::let_underscore_future)]
18301835
let _ = C::spawn(async move {
18311836
for (log_index, tx) in removed.into_iter() {
1832-
tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
1837+
tx.on_complete(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
18331838
leader_id: leader_id.clone(),
18341839
leader_node: leader_node.clone(),
18351840
})));

openraft/src/core/raft_msg/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ where C: RaftTypeConfig
9393
/// config will be converted into learners, otherwise they will be removed.
9494
retain: bool,
9595

96-
tx: OneshotResponder<C, ClientWriteResult<C>>,
96+
tx: ProgressResponder<C, ClientWriteResult<C>>,
9797
},
9898

9999
ExternalCoreRequest {

openraft/src/engine/engine_impl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ where C: RaftTypeConfig
262262
tx: Option<R>,
263263
) -> Option<(LeaderHandler<'_, C>, Option<R>)>
264264
where
265-
R: Responder<ClientWriteResult<C>>,
265+
R: Responder<C, ClientWriteResult<C>>,
266266
{
267267
let res = self.leader_handler();
268268
let forward_err = match res {
@@ -274,7 +274,7 @@ where C: RaftTypeConfig
274274
};
275275

276276
if let Some(tx) = tx {
277-
tx.send(Err(forward_err.into()));
277+
tx.on_complete(Err(forward_err.into()));
278278
}
279279

280280
None

openraft/src/impls/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub use crate::entry::Entry;
2727
pub use crate::node::BasicNode;
2828
pub use crate::node::EmptyNode;
2929
pub use crate::raft::responder::impls::OneshotResponder;
30+
pub use crate::raft::responder::impls::ProgressResponder;
3031
#[cfg(feature = "tokio-rt")]
3132
pub use crate::type_config::async_runtime::tokio_impls::TokioRuntime;
3233

openraft/src/raft/api/app.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,11 @@ where C: RaftTypeConfig
5050
app_data: C::D,
5151
// TODO: ClientWriteError can only be ForwardToLeader Error
5252
) -> Result<Result<ClientWriteResponse<C>, ClientWriteError<C>>, Fatal<C>> {
53-
let (tx, rx) = C::oneshot();
54-
let responder = OneshotResponder::new(tx);
53+
let (responder, _commit_rx, complete_rx) = ProgressResponder::new();
5554

56-
self.do_client_write_ff(app_data, Some(CoreResponder::Oneshot(responder))).await?;
55+
self.do_client_write_ff(app_data, Some(CoreResponder::Progress(responder))).await?;
5756

58-
let res: ClientWriteResult<C> = self.inner.recv_msg(rx).await?;
57+
let res: ClientWriteResult<C> = self.inner.recv_msg(complete_rx).await?;
5958

6059
Ok(res)
6160
}

openraft/src/raft/api/management.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ where C: RaftTypeConfig
7171
"change_membership: start to commit joint config"
7272
);
7373

74-
let (tx, rx) = oneshot_channel::<C, _>();
74+
let (tx, rx) = new_responder_pair::<C, _>();
7575

7676
// res is error if membership cannot be changed.
7777
// If no error, it will enter a joint state
@@ -106,7 +106,7 @@ where C: RaftTypeConfig
106106
tracing::debug!("committed a joint config: {} {:?}", log_id, joint);
107107
tracing::debug!("the second step is to change to uniform config: {:?}", changes);
108108

109-
let (tx, rx) = oneshot_channel::<C, _>();
109+
let (tx, rx) = new_responder_pair::<C, _>();
110110

111111
// The second step, send a NOOP change to flatten the joint config.
112112
let changes = ChangeMembers::AddVoterIds(Default::default());
@@ -132,7 +132,7 @@ where C: RaftTypeConfig
132132
node: C::Node,
133133
blocking: bool,
134134
) -> Result<ClientWriteResult<C>, Fatal<C>> {
135-
let (tx, rx) = oneshot_channel::<C, _>();
135+
let (tx, rx) = new_responder_pair::<C, _>();
136136

137137
let msg = RaftMsg::ChangeMembership {
138138
changes: ChangeMembers::AddNodes(btreemap! {id.clone()=>node}),
@@ -229,14 +229,12 @@ where C: RaftTypeConfig
229229
}
230230
}
231231

232-
fn oneshot_channel<C, T>() -> (OneshotResponder<C, T>, OneshotReceiverOf<C, T>)
232+
fn new_responder_pair<C, T>() -> (ProgressResponder<C, T>, OneshotReceiverOf<C, T>)
233233
where
234234
C: RaftTypeConfig,
235235
T: OptionalSend,
236236
{
237-
let (tx, rx) = C::oneshot();
237+
let (tx, _commit_rx, complete_rx) = ProgressResponder::new();
238238

239-
let tx = OneshotResponder::new(tx);
240-
241-
(tx, rx)
239+
(tx, complete_rx)
242240
}

openraft/src/raft/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ macro_rules! declare_raft_types {
190190
(Vote , , $crate::impls::Vote<Self> ),
191191
(Entry , , $crate::impls::Entry<Self> ),
192192
(SnapshotData , , std::io::Cursor<Vec<u8>> ),
193-
(Responder<T> , , $crate::impls::OneshotResponder<Self, T> where T: $crate::OptionalSend + 'static ),
193+
(Responder<T> , , $crate::impls::ProgressResponder<Self, T> where T: $crate::OptionalSend + 'static ),
194194
(AsyncRuntime , , $crate::impls::TokioRuntime ),
195195
);
196196

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::LogId;
12
use crate::RaftTypeConfig;
23
use crate::impls::OneshotResponder;
34
use crate::raft::ClientWriteResult;
@@ -7,21 +8,28 @@ use crate::type_config::alias::WriteResponderOf;
78
/// The responder used in RaftCore.
89
///
910
/// RaftCore use this responder to send response to the caller.
10-
/// It is either an oneshot responder or a user-defined responder.
11+
/// It is either a progress responder or a user-defined responder.
1112
pub(crate) enum CoreResponder<C>
1213
where C: RaftTypeConfig
1314
{
14-
Oneshot(OneshotResponder<C, ClientWriteResult<C>>),
15+
Progress(ProgressResponder<C, ClientWriteResult<C>>),
1516
UserDefined(WriteResponderOf<C>),
1617
}
1718

18-
impl<C> Responder<ClientWriteResult<C>> for CoreResponder<C>
19+
impl<C> Responder<C, ClientWriteResult<C>> for CoreResponder<C>
1920
where C: RaftTypeConfig
2021
{
21-
fn send(self, res: ClientWriteResult<C>) {
22+
fn on_commit(&mut self, log_id: LogId<C>) {
2223
match self {
23-
Self::Oneshot(responder) => responder.send(res),
24-
Self::UserDefined(responder) => responder.send(res),
24+
Self::Progress(responder) => responder.on_commit(log_id),
25+
Self::UserDefined(responder) => responder.on_commit(log_id),
26+
}
27+
}
28+
29+
fn on_complete(self, res: ClientWriteResult<C>) {
30+
match self {
31+
Self::Progress(responder) => responder.on_complete(res),
32+
Self::UserDefined(responder) => responder.on_complete(res),
2533
}
2634
}
2735
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod oneshot_responder;
2+
mod progress_responder;
3+
4+
pub use oneshot_responder::OneshotResponder;
5+
pub use progress_responder::ProgressResponder;

openraft/src/raft/responder/impls.rs renamed to openraft/src/raft/responder/impls/oneshot_responder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ where
4747
}
4848
}
4949

50-
impl<C, T> Responder<T> for OneshotResponder<C, T>
50+
impl<C, T> Responder<C, T> for OneshotResponder<C, T>
5151
where
5252
C: RaftTypeConfig,
5353
T: OptionalSend + 'static,
5454
{
55-
fn send(self, res: T) {
55+
fn on_complete(self, res: T) {
5656
let res = self.tx.send(res);
5757

5858
if res.is_ok() {

0 commit comments

Comments
 (0)