Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::StorageIOResult;
use crate::error::Timeout;
use crate::impls::OneshotResponder;
use crate::impls::ProgressResponder;
use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::RaftDataMetrics;
Expand Down Expand Up @@ -463,19 +463,19 @@ where
&mut self,
changes: ChangeMembers<C>,
retain: bool,
tx: OneshotResponder<C, ClientWriteResult<C>>,
tx: ProgressResponder<C, ClientWriteResult<C>>,
) {
let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
let new_membership = match res {
Ok(x) => x,
Err(e) => {
tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
tx.on_complete(Err(ClientWriteError::ChangeMembershipError(e)));
return;
}
};

let ent = C::Entry::new_membership(LogIdOf::<C>::default(), new_membership);
self.write_entry(ent, Some(CoreResponder::Oneshot(tx)));
self.write_entry(ent, Some(CoreResponder::Progress(tx)));
}

/// Write a log entry to the cluster through raft protocol.
Expand All @@ -487,7 +487,7 @@ where
/// The calling side may not receive a result from `resp_tx`, if raft is shut down.
///
/// The responder `resp_tx` is either Responder type of
/// [`RaftTypeConfig::Responder`] (application-defined) or [`OneshotResponder`]
/// [`RaftTypeConfig::Responder`] (application-defined) or [`ProgressResponder`]
/// (general-purpose); the former is for application-defined entries like user data, the
/// latter is for membership configuration changes.
#[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))]
Expand All @@ -502,7 +502,7 @@ where
if let Some(to) = lh.leader.get_transfer_to() {
if let Some(tx) = tx {
let err = lh.state.new_forward_to_leader(to.clone());
tx.send(Err(ClientWriteError::ForwardToLeader(err)));
tx.on_complete(Err(ClientWriteError::ForwardToLeader(err)));
}
return;
}
Expand Down Expand Up @@ -818,6 +818,11 @@ where
let entry_count = last.index() + 1 - first.index();
self.runtime_stats.apply_batch.record(entry_count);

for (index, responder) in responders.iter_mut() {
let log_id = self.engine.state.get_log_id(*index).unwrap();
responder.on_commit(log_id);
}

let cmd = sm::Command::apply(first, last.clone(), responders);
self.sm_handle.send(cmd).map_err(|e| StorageError::apply(last, AnyError::error(e)))?;

Expand Down Expand Up @@ -1829,7 +1834,7 @@ where
#[allow(clippy::let_underscore_future)]
let _ = C::spawn(async move {
for (log_index, tx) in removed.into_iter() {
tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
tx.on_complete(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id: leader_id.clone(),
leader_node: leader_node.clone(),
})));
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::display_ext::DisplayBTreeMapDebugValueExt;
use crate::error::CheckIsLeaderError;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::impls::OneshotResponder;
use crate::impls::ProgressResponder;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteResult;
Expand Down Expand Up @@ -93,7 +93,7 @@ where C: RaftTypeConfig
/// config will be converted into learners, otherwise they will be removed.
retain: bool,

tx: OneshotResponder<C, ClientWriteResult<C>>,
tx: ProgressResponder<C, ClientWriteResult<C>>,
},

ExternalCoreRequest {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ where C: RaftTypeConfig
tx: Option<R>,
) -> Option<(LeaderHandler<'_, C>, Option<R>)>
where
R: Responder<ClientWriteResult<C>>,
R: Responder<C, ClientWriteResult<C>>,
{
let res = self.leader_handler();
let forward_err = match res {
Expand All @@ -274,7 +274,7 @@ where C: RaftTypeConfig
};

if let Some(tx) = tx {
tx.send(Err(forward_err.into()));
tx.on_complete(Err(forward_err.into()));
}

None
Expand Down
1 change: 1 addition & 0 deletions openraft/src/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub use crate::entry::Entry;
pub use crate::node::BasicNode;
pub use crate::node::EmptyNode;
pub use crate::raft::responder::impls::OneshotResponder;
pub use crate::raft::responder::impls::ProgressResponder;
#[cfg(feature = "tokio-rt")]
pub use crate::type_config::async_runtime::tokio_impls::TokioRuntime;

Expand Down
9 changes: 4 additions & 5 deletions openraft/src/raft/api/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::core::raft_msg::RaftMsg;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::Fatal;
use crate::impls::OneshotResponder;
use crate::impls::ProgressResponder;
use crate::raft::ClientWriteResponse;
use crate::raft::ClientWriteResult;
use crate::raft::linearizable_read::Linearizer;
Expand Down Expand Up @@ -50,12 +50,11 @@ where C: RaftTypeConfig
app_data: C::D,
// TODO: ClientWriteError can only be ForwardToLeader Error
) -> Result<Result<ClientWriteResponse<C>, ClientWriteError<C>>, Fatal<C>> {
let (tx, rx) = C::oneshot();
let responder = OneshotResponder::new(tx);
let (responder, _commit_rx, complete_rx) = ProgressResponder::new();

self.do_client_write_ff(app_data, Some(CoreResponder::Oneshot(responder))).await?;
self.do_client_write_ff(app_data, Some(CoreResponder::Progress(responder))).await?;

let res: ClientWriteResult<C> = self.inner.recv_msg(rx).await?;
let res: ClientWriteResult<C> = self.inner.recv_msg(complete_rx).await?;

Ok(res)
}
Expand Down
16 changes: 7 additions & 9 deletions openraft/src/raft/api/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::display_ext::DisplayResult;
use crate::display_ext::DisplayResultExt;
use crate::error::Fatal;
use crate::error::InitializeError;
use crate::impls::OneshotResponder;
use crate::impls::ProgressResponder;
use crate::membership::IntoNodes;
use crate::raft::ClientWriteResult;
use crate::raft::raft_inner::RaftInner;
Expand Down Expand Up @@ -71,7 +71,7 @@ where C: RaftTypeConfig
"change_membership: start to commit joint config"
);

let (tx, rx) = oneshot_channel::<C, _>();
let (tx, rx) = new_responder_pair::<C, _>();

// res is error if membership cannot be changed.
// If no error, it will enter a joint state
Expand Down Expand Up @@ -106,7 +106,7 @@ where C: RaftTypeConfig
tracing::debug!("committed a joint config: {} {:?}", log_id, joint);
tracing::debug!("the second step is to change to uniform config: {:?}", changes);

let (tx, rx) = oneshot_channel::<C, _>();
let (tx, rx) = new_responder_pair::<C, _>();

// The second step, send a NOOP change to flatten the joint config.
let changes = ChangeMembers::AddVoterIds(Default::default());
Expand All @@ -132,7 +132,7 @@ where C: RaftTypeConfig
node: C::Node,
blocking: bool,
) -> Result<ClientWriteResult<C>, Fatal<C>> {
let (tx, rx) = oneshot_channel::<C, _>();
let (tx, rx) = new_responder_pair::<C, _>();

let msg = RaftMsg::ChangeMembership {
changes: ChangeMembers::AddNodes(btreemap! {id.clone()=>node}),
Expand Down Expand Up @@ -229,14 +229,12 @@ where C: RaftTypeConfig
}
}

fn oneshot_channel<C, T>() -> (OneshotResponder<C, T>, OneshotReceiverOf<C, T>)
fn new_responder_pair<C, T>() -> (ProgressResponder<C, T>, OneshotReceiverOf<C, T>)
where
C: RaftTypeConfig,
T: OptionalSend,
{
let (tx, rx) = C::oneshot();
let (tx, _commit_rx, complete_rx) = ProgressResponder::new();

let tx = OneshotResponder::new(tx);

(tx, rx)
(tx, complete_rx)
}
2 changes: 1 addition & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ macro_rules! declare_raft_types {
(Vote , , $crate::impls::Vote<Self> ),
(Entry , , $crate::impls::Entry<Self> ),
(SnapshotData , , std::io::Cursor<Vec<u8>> ),
(Responder<T> , , $crate::impls::OneshotResponder<Self, T> where T: $crate::OptionalSend + 'static ),
(Responder<T> , , $crate::impls::ProgressResponder<Self, T> where T: $crate::OptionalSend + 'static ),
(AsyncRuntime , , $crate::impls::TokioRuntime ),
);

Expand Down
22 changes: 15 additions & 7 deletions openraft/src/raft/responder/core_responder.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
use crate::LogId;
use crate::RaftTypeConfig;
use crate::impls::OneshotResponder;
use crate::impls::ProgressResponder;
use crate::raft::ClientWriteResult;
use crate::raft::responder::Responder;
use crate::type_config::alias::WriteResponderOf;

/// The responder used in RaftCore.
///
/// RaftCore use this responder to send response to the caller.
/// It is either an oneshot responder or a user-defined responder.
/// It is either a progress responder or a user-defined responder.
pub(crate) enum CoreResponder<C>
where C: RaftTypeConfig
{
Oneshot(OneshotResponder<C, ClientWriteResult<C>>),
Progress(ProgressResponder<C, ClientWriteResult<C>>),
UserDefined(WriteResponderOf<C>),
}

impl<C> Responder<ClientWriteResult<C>> for CoreResponder<C>
impl<C> Responder<C, ClientWriteResult<C>> for CoreResponder<C>
where C: RaftTypeConfig
{
fn send(self, res: ClientWriteResult<C>) {
fn on_commit(&mut self, log_id: LogId<C>) {
match self {
Self::Oneshot(responder) => responder.send(res),
Self::UserDefined(responder) => responder.send(res),
Self::Progress(responder) => responder.on_commit(log_id),
Self::UserDefined(responder) => responder.on_commit(log_id),
}
}

fn on_complete(self, res: ClientWriteResult<C>) {
match self {
Self::Progress(responder) => responder.on_complete(res),
Self::UserDefined(responder) => responder.on_complete(res),
}
}
}
5 changes: 5 additions & 0 deletions openraft/src/raft/responder/impls/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod oneshot_responder;
mod progress_responder;

pub use oneshot_responder::OneshotResponder;
pub use progress_responder::ProgressResponder;
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ where
}
}

impl<C, T> Responder<T> for OneshotResponder<C, T>
impl<C, T> Responder<C, T> for OneshotResponder<C, T>
where
C: RaftTypeConfig,
T: OptionalSend + 'static,
{
fn send(self, res: T) {
fn on_complete(self, res: T) {
let res = self.tx.send(res);

if res.is_ok() {
Expand Down
Loading