Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,20 @@ impl DownstairsClient {
self.client_id
);
};
assert_eq!(state.discriminant(), NegotiationState::WaitQuorum);
assert_eq!(mode, &ConnectionMode::New);
// There are two cases where reconciliation is allowed: either from a
// new connection, or if all three Downstairs need live-repair
// simultaneously.
match (state.discriminant(), &mode) {
(NegotiationState::WaitQuorum, ConnectionMode::New) => {
// This is fine.
}
(NegotiationState::LiveRepairReady, ConnectionMode::Faulted) => {
// This is also fine, but we need to tweak our connection mode
// because we're no longer doing live-repair.
*mode = ConnectionMode::New;
}
s => panic!("invalid (state, mode) tuple: ({s:?}"),
}
*state = NegotiationStateData::Reconcile;
}

Expand Down Expand Up @@ -894,6 +906,20 @@ impl DownstairsClient {
},
) => true,

// Special case: LiveRepairReady is allowed to jump sideways into
// reconciliation if all three downstairs require live-repair
// (because otherwise we have no-one to repair from)
(
DsStateData::Connecting {
state: NegotiationStateData::LiveRepairReady(..),
mode: ConnectionMode::Faulted,
},
DsStateData::Connecting {
state: NegotiationStateData::Reconcile,
mode: ConnectionMode::New,
},
) => true,

// Check normal negotiation path
(
DsStateData::Connecting {
Expand Down Expand Up @@ -926,7 +952,7 @@ impl DownstairsClient {
ConnectionMode::Offline
) | (NegotiationStateData::Reconcile, ConnectionMode::New)
| (
NegotiationStateData::LiveRepairReady,
NegotiationStateData::LiveRepairReady(..),
ConnectionMode::Faulted | ConnectionMode::Replaced
)
)
Expand All @@ -938,7 +964,7 @@ impl DownstairsClient {
matches!(
(state, mode),
(
NegotiationStateData::LiveRepairReady,
NegotiationStateData::LiveRepairReady(..),
ConnectionMode::Faulted | ConnectionMode::Replaced
)
)
Expand Down Expand Up @@ -1212,7 +1238,7 @@ impl DownstairsClient {
let DsStateData::Connecting { state, .. } = &self.state else {
return;
};
if matches!(state, NegotiationStateData::LiveRepairReady) {
if matches!(state, NegotiationStateData::LiveRepairReady(..)) {
assert!(self.cfg.read_only);

// TODO: could we do this transition early, by automatically
Expand All @@ -1230,7 +1256,7 @@ impl DownstairsClient {
let DsStateData::Connecting { state, .. } = &self.state else {
panic!("invalid state");
};
assert!(matches!(state, NegotiationStateData::LiveRepairReady));
assert!(matches!(state, NegotiationStateData::LiveRepairReady(..)));
self.checked_state_transition(up_state, DsStateData::LiveRepair);
}

Expand Down Expand Up @@ -1574,7 +1600,7 @@ impl DownstairsClient {
}

ConnectionMode::Faulted | ConnectionMode::Replaced => {
*state = NegotiationStateData::LiveRepairReady;
*state = NegotiationStateData::LiveRepairReady(dsr);
NegotiationResult::LiveRepair
}
ConnectionMode::Offline => {
Expand Down Expand Up @@ -1756,10 +1782,10 @@ impl DownstairsClient {
/// │ │ New │ Faulted / Replaced
/// │ ┌──────▼───┐ ┌────▼──────────┐
/// │ │WaitQuorum│ │LiveRepairReady│
/// │ └────┬─────┘ └───┬──────────┘
/// │ │
/// │ ┌────▼────┐
/// │ │Reconcile
/// │ └────┬─────┘ └───┬──────────┘
/// │ │
/// │ ┌────▼────┐
/// │ │Reconcile◄───────┘
/// │ └────┬────┘ │
/// │ │ │
/// │ ┌───▼──┐ │
Expand Down Expand Up @@ -1803,7 +1829,9 @@ pub enum NegotiationStateData {
Reconcile,

/// Waiting for live-repair to begin
LiveRepairReady,
// This state includes [`RegionMetadata`], because if all three Downstairs
// end up in `LiveRepairReady`, we have to perform reconciliation instead.
LiveRepairReady(RegionMetadata),
}

impl NegotiationStateData {
Expand Down Expand Up @@ -1842,7 +1870,7 @@ impl NegotiationStateData {
ConnectionMode::New
) | (
NegotiationStateData::GetExtentVersions,
NegotiationStateData::LiveRepairReady,
NegotiationStateData::LiveRepairReady(..),
ConnectionMode::Faulted | ConnectionMode::Replaced,
)
)
Expand Down
113 changes: 88 additions & 25 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use crate::{
DownstairsIO, DownstairsMend, DsState, DsStateData, ExtentFix,
ExtentRepairIDs, IOState, IOStateCount, IOop, ImpactedBlocks, JobId,
Message, NegotiationState, RawReadResponse, RawWrite, ReconcileIO,
ReconciliationId, RegionDefinition, ReplaceResult, SnapshotDetails,
WorkSummary,
ReconciliationId, RegionDefinition, RegionMetadata, ReplaceResult,
SnapshotDetails, WorkSummary,
};
use crucible_common::{BlockIndex, ExtentId, NegotiationError};
use crucible_protocol::WriteHeader;
Expand Down Expand Up @@ -165,6 +165,20 @@ pub(crate) enum LiveRepairState {
},
}

/// Outcome of an attempt to start live-repair.
///
/// Exactly one variant is reported per attempt: either live-repair began, or
/// the variant names the reason it did not. All variants are plain unit values,
/// so the type is cheap to copy and supports total equality (`Eq`).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum LiveRepairStart {
    /// Live-repair has started
    Started,
    /// Live-repair is already running, so a new one was not started
    AlreadyRunning,
    /// No downstairs is in `LiveRepairReady`, so there is nothing to repair
    NotNeeded,
    /// All three downstairs need live-repair, which leaves no-one to repair
    /// from
    AllNeedRepair,
    /// There is no source downstairs available to repair from
    NoSource,
}

impl LiveRepairState {
fn dummy() -> Self {
LiveRepairState::Noop {
Expand Down Expand Up @@ -874,7 +888,16 @@ impl Downstairs {
/// Returns `true` if repair is needed, `false` otherwise
pub(crate) fn collate(&mut self) -> Result<bool, NegotiationError> {
let r = self.check_region_metadata()?;
Ok(self.start_reconciliation(r))
Ok(self.start_reconciliation(r, |data| {
let DsStateData::Connecting {
state: NegotiationStateData::WaitQuorum(r),
..
} = data
else {
panic!("client is not in WaitQuorum");
};
r
}))
}

/// Checks that region metadata is valid
Expand Down Expand Up @@ -963,9 +986,46 @@ impl Downstairs {
Ok(CollateData { max_flush, max_gen })
}

/// Begins reconciliation from all downstairs in `LiveRepairReady`
///
/// # Panics
/// If any of the downstairs is not in `LiveRepairReady`
#[must_use]
pub(crate) fn reconcile_from_live_repair_ready(&mut self) -> bool {
let mut max_flush = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to want a DTrace stat for when we do this, just to see how often it happens. I'm happy to add that myself in a later PR (as I'll be updating the DTrace scripts to print it).

let mut max_gen = 0;
for client in self.clients.iter_mut() {
let DsStateData::Connecting {
state: NegotiationStateData::LiveRepairReady(data),
..
} = client.state_data()
else {
panic!("got invalid client state");
};
for m in data.iter() {
max_flush = max_flush.max(m.flush + 1);
max_gen = max_gen.max(m.gen + 1);
}
}
self.start_reconciliation(CollateData { max_gen, max_flush }, |data| {
let DsStateData::Connecting {
state: NegotiationStateData::LiveRepairReady(r),
..
} = data
else {
panic!("client is not in LiveRepairReady");
};
r
})
}

/// Begins reconciliation, using the given collation data
#[must_use]
fn start_reconciliation(&mut self, data: CollateData) -> bool {
fn start_reconciliation<G: Fn(&DsStateData) -> &RegionMetadata>(
&mut self,
data: CollateData,
getter: G,
) -> bool {
let CollateData { max_flush, max_gen } = data;

/*
Expand All @@ -978,7 +1038,7 @@ impl Downstairs {
* Determine what extents don't match and what to do
* about that
*/
if let Some(reconcile_list) = self.mismatch_list() {
if let Some(reconcile_list) = self.mismatch_list(getter) {
for c in self.clients.iter_mut() {
c.begin_reconcile();
}
Expand Down Expand Up @@ -1026,10 +1086,14 @@ impl Downstairs {
///
/// This function is idempotent; it returns without doing anything if
/// live-repair either can't be started or is already running.
pub(crate) fn check_live_repair_start(&mut self, up_state: &UpstairsState) {
#[must_use]
pub(crate) fn check_live_repair_start(
&mut self,
up_state: &UpstairsState,
) -> LiveRepairStart {
// If we're already doing live-repair, then we can't start live-repair
if self.live_repair_in_progress() {
return;
return LiveRepairStart::AlreadyRunning;
}

// Begin setting up live-repair state
Expand All @@ -1054,16 +1118,15 @@ impl Downstairs {

// Can't start live-repair if no one is LiveRepairReady
if repair_downstairs.is_empty() {
return;
return LiveRepairStart::NotNeeded;
} else if repair_downstairs.len() == 3 {
warn!(self.log, "All three downstairs require repair");
return LiveRepairStart::AllNeedRepair;
}

// Can't start live-repair if we don't have a source downstairs
let Some(source_downstairs) = source_downstairs else {
warn!(self.log, "No source, no Live Repair possible");
if repair_downstairs.len() == 3 {
warn!(self.log, "All three downstairs require repair");
}
return;
return LiveRepairStart::NoSource;
};

// Move the upstairs that were LiveRepairReady to LiveRepair
Expand Down Expand Up @@ -1105,6 +1168,8 @@ impl Downstairs {

let repair = self.repair.as_ref().unwrap();
self.notify_live_repair_start(repair);

LiveRepairStart::Started
}

/// Checks whether live-repair can continue
Expand Down Expand Up @@ -2008,18 +2073,14 @@ impl Downstairs {
}

/// Compares region metadata from all three clients and builds a mend list
fn mismatch_list(&self) -> Option<DownstairsMend> {
fn mismatch_list<G: Fn(&DsStateData) -> &RegionMetadata>(
&self,
getter: G,
) -> Option<DownstairsMend> {
let log = self.log.new(o!("" => "mend".to_string()));
let mut meta = ClientMap::new();
for i in ClientId::iter() {
let DsStateData::Connecting {
state: NegotiationStateData::WaitQuorum(r),
..
} = self.clients[i].state_data()
else {
panic!("client {i} is not in WaitQuorum");
};
meta.insert(i, r);
meta.insert(i, getter(self.clients[i].state_data()));
}
DownstairsMend::new(&meta, log)
}
Expand Down Expand Up @@ -4006,7 +4067,8 @@ struct DownstairsBackpressureConfig {
pub(crate) mod test {
use super::{
ClientFaultReason, ClientNegotiationFailed, ClientStopReason,
ConnectionMode, Downstairs, DsState, NegotiationStateData, PendingJob,
ConnectionMode, Downstairs, DsState, LiveRepairStart,
NegotiationStateData, PendingJob,
};
use crate::{
downstairs::{LiveRepairData, LiveRepairState, ReconcileData},
Expand Down Expand Up @@ -4081,7 +4143,7 @@ pub(crate) mod test {
NegotiationStateData::WaitForPromote,
NegotiationStateData::WaitForRegionInfo,
NegotiationStateData::GetExtentVersions,
NegotiationStateData::LiveRepairReady,
NegotiationStateData::LiveRepairReady(Default::default()),
] {
ds.clients[to_repair].checked_state_transition(
&UpstairsState::Active,
Expand Down Expand Up @@ -9613,7 +9675,8 @@ pub(crate) mod test {

// Start the repair normally. This enqueues the close & reopen jobs, and
// reserves Job IDs for the repair/noop
ds.check_live_repair_start(&UpstairsState::Active);
let r = ds.check_live_repair_start(&UpstairsState::Active);
assert_eq!(r, LiveRepairStart::Started);
assert!(ds.live_repair_in_progress());

// Submit a write.
Expand Down
2 changes: 1 addition & 1 deletion upstairs/src/live_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ pub mod repair_test {
// Fault and start live-repair for client 1
to_live_repair_ready(&mut up, ClientId::new(1));

up.check_live_repair_start();
up.ensure_downstairs_consistency();
assert!(up.downstairs.live_repair_in_progress());
assert_eq!(up.downstairs.last_repair_extent(), Some(ExtentId(0)));

Expand Down
Loading