Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 27 additions & 3 deletions magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use magicblock_committor_service::{
};
use magicblock_config::{
config::{
ChainOperationConfig, LedgerConfig, LifecycleMode, LoadableProgram,
validator::ReplicationMode, ChainOperationConfig, LedgerConfig,
LifecycleMode, LoadableProgram,
},
ValidatorParams,
};
Expand All @@ -54,7 +55,10 @@ use magicblock_metrics::{metrics::TRANSACTION_COUNT, MetricsService};
use magicblock_processor::{
build_svm_env,
loader::load_upgradeable_programs,
scheduler::{state::TransactionSchedulerState, TransactionScheduler},
scheduler::{
state::{SchedulerMode, TransactionSchedulerState},
TransactionScheduler,
},
};
use magicblock_program::{
init_magic_sys,
Expand All @@ -75,7 +79,10 @@ use solana_native_token::LAMPORTS_PER_SOL;
use solana_pubkey::Pubkey;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_signer::Signer;
use tokio::runtime::Builder;
use tokio::{
runtime::Builder,
sync::mpsc::{channel, Sender},
};
use tokio_util::sync::CancellationToken;
use tracing::*;

Expand Down Expand Up @@ -125,6 +132,7 @@ pub struct MagicValidator {
claim_fees_task: ClaimFeesTask,
task_scheduler: Option<TaskSchedulerService>,
transaction_execution: thread::JoinHandle<()>,
mode_tx: Sender<SchedulerMode>,
}

impl MagicValidator {
Expand Down Expand Up @@ -252,6 +260,9 @@ impl MagicValidator {

validator::init_validator_authority(identity_keypair);
let base_fee = config.validator.basefee;

// Mode switcher for transitioning from Replica to Primary mode after ledger replay
let (mode_tx, mode_rx) = channel(1);
let txn_scheduler_state = TransactionSchedulerState {
accountsdb: accountsdb.clone(),
ledger: ledger.clone(),
Expand All @@ -265,6 +276,7 @@ impl MagicValidator {
.auto_airdrop_lamports
> 0,
shutdown: token.clone(),
mode_rx,
};
TRANSACTION_COUNT.inc_by(ledger.count_transactions()? as u64);
// Faucet keypair is only used for airdrops, which are not allowed in
Expand Down Expand Up @@ -368,6 +380,7 @@ impl MagicValidator {
block_udpate_tx: validator_channels.block_update,
task_scheduler: Some(task_scheduler),
transaction_execution,
mode_tx,
})
}

Expand Down Expand Up @@ -684,6 +697,17 @@ impl MagicValidator {
// Ledger processing needs to happen before anything of the below
let step_start = Instant::now();
self.maybe_process_ledger().await?;

// Switch scheduler to Primary mode after ledger replay completes.
// Primary validators accept client transactions; Replica validators stay
// in Replica mode to receive transactions from the primary.
if let ReplicationMode::Standalone =
self.config.validator.replication_mode
{
// Ignore send errors: scheduler may have shut down.
let _ = self.mode_tx.send(SchedulerMode::Primary).await;
}

log_timing("startup", "maybe_process_ledger", step_start);

// Ledger replay has completed, we can now clean non-delegated accounts
Expand Down
16 changes: 16 additions & 0 deletions magicblock-config/src/config/validator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// src/config/validator.rs
use serde::{Deserialize, Serialize};
use solana_keypair::Keypair;
use url::Url;

use crate::{consts, types::SerdeKeypair};

Expand All @@ -13,6 +14,20 @@ pub struct ValidatorConfig {

/// The validator's identity keypair, encoded in Base58.
pub keypair: SerdeKeypair,

/// Replication role: Primary accepts client transactions, Replica replays from Primary.
pub replication_mode: ReplicationMode,
}

/// Defines the validator's role in a replication setup.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum ReplicationMode {
// Validator which doesn't participate in replication
Standalone,
/// Validator which participates in replication: acting as either a primary or replicator
StandBy(Url),
/// Validator which participates in replication only as replicator (no takeover)
ReplicatOnly(Url),
}

impl Default for ValidatorConfig {
Expand All @@ -22,6 +37,7 @@ impl Default for ValidatorConfig {
Self {
basefee: consts::DEFAULT_BASE_FEE,
keypair: SerdeKeypair(keypair),
replication_mode: ReplicationMode::Standalone,
}
}
}
53 changes: 46 additions & 7 deletions magicblock-core/src/link/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,37 @@ pub struct ProcessableTransaction {
pub encoded: Option<Vec<u8>>,
}

/// Specifies the position and persistence behavior for replaying a transaction.
///
/// During replication, transactions must be replayed at the same slot and index
/// as they appeared on the primary to maintain ordering consistency.
#[derive(Clone, Copy)]
pub struct ReplayPosition {
/// The slot in which the transaction was originally included.
pub slot: Slot,
/// The transaction's index within that slot (0-based).
pub index: u32,
/// Whether to persist the replay to the ledger and broadcast status.
/// - `true`: Record to ledger + broadcast (for replay from primary/replicator)
/// - `false`: No recording, no broadcast (for local ledger replay during startup)
pub persist: bool,
}

/// An enum that specifies how a transaction should be processed by the scheduler.
/// Each variant also carries the one-shot sender to return the result to the original caller.
///
/// Variants that require result notification carry a one-shot sender:
/// - `Simulation` and `Execution` return results to the caller
/// - `Replay` is fire-and-forget (no sender, just position/persistence info)
pub enum TransactionProcessingMode {
/// Process the transaction as a simulation.
Simulation(TxnSimulationResultTx),
/// Process the transaction for standard execution.
Execution(TxnExecutionResultTx),
/// Replay the transaction against the current state without persistence to the ledger.
Replay(TxnReplayResultTx),
/// Replay the transaction at a specific slot/index position.
///
/// The `ReplayPosition` specifies where to record the transaction in the ledger
/// and whether to persist/broadcast the result.
Replay(ReplayPosition),
}

/// The detailed outcome of a transaction simulation.
Expand Down Expand Up @@ -250,14 +272,31 @@ impl TransactionSchedulerHandle {
self.send(txn, mode).await
}

/// Submits a transaction to be replayed against the
/// current accountsdb state and awaits the result.
/// Submits a transaction to be replayed against the current accountsdb state.
///
/// Unlike `execute()`, this method is fire-and-forget: it returns success
/// once the transaction is queued, not after execution completes.
///
/// # Arguments
/// * `position` - The slot/index at which to record the transaction, plus
/// whether to persist to ledger and broadcast status
/// * `txn` - The transaction to replay
pub async fn replay(
&self,
position: ReplayPosition,
txn: impl SanitizeableTransaction,
) -> TransactionResult {
let mode = TransactionProcessingMode::Replay;
self.send(txn, mode).await?
let mode = TransactionProcessingMode::Replay(position);
let transaction = txn.sanitize(true)?;
let txn = ProcessableTransaction {
transaction,
mode,
encoded: None,
};
self.0
.send(txn)
.await
.map_err(|_| TransactionError::ClusterMaintenance)
}

/// A private helper that handles the common logic of sanitizing, sending a
Expand Down
2 changes: 1 addition & 1 deletion magicblock-ledger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ solana-signature = { workspace = true, features = ["rand"] }
solana-signer = { workspace = true }
solana-keypair = { workspace = true }
solana-instruction = { workspace = true }
solana-transaction = { workspace = true }
solana-transaction = { workspace = true, features = ["blake3", "verify"] }
solana-transaction-error = { workspace = true }
solana-message = { workspace = true }
solana-transaction-context = { workspace = true }
Expand Down
20 changes: 11 additions & 9 deletions magicblock-ledger/src/blockstore_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;

use magicblock_core::link::transactions::{
SanitizeableTransaction, TransactionSchedulerHandle,
ReplayPosition, SanitizeableTransaction, TransactionSchedulerHandle,
};
use num_format::{Locale, ToFormattedString};
use solana_clock::{Slot, UnixTimestamp};
Expand Down Expand Up @@ -64,11 +64,7 @@ async fn replay_blocks(
if enabled!(Level::INFO)
&& slot.is_multiple_of(PROGRESS_REPORT_INTERVAL)
{
info!(
slot = %slot.to_formatted_string(&Locale::en),
max_slot = %max_slot,
"Processing block"
);
info!(slot, max_slot, "Processing block");
}

let VersionedConfirmedBlock {
Expand Down Expand Up @@ -131,10 +127,16 @@ async fn replay_blocks(
let txn = txn.sanitize(false).map_err(|err| {
LedgerError::BlockStoreProcessor(err.to_string())
})?;
let position = ReplayPosition {
slot: block.slot,
// TODO(bmuddha/thlorenz): retrieve the proper transaction index
index: 0,
persist: false,
};
let result =
transaction_scheduler.replay(txn).await.map_err(|err| {
LedgerError::BlockStoreProcessor(err.to_string())
});
transaction_scheduler.replay(position, txn).await.map_err(
|err| LedgerError::BlockStoreProcessor(err.to_string()),
);
if !enabled!(Level::TRACE) {
debug!(signature = %signature, result = ?result, "Transaction replay result");
}
Expand Down
2 changes: 0 additions & 2 deletions magicblock-ledger/src/conversions/mod.rs

This file was deleted.

Loading