diff --git a/config.example.toml b/config.example.toml index 3f4ff2b76..a64b4c1a8 100644 --- a/config.example.toml +++ b/config.example.toml @@ -121,6 +121,25 @@ basefee = 0 # Env: MBV_VALIDATOR__KEYPAIR keypair = "9Vo7TbA5YfC5a33JhAi9Fb41usA6JwecHNRw3f9MzzHAM8hFnXTzL5DcEHwsAFjuUZ8vNQcJ4XziRFpMc3gTgBQ" +# Replication role for this validator. +# Default: "standalone" +# Options: +# - "standalone": disable replication +# +# - "stand-by": replicate and allow takeover +# +# [validator.replication-mode.stand-by] +# url = "nats://0.0.0.0:4222" +# secret = "SUAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" +# +# - "replica-only": replicate without takeover +# Requires the primary validator pubkey we are replicating from. +# +# [validator.replication-mode.replica-only] +# url = "nats://0.0.0.0:4222" +# secret = "SUAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" +# authority-override = "L12m1UhkJRfHyvLMcVucJwxXeuD728EqVDDwQDxFMNo" + [aperture] # Network address to bind the main JSON RPC service to. # Websocket bind address is derived by incrementing the port diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 6f693be87..a6945e11b 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -142,7 +142,9 @@ impl MagicValidator { // Initialization // ----------------- #[instrument(skip_all, fields(last_slot = tracing::field::Empty))] - pub async fn try_from_config(config: ValidatorParams) -> ApiResult { + pub async fn try_from_config( + mut config: ValidatorParams, + ) -> ApiResult { // TODO(thlorenz): this will need to be recreated on each start let token = CancellationToken::new(); let identity_keypair = config.validator.keypair.insecure_clone(); @@ -189,23 +191,36 @@ impl MagicValidator { // Connect to replication broker if configured. // Returns (broker, is_fresh_start) where is_fresh_start indicates // whether accountsdb was empty and may need a snapshot. - let broker = - if let Some(url) = config.validator.replication_mode.remote() { - let mut broker = Broker::connect(url).await?; - let is_fresh_start = accountsdb.slot() == 0; - // Fetch snapshot from primary if starting fresh - if is_fresh_start { - if let Some(snapshot) = broker.get_snapshot().await? { - accountsdb.insert_external_snapshot( - snapshot.slot, - &snapshot.data, - )?; - } + let broker = if let Some(conf) = + config.validator.replication_mode.config() + { + let step_start = Instant::now(); + let mut broker = Broker::connect(conf.url, conf.secret).await?; + let is_fresh_start = accountsdb.slot() == 0; + // Fetch snapshot from primary if starting fresh + if is_fresh_start { + info!( + "accountsdb is not initialized, trying to fetch snapshot" + ); + if let Some(snapshot) = broker.get_snapshot().await? { + info!(slot = snapshot.slot, "fetched accountsdb snapshot"); + accountsdb.insert_external_snapshot( + snapshot.slot, + &snapshot.data, + )?; + // we have essentially reset the accountsdb, + // and chainlink should not prune it, as it + // would introduce divergence with primary node + config.accountsdb.reset = true; + } else { + warn!("no snapshot is found in replication stream"); } - Some((broker, is_fresh_start)) - } else { - None - }; + } + log_timing("startup", "replication broker init", step_start); + Some((broker, is_fresh_start)) + } else { + None + }; let accountsdb = Arc::new(accountsdb); let (mut dispatch, validator_channels) = link(); diff --git a/magicblock-config/src/config/validator.rs b/magicblock-config/src/config/validator.rs index 692ca88fe..013cddf15 100644 --- a/magicblock-config/src/config/validator.rs +++ b/magicblock-config/src/config/validator.rs @@ -1,4 +1,5 @@ -// src/config/validator.rs +use std::fmt; + use serde::{Deserialize, Serialize}; use solana_keypair::Keypair; use solana_pubkey::Pubkey; @@ -30,9 +31,50 @@ pub enum ReplicationMode { // Validator which doesn't participate in replication Standalone, /// Validator which participates in replication: acting as either a primary or replicator - StandBy(Url), + StandBy(ReplicationConfig), /// Validator which participates in replication only as replicator (no takeover) - ReplicaOnly(Url, SerdePubkey), + ReplicaOnly { + #[serde(flatten)] + config: ReplicationConfig, + #[serde(rename = "kebab-case")] + authority_override: SerdePubkey, + }, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct ReplicationConfig { + pub url: Url, + pub secret: String, +} + +impl fmt::Debug for ReplicationConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReplicationConfig") + .field("url", &self.url) + .field("secret", &"") + .finish() + } +} + +impl Serialize for ReplicationConfig { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + #[derive(Serialize)] + #[serde(rename_all = "kebab-case")] + struct Redacted<'a> { + url: &'a Url, + secret: &'static str, + } + + Redacted { + url: &self.url, + secret: "", + } + .serialize(serializer) + } } impl Default for ValidatorConfig { @@ -50,16 +92,20 @@ impl Default for ValidatorConfig { impl ReplicationMode { /// Returns the remote URL if this node participates in replication. /// Returns `None` for `Standalone` mode. - pub fn remote(&self) -> Option { + pub fn config(&self) -> Option { match self { Self::Standalone => None, - Self::StandBy(u) | Self::ReplicaOnly(u, _) => Some(u.clone()), + Self::StandBy(c) => Some(c.clone()), + Self::ReplicaOnly { config, .. } => Some(config.clone()), } } pub fn authority_override(&self) -> Option { - if let Self::ReplicaOnly(_, pk) = self { - return Some(pk.0); + if let Self::ReplicaOnly { + authority_override, .. + } = self + { + return Some(authority_override.0); } None } diff --git a/magicblock-config/src/tests.rs b/magicblock-config/src/tests.rs index d8c4b4976..5987e2500 100644 --- a/magicblock-config/src/tests.rs +++ b/magicblock-config/src/tests.rs @@ -6,7 +6,7 @@ use solana_keypair::Keypair; use tempfile::TempDir; use crate::{ - config::{BlockSize, LifecycleMode}, + config::{validator::ReplicationConfig, BlockSize, LifecycleMode}, consts::{self, DEFAULT_VALIDATOR_KEYPAIR}, types::network::{BindAddress, Remote}, ValidatorParams, @@ -404,6 +404,10 @@ fn test_example_config_full_coverage() { config.validator.keypair.0.to_base58_string(), DEFAULT_VALIDATOR_KEYPAIR ); + assert!(matches!( + config.validator.replication_mode, + crate::config::validator::ReplicationMode::Standalone + )); // ======================================================================== // 6. Chain Commitment @@ -710,3 +714,16 @@ fn test_bind_address_toml_deserialize_port_out_of_range_errors() { msg ); } + +#[test] +#[parallel] +fn test_replication_config_debug_redacts_secret() { + let cfg = ReplicationConfig { + url: "nats://0.0.0.0:4222".parse().unwrap(), + secret: "SUASECRET".into(), + }; + + let dbg = format!("{cfg:?}"); + assert!(dbg.contains("")); + assert!(!dbg.contains("SUASECRET")); +} diff --git a/magicblock-replicator/src/nats/broker.rs b/magicblock-replicator/src/nats/broker.rs index a43a846d2..dd1e07f9a 100644 --- a/magicblock-replicator/src/nats/broker.rs +++ b/magicblock-replicator/src/nats/broker.rs @@ -32,10 +32,13 @@ impl Broker { /// Connects to NATS and initializes all JetStream resources. /// /// Resources are created idempotently - safe to call multiple times. - pub async fn connect(url: Url) -> Result { + /// secret argument: is the NATS nkey secret, which must have a paired + /// public key stored in the server + pub async fn connect(url: Url, secret: String) -> Result { let addr = ServerAddr::from_url(url)?; let client = ConnectOptions::new() + .nkey(secret) .max_reconnects(None) .reconnect_delay_callback(|attempts| { let ms = (attempts as u64 * cfg::RECONNECT_BASE_MS) diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 8a70e5dfe..fd1ca6743 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -10798,6 +10798,7 @@ dependencies = [ "solana-transaction-status-client-types", "tempfile", "tokio", + "tokio-util 0.7.17", "tracing", "tracing-log", "tracing-subscriber",