Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions trust-quorum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ gfss.workspace = true
hex.workspace = true
hkdf.workspace = true
iddqd.workspace = true
omicron-common.workspace = true
omicron-uuid-kinds.workspace = true
rand = { workspace = true, features = ["os_rng"] }
secrecy.workspace = true
Expand Down
82 changes: 73 additions & 9 deletions trust-quorum/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl ConnMgr {
self.on_task_exit(task_id).await;
}
Err(err) => {
error!(self.log, "Connection task panic: {err}");
warn!(self.log, "Connection task panic: {err}");
self.on_task_exit(err.id()).await;
}

Expand Down Expand Up @@ -482,7 +482,19 @@ impl ConnMgr {
tx,
conn_type: ConnectionType::Accepted(addr),
};
assert!(self.accepting.insert_unique(task_handle).is_ok());
let replaced = self.accepting.insert_overwrite(task_handle);
for h in replaced {
// We accepted a connection from the same `SocketAddrV6` before the
// old one was torn down. This should be rare, if not impossible.
warn!(
self.log,
"Accepted connection replaced. Aborting old task.";
"task_id" => ?h.task_id(),
"peer_addr" => %h.addr(),
);
h.abort();
}

Ok(())
}

Expand All @@ -501,12 +513,38 @@ impl ConnMgr {
"peer_id" => %peer_id
);

let already_established = self.established.insert_unique(
let replaced = self.established.insert_overwrite(
EstablishedTaskHandle::new(peer_id, task_handle),
);
assert!(already_established.is_ok());

// The only reason for for established connections to be replaced
// like this is when the IP address for a peer changes, but the
// previous connection has not yet been torn down.
//
// Tear down usually happens quickly due to TCP reset or missed
// pings. However if the new ip address is fed into the task via
// `load_peer_addresses` and the peer at that address connects
// before the old connection is torn down, you end up in this
// situation.
//
// This isn't really possible, except in tests where we change port
// numbers when simulating crash and restart of nodes, and do this
// very quickly. We change port numbers because `NodeTask`s listen
// on port 0 and use ephemeral ports to prevent collisions in tests
// where the IP address is localhost.
for h in replaced {
warn!(
self.log,
"Established connection replaced. Aborting old task.";
"task_id" => ?h.task_id(),
"peer_addr" => %h.addr(),
"peer_id" => %h.baseboard_id
);
h.abort();
}
} else {
error!(self.log, "Server handshake completed, but no server addr in map";
warn!(self.log,
"Server handshake completed, but no server addr in map";
"task_id" => ?task_id,
"peer_addr" => %addr,
"peer_id" => %peer_id
Expand All @@ -528,12 +566,38 @@ impl ConnMgr {
"peer_addr" => %addr,
"peer_id" => %peer_id
);
let already_established = self.established.insert_unique(
let replaced = self.established.insert_overwrite(
EstablishedTaskHandle::new(peer_id, task_handle),
);
assert!(already_established.is_ok());

// The only reason for for established connections to be replaced
// like this is when the IP address for a peer changes, but the
// previous connection has not yet been torn down.
//
// Tear down usually happens quickly due to TCP reset or missed
// pings. However if the new ip address is fed into the task via
// `load_peer_addresses` and the peer at that address connects
// before the old connection is torn down, you end up in this
// situation.
//
// This isn't really possible, except in tests where we change port
// numbers when simulating crash and restart of nodes, and do this
// very quickly. We change port numbers because `NodeTask`s listen
// on port 0 and use ephemeral ports to prevent collisions in tests
// where the IP address is localhost.
for h in replaced {
warn!(
self.log,
"Established connection replaced. Aborting old task.";
"task_id" => ?h.task_id(),
"peer_addr" => %h.addr(),
"peer_id" => %h.baseboard_id
);
h.abort();
}
} else {
error!(self.log, "Client handshake completed, but no client addr in map";
warn!(self.log,
"Client handshake completed, but no client addr in map";
"task_id" => ?task_id,
"peer_addr" => %addr,
"peer_id" => %peer_id
Expand Down Expand Up @@ -634,7 +698,7 @@ impl ConnMgr {
disconnected_peers
}

/// Spawn a task to estalbish a sprockets connection for the given address
/// Spawn a task to establish a sprockets connection for the given address
async fn connect_client(
&mut self,
corpus: Vec<Utf8PathBuf>,
Expand Down
91 changes: 91 additions & 0 deletions trust-quorum/src/ledgers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Persistent storage for the trust quorum task
//!
//! We write two pieces of data to M.2 devices in production via
//! [`omicron_common::ledger::Ledger`]:
//!
//! 1. [`trust_quorum_protocol::PersistentState`] for trust quorum state
//! 2. A network config blob required for pre-rack-unlock configuration

use camino::Utf8PathBuf;
use omicron_common::ledger::{Ledger, Ledgerable};
use serde::{Deserialize, Serialize};
use slog::{Logger, info};
use trust_quorum_protocol::PersistentState;

/// A wrapper type around [`PersistentState`] for use as a [`Ledger`]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PersistentStateLedger {
pub generation: u64,
pub state: PersistentState,
}

impl Ledgerable for PersistentStateLedger {
fn is_newer_than(&self, other: &Self) -> bool {
self.generation > other.generation
}

fn generation_bump(&mut self) {
self.generation += 1;
}
}

impl PersistentStateLedger {
/// Save the persistent state to a ledger and return the new generation
/// number.
///
/// Panics if the ledger cannot be saved.
///
/// The trust quorum protocol relies on persisting state to disk, such
/// as whether a node has prepared or committed a configuration, before
/// responding to a coordinator node or Nexus. This is necessary in order
/// to ensure that enough nodes actually have performed an operation and
/// not have the overall state of the protocol go backward in the case of
/// a crash and restart of a node. In this manner, trust quorum is similar
/// to consensus protocols like Raft and Paxos.
///
/// If for any reason we cannot persist trust quorum state to the ledger,
/// we must panic to ensure that the node does not take any further
/// action incorrectly, like acknowledging a `Prepare` to a coordinator.
/// Panicking is the simplest mechanism to ensure that a given node will
/// not violate the invariants of the trust quorum protocol in the case
/// of internal disk failures. It also ensures a very obvious failure that
/// will allow support to get involved and replace internal disks.
pub async fn save(
log: &Logger,
paths: Vec<Utf8PathBuf>,
generation: u64,
state: PersistentState,
) -> u64 {
let persistent_state = PersistentStateLedger { generation, state };
let mut ledger = Ledger::new_with(log, paths, persistent_state);
ledger
.commit()
.await
.expect("Critical: Failed to save ledger for persistent state");
ledger.data().generation
}

/// Return Some(`PersistentStateLedger`) if it exists on disk, otherwise
/// return `None`.
pub async fn load(
log: &Logger,
paths: Vec<Utf8PathBuf>,
) -> Option<PersistentStateLedger> {
let Some(ledger) =
Ledger::<PersistentStateLedger>::new(&log, paths).await
else {
return None;
};
let persistent_state = ledger.into_inner();
info!(
log,
"Loaded persistent state from ledger with generation {}",
persistent_state.generation
);
Some(persistent_state)
}
}
1 change: 1 addition & 0 deletions trust-quorum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

mod connection_manager;
pub(crate) mod established_conn;
mod ledgers;
mod task;

pub(crate) use connection_manager::{
Expand Down
Loading
Loading