diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs
index dac51c62b..25b26ef99 100644
--- a/upstairs/src/client.rs
+++ b/upstairs/src/client.rs
@@ -446,8 +446,20 @@ impl DownstairsClient {
                 self.client_id
             );
         };
-        assert_eq!(state.discriminant(), NegotiationState::WaitQuorum);
-        assert_eq!(mode, &ConnectionMode::New);
+        // There are two cases where reconciliation is allowed: either from a
+        // new connection, or if all three Downstairs need live-repair
+        // simultaneously.
+        match (state.discriminant(), &mode) {
+            (NegotiationState::WaitQuorum, ConnectionMode::New) => {
+                // This is fine.
+            }
+            (NegotiationState::LiveRepairReady, ConnectionMode::Faulted) => {
+                // This is also fine, but we need to tweak our connection mode
+                // because we're no longer doing live-repair.
+                *mode = ConnectionMode::New;
+            }
+            s => panic!("invalid (state, mode) tuple: {s:?}"),
+        }
         *state = NegotiationStateData::Reconcile;
     }

@@ -894,6 +906,20 @@ impl DownstairsClient {
             },
         ) => true,

+        // Special case: LiveRepairReady is allowed to jump sideways into
+        // reconciliation if all three downstairs require live-repair
+        // (because otherwise we have no one to repair from)
+        (
+            DsStateData::Connecting {
+                state: NegotiationStateData::LiveRepairReady(..),
+                mode: ConnectionMode::Faulted,
+            },
+            DsStateData::Connecting {
+                state: NegotiationStateData::Reconcile,
+                mode: ConnectionMode::New,
+            },
+        ) => true,
+
        // Check normal negotiation path
        (
            DsStateData::Connecting {
@@ -926,7 +952,7 @@ impl DownstairsClient {
                    ConnectionMode::Offline
                ) | (NegotiationStateData::Reconcile, ConnectionMode::New)
                    | (
-                        NegotiationStateData::LiveRepairReady,
+                        NegotiationStateData::LiveRepairReady(..),
                        ConnectionMode::Faulted | ConnectionMode::Replaced
                    )
            )
@@ -938,7 +964,7 @@ impl DownstairsClient {
            matches!(
                (state, mode),
                (
-                    NegotiationStateData::LiveRepairReady,
+                    NegotiationStateData::LiveRepairReady(..),
                    ConnectionMode::Faulted | ConnectionMode::Replaced
                )
            )
@@ -1212,7 +1238,7 @@ impl DownstairsClient {
        let DsStateData::Connecting { state, .. } = &self.state else {
            return;
        };
-        if matches!(state, NegotiationStateData::LiveRepairReady) {
+        if matches!(state, NegotiationStateData::LiveRepairReady(..)) {
            assert!(self.cfg.read_only);

            // TODO: could we do this transition early, by automatically
@@ -1230,7 +1256,9 @@ impl DownstairsClient {
        let DsStateData::Connecting { state, .. } = &self.state else {
            panic!("invalid state");
        };
-        assert!(matches!(state, NegotiationStateData::LiveRepairReady));
+        assert!(matches!(state, NegotiationStateData::LiveRepairReady(..)));
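+        // (the RegionMetadata carried by LiveRepairReady is dropped here; it
+        // is only needed if we have to fall back to reconciliation)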

        self.checked_state_transition(up_state, DsStateData::LiveRepair);
    }
@@ -1574,7 +1600,7 @@ impl DownstairsClient {
            }
            ConnectionMode::Faulted | ConnectionMode::Replaced => {
-                *state = NegotiationStateData::LiveRepairReady;
+                *state = NegotiationStateData::LiveRepairReady(dsr);
                NegotiationResult::LiveRepair
            }
            ConnectionMode::Offline => {
@@ -1756,10 +1782,10 @@ impl DownstairsClient {
 ///  │         │ New         │ Faulted / Replaced
 ///  │  ┌──────▼───┐    ┌────▼──────────┐
 ///  │  │WaitQuorum│    │LiveRepairReady│
-///  │  └────┬─────┘    └────┬──────────┘
-///  │       │               │
-///  │  ┌────▼────┐          │
-///  │  │Reconcile│          │
+///  │  └────┬─────┘    └─┬──┬──────────┘
+///  │       │            │  │
+///  │  ┌────▼────┐       │  │
+///  │  │Reconcile◄───────┘  │
 ///  │  └────┬────┘          │
 ///  │       │               │
 ///  │   ┌───▼──┐            │
@@ -1803,7 +1829,9 @@ pub enum NegotiationStateData {
    Reconcile,

    /// Waiting for live-repair to begin
-    LiveRepairReady,
+    // This state includes [`RegionMetadata`], because if all three Downstairs
+    // end up in `LiveRepairReady`, we have to perform reconciliation instead.
+    LiveRepairReady(RegionMetadata),
 }

 impl NegotiationStateData {
@@ -1842,7 +1870,7 @@ impl NegotiationStateData {
                ConnectionMode::New
            ) | (
                NegotiationStateData::GetExtentVersions,
-                NegotiationStateData::LiveRepairReady,
+                NegotiationStateData::LiveRepairReady(..),
                ConnectionMode::Faulted | ConnectionMode::Replaced,
            )
        )
diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs
index 443316d68..c4fad6c3c 100644
--- a/upstairs/src/downstairs.rs
+++ b/upstairs/src/downstairs.rs
@@ -25,8 +25,8 @@ use crate::{
    DownstairsIO, DownstairsMend, DsState, DsStateData, ExtentFix,
    ExtentRepairIDs, IOState, IOStateCount, IOop, ImpactedBlocks, JobId,
    Message, NegotiationState, RawReadResponse, RawWrite, ReconcileIO,
-    ReconciliationId, RegionDefinition, ReplaceResult, SnapshotDetails,
-    WorkSummary,
+    ReconciliationId, RegionDefinition, RegionMetadata, ReplaceResult,
+    SnapshotDetails, WorkSummary,
 };
 use crucible_common::{BlockIndex, ExtentId, NegotiationError};
 use crucible_protocol::WriteHeader;
@@ -165,6 +165,20 @@ pub(crate) enum LiveRepairState {
    },
 }

+#[derive(Copy, Clone, Debug, PartialEq)]
+pub(crate) enum LiveRepairStart {
+    /// Live-repair has started
+    Started,
+    /// Live-repair is already running
+    AlreadyRunning,
+    /// No downstairs is in `LiveRepairReady`
+    NotNeeded,
+    /// All three downstairs need live-repair (oh no)
+    AllNeedRepair,
+    /// There is no source downstairs available
+    NoSource,
+}
+
 impl LiveRepairState {
    fn dummy() -> Self {
        LiveRepairState::Noop {
@@ -874,7 +888,18 @@ impl Downstairs {
    /// Returns `true` if repair is needed, `false` otherwise
    pub(crate) fn collate(&mut self) -> Result<bool, NegotiationError> {
        let r = self.check_region_metadata()?;
-        Ok(self.start_reconciliation(r))
+        Ok(self.start_reconciliation(r, |data| {
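+            // At collation time every client must be in WaitQuorum, so the
+            // getter pulls its RegionMetadata out of that state.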
+            let DsStateData::Connecting {
+                state: NegotiationStateData::WaitQuorum(r),
+                ..
+            } = data
+            else {
+                panic!("client is not in WaitQuorum");
+            };
+            r
+        }))
    }

    /// Checks that region metadata is valid
@@ -963,9 +986,46 @@ impl Downstairs {
        Ok(CollateData { max_flush, max_gen })
    }

+    /// Begins reconciliation from all downstairs in `LiveRepairReady`
+    ///
+    /// # Panics
+    /// If any of the downstairs is not in `LiveRepairReady`
+    #[must_use]
+    pub(crate) fn reconcile_from_live_repair_ready(&mut self) -> bool {
+        let mut max_flush = 0;
+        let mut max_gen = 0;
+        for client in self.clients.iter_mut() {
+            let DsStateData::Connecting {
+                state: NegotiationStateData::LiveRepairReady(data),
+                ..
+            } = client.state_data()
+            else {
+                panic!("got invalid client state");
+            };
+            for m in data.iter() {
+                max_flush = max_flush.max(m.flush + 1);
+                max_gen = max_gen.max(m.gen + 1);
+            }
+        }
+        self.start_reconciliation(CollateData { max_gen, max_flush }, |data| {
+            let DsStateData::Connecting {
+                state: NegotiationStateData::LiveRepairReady(r),
+                ..
+            } = data
+            else {
+                panic!("client is not in LiveRepairReady");
+            };
+            r
+        })
+    }
+
    /// Begins reconciliation, using the given collation data
    #[must_use]
-    fn start_reconciliation(&mut self, data: CollateData) -> bool {
+    fn start_reconciliation<G: Fn(&DsStateData) -> &RegionMetadata>(
+        &mut self,
+        data: CollateData,
+        getter: G,
+    ) -> bool {
        let CollateData { max_flush, max_gen } = data;

        /*
@@ -978,7 +1038,7 @@ impl Downstairs {
         * Determine what extents don't match and what to do
         * about that
         */
-        if let Some(reconcile_list) = self.mismatch_list() {
+        if let Some(reconcile_list) = self.mismatch_list(getter) {
            for c in self.clients.iter_mut() {
                c.begin_reconcile();
            }
@@ -1026,10 +1086,14 @@ impl Downstairs {
    ///
    /// This function is idempotent; it returns without doing anything if
    /// live-repair either can't be started or is already running.
-    pub(crate) fn check_live_repair_start(&mut self, up_state: &UpstairsState) {
+    #[must_use]
+    pub(crate) fn check_live_repair_start(
+        &mut self,
+        up_state: &UpstairsState,
+    ) -> LiveRepairStart {
        // If we're already doing live-repair, then we can't start live-repair
        if self.live_repair_in_progress() {
-            return;
+            return LiveRepairStart::AlreadyRunning;
        }

        // Begin setting up live-repair state
@@ -1054,16 +1118,15 @@ impl Downstairs {

        // Can't start live-repair if no one is LiveRepairReady
        if repair_downstairs.is_empty() {
-            return;
+            return LiveRepairStart::NotNeeded;
+        } else if repair_downstairs.len() == 3 {
+            warn!(self.log, "All three downstairs require repair");
+            return LiveRepairStart::AllNeedRepair;
        }

        // Can't start live-repair if we don't have a source downstairs
        let Some(source_downstairs) = source_downstairs else {
-            warn!(self.log, "No source, no Live Repair possible");
-            if repair_downstairs.len() == 3 {
-                warn!(self.log, "All three downstairs require repair");
-            }
-            return;
+            return LiveRepairStart::NoSource;
        };

        // Move the upstairs that were LiveRepairReady to LiveRepair
@@ -1105,6 +1168,8 @@ impl Downstairs {

        let repair = self.repair.as_ref().unwrap();
        self.notify_live_repair_start(repair);
+
+        LiveRepairStart::Started
    }

    /// Checks whether live-repair can continue
@@ -2008,18 +2073,16 @@ impl Downstairs {
    }

    /// Compares region metadata from all three clients and builds a mend list
-    fn mismatch_list(&self) -> Option<DownstairsMend> {
+    fn mismatch_list<G: Fn(&DsStateData) -> &RegionMetadata>(
+        &self,
+        getter: G,
+    ) -> Option<DownstairsMend> {
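+        // `getter` abstracts over which negotiation state currently holds
+        // each client's RegionMetadata (WaitQuorum or LiveRepairReady).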
        let log = self.log.new(o!("" => "mend".to_string()));
        let mut meta = ClientMap::new();
        for i in ClientId::iter() {
-            let DsStateData::Connecting {
-                state: NegotiationStateData::WaitQuorum(r),
-                ..
-            } = self.clients[i].state_data()
-            else {
-                panic!("client {i} is not in WaitQuorum");
-            };
-            meta.insert(i, r);
+            meta.insert(i, getter(self.clients[i].state_data()));
        }
        DownstairsMend::new(&meta, log)
    }
@@ -4006,7 +4067,8 @@ struct DownstairsBackpressureConfig {
 pub(crate) mod test {
    use super::{
        ClientFaultReason, ClientNegotiationFailed, ClientStopReason,
-        ConnectionMode, Downstairs, DsState, NegotiationStateData, PendingJob,
+        ConnectionMode, Downstairs, DsState, LiveRepairStart,
+        NegotiationStateData, PendingJob,
    };
    use crate::{
        downstairs::{LiveRepairData, LiveRepairState, ReconcileData},
@@ -4081,7 +4143,7 @@ pub(crate) mod test {
            NegotiationStateData::WaitForPromote,
            NegotiationStateData::WaitForRegionInfo,
            NegotiationStateData::GetExtentVersions,
-            NegotiationStateData::LiveRepairReady,
+            NegotiationStateData::LiveRepairReady(Default::default()),
        ] {
            ds.clients[to_repair].checked_state_transition(
                &UpstairsState::Active,
@@ -9613,7 +9675,8 @@ pub(crate) mod test {

        // Start the repair normally. This enqueues the close & reopen jobs, and
        // reserves Job IDs for the repair/noop
-        ds.check_live_repair_start(&UpstairsState::Active);
+        let r = ds.check_live_repair_start(&UpstairsState::Active);
+        assert_eq!(r, LiveRepairStart::Started);
        assert!(ds.live_repair_in_progress());

        // Submit a write.
diff --git a/upstairs/src/live_repair.rs b/upstairs/src/live_repair.rs
index ff79ddfd0..a5e7e2ab3 100644
--- a/upstairs/src/live_repair.rs
+++ b/upstairs/src/live_repair.rs
@@ -1113,7 +1113,7 @@ pub mod repair_test {

        // Fault and start live-repair for client 1
        to_live_repair_ready(&mut up, ClientId::new(1));
-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();
        assert!(up.downstairs.live_repair_in_progress());
        assert_eq!(up.downstairs.last_repair_extent(), Some(ExtentId(0)));

diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 06d0fa507..a289fae75 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -11,7 +11,7 @@ use crate::{
        DeferredBlockOp, DeferredMessage, DeferredQueue, DeferredRead,
        DeferredWrite, EncryptedWrite,
    },
-    downstairs::{Downstairs, DownstairsAction},
+    downstairs::{Downstairs, DownstairsAction, LiveRepairStart},
    extent_from_offset,
    io_limits::IOLimitGuard,
    stats::UpStatOuter,
@@ -614,8 +614,8 @@ impl Upstairs {
            }
        }

-        // Check whether we need to start live-repair
-        self.check_live_repair_start();
+        // Check whether we need to start live-repair or reconciliation
+        self.ensure_downstairs_consistency();

        // Check whether we need to mark an offline Downstairs as faulted
        // because too many jobs have piled up.
@@ -906,7 +906,13 @@ impl Upstairs {
    /// any Downstairs from
    /// `DsStateData::Connecting { state: NegotiationStateData::LiveRepairReady, .. }`
    /// back to [DsStateData::Active] without actually performing any repair.
-    pub(crate) fn check_live_repair_start(&mut self) {
+    ///
+    /// If all Downstairs are in `LiveRepairReady`, we instead begin
+    /// reconciliation.
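+    ///
+    /// (Reconciliation is the fallback because with all three Downstairs in
+    /// `LiveRepairReady`, there is no repair source left.)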
+    pub(crate) fn ensure_downstairs_consistency(&mut self) {
        if !matches!(self.state, UpstairsState::Active) {
            return;
        }
@@ -921,8 +924,26 @@ impl Upstairs {
            return;
        }

-        // Try to start live-repair
-        self.downstairs.check_live_repair_start(&self.state);
+        // Try to start live-repair; fall back to reconciliation if necessary
+        match self.downstairs.check_live_repair_start(&self.state) {
+            LiveRepairStart::AllNeedRepair => {
+                info!(
+                    self.log,
+                    "all Downstairs need live-repair; doing reconciliation"
+                );
+                if self.downstairs.reconcile_from_live_repair_ready() {
+                    self.downstairs.send_next_reconciliation_req();
+                } else {
+                    self.on_reconciliation_done(false);
+                }
+            }
+            LiveRepairStart::Started
+            | LiveRepairStart::AlreadyRunning
+            | LiveRepairStart::NotNeeded
+            | LiveRepairStart::NoSource => {
+                // We don't need any special handling of these cases
+            }
+        }
    }

    /// Returns `true` if we're ready to accept guest IO
@@ -2231,7 +2252,7 @@ pub(crate) mod test {
        client::{ClientFaultReason, ClientStopReason},
        test::{make_encrypted_upstairs, make_upstairs},
        Block, BlockOp, BlockOpWaiter, DsStateData, JobId,
-        NegotiationStateData,
+        NegotiationStateData, RegionMetadata,
    };
    use bytes::BytesMut;
    use crucible_common::integrity_hash;
@@ -2267,8 +2288,8 @@ pub(crate) mod test {
        // Move our downstairs client fail_id to LiveRepair.
        to_live_repair_ready(&mut up, or_ds);

-        // Assert that the repair started
-        up.check_live_repair_start();
+        // Assert that a consistency check starts the repair
+        up.ensure_downstairs_consistency();
        assert!(up.downstairs.live_repair_in_progress());

        // The first thing that should happen after we start repair_extent
@@ -2282,6 +2303,11 @@ pub(crate) mod test {

    /// Helper function to legally move the given client to live-repair ready
    pub(crate) fn to_live_repair_ready(up: &mut Upstairs, to_repair: ClientId) {
+        active_to_faulted(up, to_repair);
+        faulted_to_live_repair_ready(up, to_repair);
+    }
+
+    fn active_to_faulted(up: &mut Upstairs, to_repair: ClientId) {
        up.downstairs.fault_client(
            to_repair,
            &UpstairsState::Active,
@@ -2292,12 +2318,23 @@ pub(crate) mod test {
            client_id: to_repair,
            action: ClientAction::TaskStopped(ClientRunResult::RequestedStop),
        }));
+    }
+
+    fn faulted_to_live_repair_ready(up: &mut Upstairs, to_repair: ClientId) {
+        faulted_to_live_repair_ready_with(up, to_repair, Default::default());
+    }
+
+    fn faulted_to_live_repair_ready_with(
+        up: &mut Upstairs,
+        to_repair: ClientId,
+        meta: RegionMetadata,
+    ) {
        let mode = ConnectionMode::Faulted;
        for state in [
            NegotiationStateData::WaitForPromote,
            NegotiationStateData::WaitForRegionInfo,
            NegotiationStateData::GetExtentVersions,
-            NegotiationStateData::LiveRepairReady,
+            NegotiationStateData::LiveRepairReady(meta),
        ] {
            up.downstairs.clients[to_repair].checked_state_transition(
                &up.state,
@@ -3611,13 +3648,13 @@ pub(crate) mod test {

        // Before we are active, we have no need to repair or check for future
        // repairs.
-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();
        assert!(!up.downstairs.live_repair_in_progress());

        up.force_active().unwrap();

        // No need to repair or check for future repairs here either
-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();
        assert!(!up.downstairs.live_repair_in_progress());

        // No downstairs should change state.
@@ -3640,7 +3677,7 @@ pub(crate) mod test {

        // Force client 1 into LiveRepairReady
        to_live_repair_ready(&mut up, ClientId::new(1));
-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();
        assert!(up.downstairs.live_repair_in_progress());
        assert_eq!(up.ds_state(ClientId::new(1)), DsState::LiveRepair);
        assert!(up.downstairs.repair().is_some());
@@ -3659,8 +3696,8 @@ pub(crate) mod test {
        to_live_repair_ready(&mut up, ClientId::new(1));
        up.ds_transition(ClientId::new(1), DsStateData::LiveRepair);

-        // Start the live-repair
-        up.check_live_repair_start();
+        // Check for downstairs consistency, which starts the live-repair
+        up.ensure_downstairs_consistency();
        assert!(up.downstairs.live_repair_in_progress());

        // Pretend that DS 0 faulted then came back through to LiveRepairReady;
@@ -3668,7 +3705,7 @@ pub(crate) mod test {
        // repair_check_deadline to check again in the future.
        to_live_repair_ready(&mut up, ClientId::new(0));

-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();
        assert!(up.downstairs.live_repair_in_progress());
    }

@@ -3683,11 +3720,11 @@ pub(crate) mod test {
        up.force_active().unwrap();

        to_live_repair_ready(&mut up, ClientId::new(1));
-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();
        assert!(up.downstairs.live_repair_in_progress());

        // Checking again is idempotent
-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();
        assert!(up.downstairs.live_repair_in_progress());
    }

@@ -4648,7 +4685,7 @@ pub(crate) mod test {
        assert!(!up.downstairs.live_repair_in_progress());

        // Trigger live repair start
-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();

        // Verify live repair has started
        assert!(up.downstairs.live_repair_in_progress());
@@ -4692,7 +4729,7 @@ pub(crate) mod test {

        // Put client 1 into LiveRepairReady and start live repair
        to_live_repair_ready(&mut up, ClientId::new(1));
-        up.check_live_repair_start();
+        up.ensure_downstairs_consistency();

        // Verify live repair started
        assert!(up.downstairs.live_repair_in_progress());
@@ -4718,4 +4755,70 @@ pub(crate) mod test {
        }
        );
    }
+
+    #[test]
+    fn test_downstairs_three_live_repair() {
+        // Start with all three downstairs active. Put all three into
+        // live-repair (oh no), then see what happens.
+
+        let mut ddef = RegionDefinition::default();
+        ddef.set_block_size(512);
+        ddef.set_extent_size(Block::new_512(3));
+        ddef.set_extent_count(4);
+
+        let mut up = Upstairs::test_default(Some(ddef), false);
+        up.force_active().unwrap();
+
+        // All three clients start active
+        assert_eq!(up.ds_state(ClientId::new(0)), DsState::Active);
+        assert_eq!(up.ds_state(ClientId::new(1)), DsState::Active);
+        assert_eq!(up.ds_state(ClientId::new(2)), DsState::Active);
+
+        // Put all clients in live-repair. For the purposes of this test, we
+        // fault each downstairs first, so that live-repair doesn't start
+        // midway through the faulting.
+        for id in ClientId::iter() {
+            active_to_faulted(&mut up, id);
+        }
+
+        // Set up our region metadata to indicate that one client is dirty.
+        // This doesn't trigger anything, because we're poking the state
+        // machine internals (instead of sending it events).
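+        // (Client 0's dirty bits are what force the extents to need repair
+        // once reconciliation compares metadata across the three clients.)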
+        for id in ClientId::iter() {
+            faulted_to_live_repair_ready_with(
+                &mut up,
+                id,
+                RegionMetadata::new(
+                    &[1; 12],                      // generation
+                    &[1; 12],                      // flush
+                    &[id == ClientId::new(0); 12], // dirty
+                ),
+            );
+            up.downstairs.clients[id].repair_addr =
+                Some("0.0.0.0:1".parse().unwrap());
+        }
+
+        // Send it an event, which should trigger the beginning of
+        // reconciliation (because it will now notice that all three
+        // downstairs are in live-repair)
+        up.apply(UpstairsAction::NoOp);
+
+        // Check that we're doing reconciliation
+        for id in ClientId::iter() {
+            assert_eq!(
+                up.ds_state(id),
+                DsState::Connecting {
+                    mode: ConnectionMode::New,
+                    state: NegotiationState::Reconcile,
+                }
+            );
+        }
+
+        // Each extent has 4 associated repair jobs
+        let expected_repairs = ddef.extent_count() as usize
+            * ddef.extent_size().value as usize
+            * 4;
+        assert_eq!(up.downstairs.reconcile_repair_needed(), expected_repairs);
+    }
 }