Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,25 @@ basefee = 0
# Env: MBV_VALIDATOR__KEYPAIR
keypair = "9Vo7TbA5YfC5a33JhAi9Fb41usA6JwecHNRw3f9MzzHAM8hFnXTzL5DcEHwsAFjuUZ8vNQcJ4XziRFpMc3gTgBQ"

# Replication role for this validator.
# Default: "standalone"
# Options:
# - "standalone": disable replication
#
# - "stand-by": replicate and allow takeover
#
# [validator.replication-mode.stand-by]
# url = "nats://0.0.0.0:4222"
# secret = "SUAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
#
# - "replica-only": replicate without takeover
# # specify the pubkey of the primary node, which we are replicating from
#
# [validator.replication-mode.replica-only]
# url = "nats://0.0.0.0:4222"
# secret = "SUAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# authority-override = "L12m1UhkJRfHyvLMcVucJwxXeuD728EqVDDwQDxFMNo"

[aperture]
# Network address to bind the main JSON RPC service to.
# Websocket bind address is derived by incrementing the port
Expand Down
49 changes: 32 additions & 17 deletions magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ impl MagicValidator {
// Initialization
// -----------------
#[instrument(skip_all, fields(last_slot = tracing::field::Empty))]
pub async fn try_from_config(config: ValidatorParams) -> ApiResult<Self> {
pub async fn try_from_config(
mut config: ValidatorParams,
) -> ApiResult<Self> {
// TODO(thlorenz): this will need to be recreated on each start
let token = CancellationToken::new();
let identity_keypair = config.validator.keypair.insecure_clone();
Expand Down Expand Up @@ -189,23 +191,36 @@ impl MagicValidator {
// Connect to replication broker if configured.
// Returns (broker, is_fresh_start) where is_fresh_start indicates
// whether accountsdb was empty and may need a snapshot.
let broker =
if let Some(url) = config.validator.replication_mode.remote() {
let mut broker = Broker::connect(url).await?;
let is_fresh_start = accountsdb.slot() == 0;
// Fetch snapshot from primary if starting fresh
if is_fresh_start {
if let Some(snapshot) = broker.get_snapshot().await? {
accountsdb.insert_external_snapshot(
snapshot.slot,
&snapshot.data,
)?;
}
let broker = if let Some(conf) =
config.validator.replication_mode.config()
{
let step_start = Instant::now();
let mut broker = Broker::connect(conf.url, conf.secret).await?;
let is_fresh_start = accountsdb.slot() == 0;
// Fetch snapshot from primary if starting fresh
if is_fresh_start {
info!(
"accountsdb is not initialized, trying to fetch snapshot"
);
if let Some(snapshot) = broker.get_snapshot().await? {
info!(slot = snapshot.slot, "fetched accountsdb snapshot");
accountsdb.insert_external_snapshot(
snapshot.slot,
&snapshot.data,
)?;
// we have essentially reset the accountsdb,
// and chainlink should not prune it, as it
// would introduce divergence with primary node
config.accountsdb.reset = true;
} else {
warn!("no snapshot is found in replication stream");
}
Some((broker, is_fresh_start))
} else {
None
};
}
log_timing("startup", "replication broker init", step_start);
Some((broker, is_fresh_start))
} else {
None
};
let accountsdb = Arc::new(accountsdb);
let (mut dispatch, validator_channels) = link();

Expand Down
56 changes: 49 additions & 7 deletions magicblock-config/src/config/validator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// src/config/validator.rs
use std::fmt;

use serde::{Deserialize, Serialize};
use solana_keypair::Keypair;
use solana_pubkey::Pubkey;
Expand Down Expand Up @@ -30,9 +31,50 @@ pub enum ReplicationMode {
// Validator which doesn't participate in replication
Standalone,
/// Validator which participates in replication: acting as either a primary or replicator
StandBy(Url),
StandBy(ReplicationConfig),
/// Validator which participates in replication only as replicator (no takeover)
ReplicaOnly(Url, SerdePubkey),
ReplicaOnly(ReplicationConfig),
}

#[derive(Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct ReplicationConfig {
pub url: Url,
pub secret: String,
pub authority_override: Option<SerdePubkey>,
}

impl fmt::Debug for ReplicationConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReplicationConfig")
.field("url", &self.url)
.field("secret", &"<redacted>")
.field("authority_override", &self.authority_override)
.finish()
}
}

impl Serialize for ReplicationConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[derive(Serialize)]
#[serde(rename_all = "kebab-case")]
struct Redacted<'a> {
url: &'a Url,
secret: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
authority_override: &'a Option<SerdePubkey>,
}

Redacted {
url: &self.url,
secret: "<redacted>",
authority_override: &self.authority_override,
}
.serialize(serializer)
}
}

impl Default for ValidatorConfig {
Expand All @@ -50,16 +92,16 @@ impl Default for ValidatorConfig {
impl ReplicationMode {
/// Returns the remote URL if this node participates in replication.
/// Returns `None` for `Standalone` mode.
pub fn remote(&self) -> Option<Url> {
pub fn config(&self) -> Option<ReplicationConfig> {
match self {
Self::Standalone => None,
Self::StandBy(u) | Self::ReplicaOnly(u, _) => Some(u.clone()),
Self::StandBy(c) | Self::ReplicaOnly(c) => Some(c.clone()),
}
}

pub fn authority_override(&self) -> Option<Pubkey> {
if let Self::ReplicaOnly(_, pk) = self {
return Some(pk.0);
if let Self::ReplicaOnly(c) = self {
return c.authority_override.as_ref().map(|pk| pk.0);
}
None
}
Expand Down
20 changes: 19 additions & 1 deletion magicblock-config/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use solana_keypair::Keypair;
use tempfile::TempDir;

use crate::{
config::{BlockSize, LifecycleMode},
config::{validator::ReplicationConfig, BlockSize, LifecycleMode},
consts::{self, DEFAULT_VALIDATOR_KEYPAIR},
types::network::{BindAddress, Remote},
ValidatorParams,
Expand Down Expand Up @@ -404,6 +404,10 @@ fn test_example_config_full_coverage() {
config.validator.keypair.0.to_base58_string(),
DEFAULT_VALIDATOR_KEYPAIR
);
assert!(matches!(
config.validator.replication_mode,
crate::config::validator::ReplicationMode::Standalone
));

// ========================================================================
// 6. Chain Commitment
Expand Down Expand Up @@ -710,3 +714,17 @@ fn test_bind_address_toml_deserialize_port_out_of_range_errors() {
msg
);
}

#[test]
#[parallel]
fn test_replication_config_debug_redacts_secret() {
let cfg = ReplicationConfig {
url: "nats://0.0.0.0:4222".parse().unwrap(),
secret: "SUASECRET".into(),
authority_override: None,
};

let dbg = format!("{cfg:?}");
assert!(dbg.contains("<redacted>"));
assert!(!dbg.contains("SUASECRET"));
}
5 changes: 4 additions & 1 deletion magicblock-replicator/src/nats/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ impl Broker {
/// Connects to NATS and initializes all JetStream resources.
///
/// Resources are created idempotently - safe to call multiple times.
pub async fn connect(url: Url) -> Result<Self> {
/// secret argument: is the NATS nkey secret, which must have a paired
/// public key stored in the server
pub async fn connect(url: Url, secret: String) -> Result<Self> {
let addr = ServerAddr::from_url(url)?;

let client = ConnectOptions::new()
.nkey(secret)
.max_reconnects(None)
.reconnect_delay_callback(|attempts| {
let ms = (attempts as u64 * cfg::RECONNECT_BASE_MS)
Expand Down
Loading