diff --git a/Cargo.lock b/Cargo.lock index fa7fec7e6..92397d8e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3407,6 +3407,7 @@ dependencies = [ "magicblock-metrics", "magicblock-processor", "magicblock-program", + "magicblock-replicator", "magicblock-services", "magicblock-task-scheduler", "magicblock-validator-admin", @@ -3588,6 +3589,7 @@ name = "magicblock-core" version = "0.8.5" dependencies = [ "bincode", + "bytes", "console-subscriber", "flume", "magicblock-magic-program-api", @@ -3808,6 +3810,7 @@ dependencies = [ "tempfile", "thiserror 1.0.69", "tokio", + "tokio-util", "tracing", "url", ] @@ -9203,6 +9206,7 @@ dependencies = [ "solana-signature", "solana-signer", "solana-transaction", + "solana-transaction-error", "solana-transaction-status-client-types", "tempfile", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 6ff79b32a..9fef6feef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ async-nats = "0.46" async-trait = "0.1.77" base64 = "0.21.7" bincode = "1.3.3" -bytes = "1.0" +bytes = { version = "1.0", features = ["serde"] } borsh = { version = "1.5.1", features = ["derive", "unstable__schema"] } bs58 = "0.5.1" byteorder = "1.5.0" @@ -112,6 +112,7 @@ magicblock-magic-program-api = { path = "./magicblock-magic-program-api" } magicblock-metrics = { path = "./magicblock-metrics" } magicblock-processor = { path = "./magicblock-processor" } magicblock-program = { path = "./programs/magicblock" } +magicblock-replicator = { path = "./magicblock-replicator" } magicblock-rpc-client = { path = "./magicblock-rpc-client" } magicblock-services = { path = "./magicblock-services" } magicblock-table-mania = { path = "./magicblock-table-mania" } diff --git a/magicblock-accounts-db/src/lib.rs b/magicblock-accounts-db/src/lib.rs index 349fd80c9..20041bd82 100644 --- a/magicblock-accounts-db/src/lib.rs +++ b/magicblock-accounts-db/src/lib.rs @@ -6,7 +6,6 @@ use index::{ }; use lmdb::{RwTransaction, Transaction}; use magicblock_config::config::AccountsDbConfig; -use parking_lot::{RwLock, RwLockWriteGuard}; use solana_account::{ cow::AccountBorrowed, AccountSharedData, ReadableAccount, }; @@ -19,10 +18,6 @@ use crate::{snapshot::SnapshotManager, traits::AccountsBank}; pub type AccountsDbResult = Result; -/// A global lock used to suspend all write operations during critical -/// sections (like snapshots). -pub type GlobalSyncLock = Arc>; - pub const ACCOUNTSDB_DIR: &str = "accountsdb"; /// The main Accounts Database. @@ -39,10 +34,6 @@ pub struct AccountsDb { index: AccountsDbIndex, /// Manages snapshots and state restoration. snapshot_manager: Arc, - /// Global lock ensures atomic snapshots by pausing writes. - /// Note: Reads are generally wait-free/lock-free via mmap, - /// unless they require index cursor stability. - write_lock: GlobalSyncLock, } impl AccountsDb { @@ -79,7 +70,6 @@ impl AccountsDb { storage, index, snapshot_manager, - write_lock: GlobalSyncLock::default(), }; // Recover state if the requested slot is older than our current state @@ -291,30 +281,22 @@ impl AccountsDb { /// /// Returns the state checksum computed at snapshot time. /// The checksum can be used to verify state consistency across nodes. - pub fn take_snapshot(self: &Arc, slot: u64) -> u64 { - let this = self.clone(); - - // Phase 1: Create snapshot directory (with write lock) - let locked = this.write_lock.write(); - this.flush(); - // SAFETY: - // we have acquired the write lock above - let checksum = unsafe { this.checksum() }; - let used_storage = this.storage.active_segment(); - - let snapshot_dir = this - .snapshot_manager - .create_snapshot_dir(slot, used_storage); - drop(locked); - thread::spawn(move || { - // Phase 2: Archive directory (no lock needed) - let _ = snapshot_dir - .and_then(|dir| { - this.snapshot_manager.archive_and_register(&dir) - }) - .log_err(|| "failed to create accountsdb snapshot"); - }); - checksum + /// + /// + /// # Safety + /// the caller must ensure that no state transitions are taking + /// place concurrently when this operation is in progress + pub unsafe fn take_snapshot(&self, slot: u64) -> AccountsDbResult { + // Create snapshot directory (potential deep copy) + self.flush(); + let checksum = self.checksum(); + let used_storage = self.storage.active_segment(); + + let manager = self.snapshot_manager.clone(); + let dir = manager.create_snapshot_dir(slot, used_storage)?; + // Archive directory in background to avoid blocking the caller + thread::spawn(move || manager.archive_and_register(&dir)); + Ok(checksum) } /// Ensures the database state is at most `slot`. @@ -336,9 +318,6 @@ impl AccountsDb { "Current slot ahead of target, rolling back" ); - // Block all writes during restoration - let _guard = self.write_lock.write(); - let restored_slot = self .snapshot_manager .restore_from_snapshot(target_slot) @@ -372,10 +351,6 @@ impl AccountsDb { self.index.flush(); } - pub fn write_lock(&self) -> GlobalSyncLock { - self.write_lock.clone() - } - /// Inserts an external snapshot archive received over the network. /// /// If the snapshot slot is newer than the current DB slot, immediately @@ -418,8 +393,8 @@ impl AccountsDb { /// suitable for verifying state consistency across nodes. /// /// # Safety - /// the caller must acquire the write lock on accountsdb, so that - /// the state doesn't change during checksum computation + /// the caller must ensure that no concurrent write access is being performed on + /// accountsdb, so that the state doesn't change during checksum computation pub unsafe fn checksum(&self) -> u64 { let mut hasher = xxhash3_64::Hasher::new(); for (pubkey, acc) in self.iter_all() { @@ -432,15 +407,6 @@ impl AccountsDb { hasher.finish() } - /// Acquires exclusive write access to the database. - /// - /// The returned guard blocks all other write operations while held. - /// Use this when you need to ensure the database state doesn't change - /// during operations like checksum computation. - pub fn lock_database(&self) -> RwLockWriteGuard<'_, ()> { - self.write_lock.write() - } - pub fn database_directory(&self) -> &Path { self.snapshot_manager.database_path() } diff --git a/magicblock-accounts-db/src/tests.rs b/magicblock-accounts-db/src/tests.rs index 48e054fc8..84708f86f 100644 --- a/magicblock-accounts-db/src/tests.rs +++ b/magicblock-accounts-db/src/tests.rs @@ -585,12 +585,6 @@ fn test_checksum_deterministic_across_dbs() { db2.insert_account(&pubkey, &account).unwrap(); } - // Acquire write locks before computing checksums - let lock1 = db1.write_lock(); - let lock2 = db2.write_lock(); - let _guard1 = lock1.write(); - let _guard2 = lock2.write(); - assert_eq!( unsafe { db1.checksum() }, unsafe { db2.checksum() }, @@ -610,17 +604,13 @@ fn test_checksum_detects_state_change() { }) .collect(); - let lock = env.write_lock(); - let _guard = lock.write(); let original_checksum = unsafe { env.checksum() }; - drop(_guard); // Modify a single account's data accounts[5].1.data_as_mut_slice()[0] ^= 0xFF; env.insert_account(&accounts[5].0, &accounts[5].1).unwrap(); { - let _guard = lock.write(); assert_ne!( unsafe { env.checksum() }, original_checksum, @@ -634,7 +624,6 @@ fn test_checksum_detects_state_change() { .unwrap(); { - let _guard = lock.write(); assert_ne!( unsafe { env.checksum() }, original_checksum, @@ -715,7 +704,7 @@ impl TestEnv { /// Takes a snapshot and waits for archiving to complete. fn take_snapshot_and_wait(&self, slot: u64) -> u64 { - let checksum = self.adb.take_snapshot(slot); + let checksum = unsafe { self.adb.take_snapshot(slot) }; // Wait for background archiving to complete let mut retries = 0; while !self.adb.snapshot_exists(slot) && retries < 200 { @@ -723,7 +712,7 @@ impl TestEnv { retries += 1; } assert!(self.adb.snapshot_exists(slot), "Snapshot should exist"); - checksum + checksum.expect("failed to take accountsdb snapshot") } fn restore_to_slot(mut self, slot: u64) -> Self { diff --git a/magicblock-aperture/src/encoder.rs b/magicblock-aperture/src/encoder.rs index 72490c48e..dc3b37c02 100644 --- a/magicblock-aperture/src/encoder.rs +++ b/magicblock-aperture/src/encoder.rs @@ -3,10 +3,7 @@ use std::fmt::Debug; use hyper::body::Bytes; use json::Serialize; use magicblock_core::{ - link::{ - accounts::LockedAccount, - transactions::{TransactionResult, TransactionStatus}, - }, + link::{accounts::LockedAccount, transactions::TransactionStatus}, Slot, }; use solana_account::ReadableAccount; @@ -14,7 +11,7 @@ use solana_account_decoder::{ encode_ui_account, UiAccountEncoding, UiDataSliceConfig, }; use solana_pubkey::Pubkey; -use solana_transaction_error::TransactionError; +use solana_transaction_error::{TransactionError, TransactionResult}; use crate::{ requests::{params::SerdeSignature, payload::NotificationPayload}, @@ -110,7 +107,7 @@ impl Encoder for ProgramAccountEncoder { pub(crate) struct TransactionResultEncoder; impl Encoder for TransactionResultEncoder { - type Data = TransactionResult; + type Data = TransactionResult<()>; fn encode( &self, diff --git a/magicblock-aperture/src/requests/http/mod.rs b/magicblock-aperture/src/requests/http/mod.rs index 6afc7b55d..fdd883fbb 100644 --- a/magicblock-aperture/src/requests/http/mod.rs +++ b/magicblock-aperture/src/requests/http/mod.rs @@ -213,7 +213,10 @@ impl HttpDispatcher { } let txn = transaction.sanitize(sigverify)?; - Ok(WithEncoded { txn, encoded }) + Ok(WithEncoded { + txn, + encoded: encoded.into(), + }) } /// Ensures all accounts required for a transaction are present in the `AccountsDb`. diff --git a/magicblock-aperture/src/state/transactions.rs b/magicblock-aperture/src/state/transactions.rs index 6360899f5..bd5869387 100644 --- a/magicblock-aperture/src/state/transactions.rs +++ b/magicblock-aperture/src/state/transactions.rs @@ -1,7 +1,8 @@ use std::sync::Arc; -use magicblock_core::{link::transactions::TransactionResult, Slot}; +use magicblock_core::Slot; use solana_signature::Signature; +use solana_transaction_error::TransactionResult; use super::ExpiringCache; @@ -18,5 +19,5 @@ pub(crate) struct SignatureResult { /// The slot in which the transaction was processed. pub slot: Slot, /// The result of the transaction (e.g., success or an error). - pub result: TransactionResult, + pub result: TransactionResult<()>, } diff --git a/magicblock-api/Cargo.toml b/magicblock-api/Cargo.toml index 79bacd067..765f8c5b4 100644 --- a/magicblock-api/Cargo.toml +++ b/magicblock-api/Cargo.toml @@ -29,6 +29,7 @@ magicblock-metrics = { workspace = true } magicblock-processor = { workspace = true } magicblock-program = { workspace = true } magicblock-services = { workspace = true } +magicblock-replicator = { workspace = true } magicblock-task-scheduler = { workspace = true } magicblock-validator-admin = { workspace = true } diff --git a/magicblock-api/src/errors.rs b/magicblock-api/src/errors.rs index 3926e4232..6268fff5a 100644 --- a/magicblock-api/src/errors.rs +++ b/magicblock-api/src/errors.rs @@ -116,6 +116,9 @@ pub enum ApiError { FailedToSanitizeTransaction( #[from] solana_transaction_error::TransactionError, ), + + #[error("Replication service failed: {0}")] + Replication(#[from] magicblock_replicator::Error), } impl From for ApiError { diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 41a43ba6d..77cb0a2d2 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -64,6 +64,7 @@ use magicblock_program::{ validator::{self, validator_authority}, TransactionScheduler as ActionTransactionScheduler, }; +use magicblock_replicator::{nats::Broker, ReplicationService}; use magicblock_services::actions_callback_service::ActionsCallbackService; use magicblock_task_scheduler::{SchedulerDatabase, TaskSchedulerService}; use magicblock_validator_admin::claim_fees::ClaimFeesTask; @@ -122,6 +123,7 @@ pub struct MagicValidator { ledger_truncator: LedgerTruncator, slot_ticker: Option>, committor_service: Option>, + replication_service: Option, scheduled_commits_processor: Option>, chainlink: Arc, rpc_handle: thread::JoinHandle<()>, @@ -132,6 +134,8 @@ pub struct MagicValidator { claim_fees_task: ClaimFeesTask, task_scheduler: Option, transaction_execution: thread::JoinHandle<()>, + replication_handle: + Option>>, mode_tx: Sender, is_standalone: bool, } @@ -174,8 +178,65 @@ impl MagicValidator { let latest_block = ledger.latest_block().load(); let step_start = Instant::now(); - let accountsdb = + let mut accountsdb = AccountsDb::new(&config.accountsdb, &config.storage, last_slot)?; + + // Mode switch channel for transitioning from StartingUp to Primary + // or Replica mode after ledger replay + let (mode_tx, mode_rx) = channel(1); + let is_standalone = matches!( + config.validator.replication_mode, + ReplicationMode::Standalone + ); + + // Connect to replication broker if configured. + // Returns (broker, is_fresh_start) where is_fresh_start indicates + // whether accountsdb was empty and may need a snapshot. + let broker = + if let Some(url) = config.validator.replication_mode.remote() { + let mut broker = Broker::connect(url).await?; + let is_fresh_start = accountsdb.slot() == 0; + // Fetch snapshot from primary if starting fresh + if is_fresh_start { + if let Some(snapshot) = broker.get_snapshot().await? { + accountsdb.insert_external_snapshot( + snapshot.slot, + &snapshot.data, + )?; + } + } + Some((broker, is_fresh_start)) + } else { + None + }; + let accountsdb = Arc::new(accountsdb); + let (mut dispatch, validator_channels) = link(); + + let replication_service = + if let Some((broker, is_fresh_start)) = broker { + let messages_rx = dispatch.replication_messages.take().expect( + "replication channel should always exist after init", + ); + // ReplicaOnly mode cannot promote to primary + let can_promote = matches!( + config.validator.replication_mode, + ReplicationMode::StandBy(_) + ); + ReplicationService::new( + broker, + mode_tx.clone(), + accountsdb.clone(), + ledger.clone(), + dispatch.transaction_scheduler.clone(), + messages_rx, + token.clone(), + is_fresh_start, + can_promote, + ) + .await? + } else { + None + }; log_timing("startup", "accountsdb_init", step_start); for (pubkey, account) in genesis_config.accounts { if accountsdb.get_account(&pubkey).is_some() { @@ -198,8 +259,6 @@ impl MagicValidator { let faucet_keypair = funded_faucet(&accountsdb, ledger.ledger_path().as_path())?; - let accountsdb = Arc::new(accountsdb); - let step_start = Instant::now(); let metrics_service = magicblock_metrics::try_start_metrics_service( config.metrics.address.0, @@ -217,8 +276,6 @@ impl MagicValidator { ); log_timing("startup", "system_metrics_ticker_start", step_start); - let (mut dispatch, validator_channels) = link(); - let step_start = Instant::now(); info!("Starting committor service"); let committor_service = @@ -273,29 +330,12 @@ impl MagicValidator { log_timing("startup", "load_programs", step_start); validator::init_validator_authority(identity_keypair); - match &config.validator.replication_mode { - ReplicationMode::ReplicateOnly(_, pk) => { - validator::set_validator_authority_override(pk.0); - } - ReplicationMode::StandBy(_, pk) => { - if validator_pubkey != pk.0 { - return Err(ApiError::StandByKeypairMismatch { - configured: validator_pubkey, - expected: pk.0, - }); - } - } - ReplicationMode::Standalone => {} + if let Some(pk) = config.validator.replication_mode.authority_override() + { + validator::set_validator_authority_override(pk); } let base_fee = config.validator.basefee; - // Mode switch channel for transitioning from StartingUp to Primary - // or Replica mode after ledger replay - let (mode_tx, mode_rx) = channel(1); - let is_standalone = matches!( - config.validator.replication_mode, - ReplicationMode::Standalone - ); let txn_scheduler_state = TransactionSchedulerState { accountsdb: accountsdb.clone(), ledger: ledger.clone(), @@ -304,12 +344,14 @@ impl MagicValidator { account_update_tx: validator_channels.account_update, environment: build_svm_env(&accountsdb, latest_block.blockhash, 0), tasks_tx: validator_channels.tasks_service, + replication_tx: validator_channels.replication_messages, is_auto_airdrop_lamports_enabled: config .chainlink .auto_airdrop_lamports > 0, shutdown: token.clone(), mode_rx, + pause_permit: validator_channels.pause_permit, }; TRANSACTION_COUNT.inc_by(ledger.count_transactions()? as u64); // Faucet keypair is only used for airdrops, which are not allowed in @@ -401,6 +443,7 @@ impl MagicValidator { // NOTE: set during [Self::start] slot_ticker: None, committor_service, + replication_service, scheduled_commits_processor, chainlink, token, @@ -413,6 +456,7 @@ impl MagicValidator { block_udpate_tx: validator_channels.block_update, task_scheduler: Some(task_scheduler), transaction_execution, + replication_handle: None, mode_tx, is_standalone, }) @@ -581,14 +625,6 @@ impl MagicValidator { if accountsdb_slot.saturating_sub(1) == ledger_slot { return Ok(()); } - // If a replay authority override is configured, set it before - // replaying so transactions are verified against that key. - // Save the prior override so we can restore it after replay - // (e.g. a replication-mode override may already be active). - let prior_override = validator::validator_authority_override(); - if let Some(ref pk) = self.config.ledger.replay_authority_override { - validator::set_validator_authority_override(pk.0); - } // SOLANA only allows blockhash to be valid for 150 slot back in time, // considering that the average slot time on solana is 400ms, then: @@ -605,15 +641,6 @@ impl MagicValidator { ) .await; - // Restore the prior authority override now that replay is done, - // regardless of whether process_ledger succeeded or failed. - if self.config.ledger.replay_authority_override.is_some() { - match prior_override { - Some(pk) => validator::set_validator_authority_override(pk), - None => validator::unset_validator_authority_override(), - } - } - let slot_to_continue_at = process_ledger_result?; log_timing("startup", "ledger_replay", step_start); self.accountsdb.set_slot(slot_to_continue_at); @@ -884,18 +911,19 @@ impl MagicValidator { // The message carries the target mode so the scheduler transitions to // the correct coordination mode: // - Standalone validators transition to Primary mode - // - StandBy/ReplicateOnly validators transition to Replica mode - let target = if self.is_standalone { - SchedulerMode::Primary - } else { - SchedulerMode::Replica - }; - self.mode_tx.try_send(target).map_err(|e| { - ApiError::FailedToSendModeSwitch(format!( - "Failed to send target mode {target:?} to scheduler: \ - {e}" - )) - })?; + // - StandBy/ReplicaOnly validators transition to Replica mode + if self.is_standalone { + self.mode_tx + .send(SchedulerMode::Primary) + .await + .map_err(|e| { + ApiError::FailedToSendModeSwitch(format!( + "Failed to send primary mode to scheduler: {e}" + )) + })?; + } else if let Some(replicator) = self.replication_service.take() { + self.replication_handle.replace(replicator.spawn()); + } // Now we are ready to start all services and are ready to accept transactions if let Some(frequency) = self @@ -1030,6 +1058,11 @@ impl MagicValidator { log_timing("shutdown", "ledger_truncator_join", step_start); let step_start = Instant::now(); let _ = self.transaction_execution.join(); + if let Some(handle) = self.replication_handle { + if let Ok(Err(error)) = handle.join() { + error!(%error, "replication service experienced catastrophic failure"); + } + } log_timing("shutdown", "transaction_execution_join", step_start); log_timing("shutdown", "stop_total", stop_start); diff --git a/magicblock-config/src/config/validator.rs b/magicblock-config/src/config/validator.rs index 44dadb150..692ca88fe 100644 --- a/magicblock-config/src/config/validator.rs +++ b/magicblock-config/src/config/validator.rs @@ -1,6 +1,7 @@ // src/config/validator.rs use serde::{Deserialize, Serialize}; use solana_keypair::Keypair; +use solana_pubkey::Pubkey; use url::Url; use crate::{ @@ -28,12 +29,10 @@ pub struct ValidatorConfig { pub enum ReplicationMode { // Validator which doesn't participate in replication Standalone, - /// Validator which participates in replication: acting as either a primary or replicator. - /// The `SerdePubkey` is the primary validator's pubkey used for authority signature verification. - StandBy(Url, SerdePubkey), - /// Validator which participates in replication only as replicator (no takeover). - /// The `SerdePubkey` is the primary validator's pubkey used for authority signature verification. - ReplicateOnly(Url, SerdePubkey), + /// Validator which participates in replication: acting as either a primary or replicator + StandBy(Url), + /// Validator which participates in replication only as replicator (no takeover) + ReplicaOnly(Url, SerdePubkey), } impl Default for ValidatorConfig { @@ -47,3 +46,21 @@ impl Default for ValidatorConfig { } } } + +impl ReplicationMode { + /// Returns the remote URL if this node participates in replication. + /// Returns `None` for `Standalone` mode. + pub fn remote(&self) -> Option { + match self { + Self::Standalone => None, + Self::StandBy(u) | Self::ReplicaOnly(u, _) => Some(u.clone()), + } + } + + pub fn authority_override(&self) -> Option { + if let Self::ReplicaOnly(_, pk) = self { + return Some(pk.0); + } + None + } +} diff --git a/magicblock-core/Cargo.toml b/magicblock-core/Cargo.toml index a4bc901dc..02430f3c0 100644 --- a/magicblock-core/Cargo.toml +++ b/magicblock-core/Cargo.toml @@ -13,6 +13,7 @@ console-subscriber = { workspace = true, optional = true } tokio = { workspace = true, features = ["sync"] } flume = { workspace = true } bincode = { workspace = true } +bytes = { workspace = true } serde = { workspace = true, features = ["derive"] } solana-account = { workspace = true } diff --git a/magicblock-core/src/lib.rs b/magicblock-core/src/lib.rs index cbe515ada..3bb588051 100644 --- a/magicblock-core/src/lib.rs +++ b/magicblock-core/src/lib.rs @@ -1,4 +1,6 @@ pub type Slot = u64; +/// Ordinal position of a transaction within a slot. +pub type TransactionIndex = u32; /// A macro that panics when running a debug build and logs the panic message /// instead when running in release mode. diff --git a/magicblock-core/src/link.rs b/magicblock-core/src/link.rs index 1950dd424..e935ba23e 100644 --- a/magicblock-core/src/link.rs +++ b/magicblock-core/src/link.rs @@ -1,13 +1,21 @@ +use std::sync::Arc; + use accounts::{AccountUpdateRx, AccountUpdateTx}; use blocks::{BlockUpdateRx, BlockUpdateTx}; -use tokio::sync::mpsc; +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + Semaphore, +}; use transactions::{ ScheduledTasksRx, ScheduledTasksTx, TransactionSchedulerHandle, TransactionStatusRx, TransactionStatusTx, TransactionToProcessRx, }; +use crate::link::replication::Message; + pub mod accounts; pub mod blocks; +pub mod replication; pub mod transactions; /// The bounded capacity for MPSC channels that require backpressure. @@ -29,6 +37,8 @@ pub struct DispatchEndpoints { pub block_update: BlockUpdateRx, /// Receives scheduled (crank) tasks from transactions executor. pub tasks_service: Option, + /// Receives replication events from the transaction scheduler. + pub replication_messages: Option>, } /// A collection of channel endpoints for the **validator's internal core**. @@ -47,6 +57,10 @@ pub struct ValidatorChannelEndpoints { pub block_update: BlockUpdateTx, /// Sends scheduled (crank) tasks to tasks service from transactions executor. pub tasks_service: ScheduledTasksTx, + /// Sends replication events to the replication service. + pub replication_messages: Sender, + /// Semaphore used to pause scheduling for exclusive DB access (e.g., checksums). + pub pause_permit: Arc, } /// Creates and connects the full set of communication channels between the dispatch @@ -66,14 +80,23 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { // Bounded channels for command queues where applying backpressure is important. let (txn_to_process_tx, txn_to_process_rx) = mpsc::channel(LINK_CAPACITY); + let (replication_tx, replication_rx) = mpsc::channel(LINK_CAPACITY); + // Semaphore(1) coordinates exclusive access: scheduler holds permit during + // active scheduling, releases when idle; external callers acquire to pause. + let pause_permit = Arc::new(Semaphore::new(1)); + let transaction_scheduler = TransactionSchedulerHandle { + tx: txn_to_process_tx, + pause_permit: pause_permit.clone(), + }; // Bundle the respective channel ends for the dispatch side. let dispatch = DispatchEndpoints { - transaction_scheduler: TransactionSchedulerHandle(txn_to_process_tx), + transaction_scheduler, transaction_status: transaction_status_rx, account_update: account_update_rx, block_update: block_update_rx, tasks_service: Some(tasks_rx), + replication_messages: Some(replication_rx), }; // Bundle the corresponding channel ends for the validator's internal core. @@ -83,6 +106,8 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { account_update: account_update_tx, block_update: block_update_tx, tasks_service: tasks_tx, + replication_messages: replication_tx, + pause_permit, }; (dispatch, validator) diff --git a/magicblock-replicator/src/proto.rs b/magicblock-core/src/link/replication.rs similarity index 79% rename from magicblock-replicator/src/proto.rs rename to magicblock-core/src/link/replication.rs index 92a113eab..031b29d3e 100644 --- a/magicblock-replicator/src/proto.rs +++ b/magicblock-core/src/link/replication.rs @@ -1,19 +1,11 @@ //! Protocol message types for replication. -//! -//! # Wire Format -//! -//! The enum variant index serves as an implicit type tag. -use async_nats::Subject; -use magicblock_core::Slot; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use solana_hash::Hash; use solana_transaction::versioned::VersionedTransaction; -use crate::nats::Subjects; - -/// Ordinal position of a transaction within a slot. -pub type TransactionIndex = u32; +use crate::{Slot, TransactionIndex}; /// Index for block boundary markers (TransactionIndex::MAX - 1). /// Used to identify Block messages in slot/index comparisons. @@ -37,15 +29,9 @@ pub enum Message { } impl Message { - pub(crate) fn subject(&self) -> Subject { - match self { - Self::Transaction(_) => Subjects::transaction(), - Self::Block(_) => Subjects::block(), - Self::SuperBlock(_) => Subjects::superblock(), - } - } - - pub(crate) fn slot_and_index(&self) -> (Slot, TransactionIndex) { + /// Returns the (slot, index) position of this message. + /// Block and SuperBlock messages use sentinel index values. + pub fn slot_and_index(&self) -> (Slot, TransactionIndex) { match self { Self::Transaction(tx) => (tx.slot, tx.index), Self::Block(block) => (block.slot, BLOCK_INDEX), @@ -62,7 +48,7 @@ pub struct Transaction { /// Ordinal position within the slot. pub index: TransactionIndex, /// Bincode-encoded `VersionedTransaction`. - pub payload: Vec, + pub payload: Bytes, } /// Slot boundary marker with blockhash. diff --git a/magicblock-core/src/link/transactions.rs b/magicblock-core/src/link/transactions.rs index 65750b7da..b944d7df0 100644 --- a/magicblock-core/src/link/transactions.rs +++ b/magicblock-core/src/link/transactions.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; + +use bytes::Bytes; use flume::{Receiver as MpmcReceiver, Sender as MpmcSender}; use magicblock_magic_program_api::args::TaskRequest; use serde::Serialize; @@ -9,15 +12,15 @@ use solana_transaction::{ Transaction, }; use solana_transaction_context::TransactionReturnData; -use solana_transaction_error::TransactionError; +use solana_transaction_error::{TransactionError, TransactionResult}; use solana_transaction_status_client_types::TransactionStatusMeta; use tokio::sync::{ mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, - oneshot, + oneshot, OwnedSemaphorePermit, Semaphore, }; use super::blocks::BlockHash; -use crate::Slot; +use crate::{Slot, TransactionIndex}; /// The receiver end of the multi-producer, multi-consumer /// channel for communicating final transaction statuses. @@ -42,17 +45,20 @@ pub type ScheduledTasksTx = UnboundedSender; /// This is the primary entry point for all transaction-related /// operations like execution, simulation, and replay. #[derive(Clone)] -pub struct TransactionSchedulerHandle(pub(super) TransactionToProcessTx); +pub struct TransactionSchedulerHandle { + pub(super) tx: TransactionToProcessTx, + /// Semaphore for coordinating exclusive access with the scheduler. + /// See [`Self::wait_for_idle`] for usage. + pub(super) pause_permit: Arc, +} -/// The standard result of a transaction execution, indicating success or a `TransactionError`. -pub type TransactionResult = solana_transaction_error::TransactionResult<()>; /// The sender half of a one-shot channel used to return the result of a transaction simulation. pub type TxnSimulationResultTx = oneshot::Sender; /// An optional sender half of a one-shot channel for returning a transaction execution result. /// `None` is used for "fire-and-forget" scheduling. -pub type TxnExecutionResultTx = Option>; +pub type TxnExecutionResultTx = Option>>; /// The sender half of a one-shot channel used to return the result of a transaction replay. -pub type TxnReplayResultTx = oneshot::Sender; +pub type TxnReplayResultTx = oneshot::Sender>; /// Contains the final, committed status of an executed /// transaction, including its result and metadata. @@ -62,7 +68,7 @@ pub struct TransactionStatus { pub slot: Slot, pub txn: SanitizedTransaction, pub meta: TransactionStatusMeta, - pub index: u32, + pub index: TransactionIndex, } /// An internal message that bundles a sanitized transaction with its requested processing mode. @@ -72,7 +78,7 @@ pub struct ProcessableTransaction { pub mode: TransactionProcessingMode, /// Pre-encoded bincode bytes for the transaction. /// Used by the replicator to avoid redundant serialization. - pub encoded: Option>, + pub encoded: Option, } /// Specifies the position and persistence behavior for replaying a transaction. @@ -84,7 +90,7 @@ pub struct ReplayPosition { /// The slot in which the transaction was originally included. pub slot: Slot, /// The transaction's index within that slot (0-based). - pub index: u32, + pub index: TransactionIndex, /// Whether to persist the replay to the ledger and broadcast status. /// - `true`: Record to ledger + broadcast (for replay from primary/replicator) /// - `false`: No recording, no broadcast (for local ledger replay during startup) @@ -112,7 +118,7 @@ pub enum TransactionProcessingMode { /// Contains extra information not available in a standard /// execution, like compute units and return data. pub struct TransactionSimulationResult { - pub result: TransactionResult, + pub result: TransactionResult<()>, pub logs: Option>, pub units_consumed: u64, pub return_data: Option, @@ -150,7 +156,7 @@ pub trait SanitizeableTransaction { fn sanitize_with_encoded( self, verify: bool, - ) -> Result<(SanitizedTransaction, Option>), TransactionError> + ) -> TransactionResult<(SanitizedTransaction, Option)> where Self: Sized, { @@ -163,7 +169,7 @@ pub trait SanitizeableTransaction { /// Use for internally-constructed transactions that need encoded bytes. pub struct WithEncoded { pub txn: T, - pub encoded: Vec, + pub encoded: Bytes, } impl SanitizeableTransaction for WithEncoded { @@ -177,7 +183,7 @@ impl SanitizeableTransaction for WithEncoded { fn sanitize_with_encoded( self, verify: bool, - ) -> Result<(SanitizedTransaction, Option>), TransactionError> { + ) -> TransactionResult<(SanitizedTransaction, Option)> { let txn = self.txn.sanitize(verify)?; Ok((txn, Some(self.encoded))) } @@ -190,7 +196,8 @@ where T: Serialize, { let encoded = bincode::serialize(&txn) - .map_err(|_| TransactionError::SanitizeFailure)?; + .map_err(|_| TransactionError::SanitizeFailure)? + .into(); Ok(WithEncoded { txn, encoded }) } @@ -238,7 +245,7 @@ impl TransactionSchedulerHandle { pub async fn schedule( &self, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { let (transaction, encoded) = txn.sanitize_with_encoded(true)?; let mode = TransactionProcessingMode::Execution(None); let txn = ProcessableTransaction { @@ -246,7 +253,7 @@ impl TransactionSchedulerHandle { mode, encoded, }; - let r = self.0.send(txn).await; + let r = self.tx.send(txn).await; r.map_err(|_| TransactionError::ClusterMaintenance) } @@ -258,7 +265,7 @@ impl TransactionSchedulerHandle { pub async fn execute( &self, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { let mode = |tx| TransactionProcessingMode::Execution(Some(tx)); self.send(txn, mode).await? } @@ -267,7 +274,7 @@ impl TransactionSchedulerHandle { pub async fn simulate( &self, txn: impl SanitizeableTransaction, - ) -> Result { + ) -> TransactionResult { let mode = TransactionProcessingMode::Simulation; self.send(txn, mode).await } @@ -285,7 +292,7 @@ impl TransactionSchedulerHandle { &self, position: ReplayPosition, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { let mode = TransactionProcessingMode::Replay(position); let (transaction, encoded) = txn.sanitize_with_encoded(true)?; let txn = ProcessableTransaction { @@ -293,7 +300,7 @@ impl TransactionSchedulerHandle { mode, encoded, }; - self.0 + self.tx .send(txn) .await .map_err(|_| TransactionError::ClusterMaintenance) @@ -305,7 +312,7 @@ impl TransactionSchedulerHandle { &self, txn: impl SanitizeableTransaction, mode: fn(oneshot::Sender) -> TransactionProcessingMode, - ) -> Result { + ) -> TransactionResult { let (transaction, encoded) = txn.sanitize_with_encoded(true)?; let (tx, rx) = oneshot::channel(); let mode = mode(tx); @@ -314,12 +321,29 @@ impl TransactionSchedulerHandle { mode, encoded, }; - self.0 + self.tx .send(txn) .await .map_err(|_| TransactionError::ClusterMaintenance)?; rx.await.map_err(|_| TransactionError::ClusterMaintenance) } + + /// Waits for the scheduler to become idle and returns a permit that keeps it paused. + /// + /// This acquires the scheduling semaphore, blocking until the scheduler releases it + /// (which happens when all executors finish their current work and there are no + /// pending transactions). While holding the permit, the scheduler will not accept + /// or process new transactions. + /// + /// Use this to perform operations that require exclusive access to `AccountsDb`, + /// such as computing checksums or taking snapshots. + pub async fn wait_for_idle(&self) -> OwnedSemaphorePermit { + self.pause_permit + .clone() + .acquire_owned() + .await + .expect("scheduler semaphore can never be closed") + } } /// Scheduler execution mode (used in mode switching). diff --git a/magicblock-ledger/src/lib.rs b/magicblock-ledger/src/lib.rs index c9a2776dc..41e584baf 100644 --- a/magicblock-ledger/src/lib.rs +++ b/magicblock-ledger/src/lib.rs @@ -10,7 +10,7 @@ use solana_hash::Hash; pub use store::api::{Ledger, SignatureInfosForAddress}; use tokio::sync::broadcast; -#[derive(Default)] +#[derive(Default, Clone)] pub struct LatestBlockInner { pub slot: u64, pub blockhash: Hash, diff --git a/magicblock-magic-program-api/Cargo.toml b/magicblock-magic-program-api/Cargo.toml index 3e580380e..1df57c231 100644 --- a/magicblock-magic-program-api/Cargo.toml +++ b/magicblock-magic-program-api/Cargo.toml @@ -10,6 +10,6 @@ edition.workspace = true [dependencies] solana-program = ">=1.16, <3.0.0" -solana-signature = { workspace = true, default-features = false, features = ["serde"] } +solana-signature = { workspace = true, features = ["serde"] } bincode = "^1.3.3" serde = { version = "^1.0.228", features = ["derive"] } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index e9eb98e6b..8562f8f9d 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -1,10 +1,11 @@ use std::{ cmp::Ordering, + collections::BTreeMap, ops::Deref, sync::{Arc, RwLock}, }; -use magicblock_accounts_db::{AccountsDb, GlobalSyncLock}; +use magicblock_accounts_db::AccountsDb; use magicblock_core::{ link::{ accounts::AccountUpdateTx, @@ -16,7 +17,6 @@ use magicblock_core::{ Slot, }; use magicblock_ledger::{LatestBlock, LatestBlockInner, Ledger}; -use parking_lot::RwLockReadGuard; use solana_program::slot_hashes::SlotHashes; use solana_program_runtime::loaded_programs::{ BlockRelation, ForkGraph, ProgramCache, ProgramCacheEntry, @@ -30,13 +30,15 @@ use tokio::{ runtime::Builder, sync::mpsc::{Receiver, Sender}, }; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; use crate::{ builtins::BUILTINS, scheduler::{locks::ExecutorId, state::TransactionSchedulerState}, }; +const BLOCK_HISTORY_SIZE: usize = 32; + pub(crate) struct IndexedTransaction { pub(crate) slot: Slot, pub(crate) index: u32, @@ -58,7 +60,7 @@ pub(super) struct TransactionExecutor { accountsdb: Arc, ledger: Arc, block: LatestBlock, - sync: GlobalSyncLock, + block_history: BTreeMap, // SVM Components processor: TransactionBatchProcessor, @@ -103,14 +105,19 @@ impl TransactionExecutor { ..Default::default() }); + let block = state.ledger.latest_block(); + let initial_block = LatestBlockInner::clone(&*block.load()); + + let mut block_history = BTreeMap::new(); + block_history.insert(initial_block.slot, initial_block.clone()); + let this = Self { id, - sync: state.accountsdb.write_lock(), processor, accountsdb: state.accountsdb.clone(), ledger: state.ledger.clone(), config, - block: state.ledger.latest_block().clone(), + block: block.clone(), environment: state.environment.clone(), rx, ready_tx, @@ -119,6 +126,7 @@ impl TransactionExecutor { tasks_tx: state.tasks_tx.clone(), is_auto_airdrop_lamports_enabled: state .is_auto_airdrop_lamports_enabled, + block_history, }; this.processor.fill_missing_sysvar_cache_entries(&this); @@ -155,7 +163,6 @@ impl TransactionExecutor { #[allow(clippy::await_holding_lock)] #[instrument(skip(self), fields(executor_id = self.id))] async fn run(mut self) { - let mut guard = self.sync.read(); let mut block_updated = self.block.subscribe(); loop { @@ -163,6 +170,9 @@ impl TransactionExecutor { biased; txn = self.rx.recv() => { let Some(transaction) = txn else { break }; + if transaction.slot != self.processor.slot { + self.transition_to_slot(transaction.slot); + } match transaction.txn.mode { TransactionProcessingMode::Execution(_) => { self.execute(transaction, None); @@ -177,10 +187,7 @@ impl TransactionExecutor { let _ = self.ready_tx.try_send(self.id); } _ = block_updated.recv() => { - // Unlock to allow global ops (snapshots), then update slot - RwLockReadGuard::unlock_fair(guard); - self.transition_to_new_slot(); - guard = self.sync.read(); + self.register_new_block(); } else => break, } @@ -188,11 +195,23 @@ impl TransactionExecutor { info!("Executor terminated"); } - fn transition_to_new_slot(&mut self) { - let block = self.block.load(); + fn register_new_block(&mut self) { + let block = LatestBlockInner::clone(&*self.block.load()); + while self.block_history.len() >= BLOCK_HISTORY_SIZE { + self.block_history.pop_first(); + } + self.block_history.insert(block.slot, block); + } + + fn transition_to_slot(&mut self, slot: Slot) { + let Some(block) = self.block_history.get(&slot) else { + // should never happen in practice + warn!(slot, "tried to transition to slot which wasn't registered"); + return; + }; self.environment.blockhash = block.blockhash; self.processor.slot = block.slot; - self.set_sysvars(&block); + self.set_sysvars(block); } /// Updates cache and persists slot hashes. diff --git a/magicblock-processor/src/executor/processing.rs b/magicblock-processor/src/executor/processing.rs index 5520a06db..b82864bb2 100644 --- a/magicblock-processor/src/executor/processing.rs +++ b/magicblock-processor/src/executor/processing.rs @@ -284,6 +284,7 @@ impl super::TransactionExecutor { let versioned = transaction.to_versioned_transaction(); bincode::serialize(&versioned) .map_err(|e| Box::new(e) as Box)? + .into() } }; diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs index 43420bdc5..c2b4a2437 100644 --- a/magicblock-processor/src/scheduler/coordinator.rs +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -314,6 +314,20 @@ impl ExecutionCoordinator { ); !mode_mismatch } + + /// Check whether the node is acting as an event source for replication + pub(super) fn should_replicate(&self) -> bool { + matches!(self.mode, CoordinationMode::Primary(_)) + } + + /// Returns true when no transactions are actively executing. + /// + /// Idle means: all executors are either ready or blocked, so no state + /// transitions are in progress. This is the safe moment for external + /// operations like checksums to access AccountsDb. + pub(super) fn is_idle(&self) -> bool { + self.ready_executors.len() == self.blocked_transactions.len() + } } /// Transaction wrapped with a monotonic ID for FIFO queue ordering. diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs index 69cd80d5d..1fd4a917b 100644 --- a/magicblock-processor/src/scheduler/mod.rs +++ b/magicblock-processor/src/scheduler/mod.rs @@ -16,9 +16,12 @@ use coordinator::{ExecutionCoordinator, TransactionWithId}; use locks::{ExecutorId, MAX_SVM_EXECUTORS}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::{ - link::transactions::{ - ProcessableTransaction, SchedulerMode, TransactionProcessingMode, - TransactionToProcessRx, + link::{ + replication::{self, Message}, + transactions::{ + ProcessableTransaction, SchedulerMode, TransactionProcessingMode, + TransactionToProcessRx, + }, }, Slot, }; @@ -30,7 +33,10 @@ use solana_sdk_ids::sysvar::{clock, slot_hashes}; use state::TransactionSchedulerState; use tokio::{ runtime::Builder, - sync::mpsc::{channel, Receiver, Sender}, + sync::{ + mpsc::{channel, Receiver, Sender}, + OwnedSemaphorePermit, Semaphore, + }, }; use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, warn}; @@ -64,7 +70,14 @@ pub struct TransactionScheduler { shutdown: CancellationToken, /// Receives mode transition commands (Primary or Replica) at runtime. mode_rx: Receiver, + /// A sink for the events (transactions, blocks etc) that need to be replicated + replication_tx: Sender, + /// Semaphore for coordinating exclusive DB access with external callers. + /// Scheduler acquires permit when scheduling, releases when idle. + pause_permit: Arc, + /// Current Slot that scheduler is operating on (clock value) slot: Slot, + /// Current transaction index included into the block under assembly index: u32, } @@ -107,6 +120,8 @@ impl TransactionScheduler { accountsdb: state.accountsdb, shutdown: state.shutdown, mode_rx: state.mode_rx, + replication_tx: state.replication_tx, + pause_permit: state.pause_permit, slot: state.ledger.latest_block().load().slot, index: 0, } @@ -118,6 +133,7 @@ impl TransactionScheduler { // Single-threaded runtime avoids scheduler contention with other async tasks let runtime = Builder::new_current_thread() .thread_name("transaction-scheduler") + .enable_all() .build() .expect("Failed to build single-threaded Tokio runtime"); runtime.block_on(tokio::task::unconstrained(self.run())); @@ -134,11 +150,20 @@ impl TransactionScheduler { #[instrument(skip(self))] async fn run(mut self) { let mut block_produced = self.latest_block.subscribe(); + // Holds the scheduling permit while transactions are being processed. + // Released when idle so external callers can acquire exclusive access. + let mut scheduling_permit = None; loop { tokio::select! { biased; Ok(()) = block_produced.recv() => self.transition_to_new_slot(), - Some(executor) = self.ready_rx.recv() => self.handle_ready_executor(executor), + Some(executor) = self.ready_rx.recv() => { + self.handle_ready_executor(executor).await; + // Release permit when idle: no active work, safe for external access + if self.coordinator.is_idle() { + scheduling_permit.take(); + } + } Some(mode) = self.mode_rx.recv() => { match mode { SchedulerMode::Primary => { @@ -150,7 +175,7 @@ impl TransactionScheduler { } } Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => { - self.handle_new_transaction(txn); + self.handle_new_transaction(txn, &mut scheduling_permit).await; } _ = self.shutdown.cancelled() => break, else => break, @@ -163,26 +188,42 @@ impl TransactionScheduler { info!("Scheduler terminated"); } - fn handle_ready_executor(&mut self, executor: ExecutorId) { + async fn handle_ready_executor(&mut self, executor: ExecutorId) { self.coordinator.unlock_accounts(executor); - self.reschedule_blocked_transactions(executor); + self.reschedule_blocked_transactions(executor).await; } - fn handle_new_transaction(&mut self, txn: ProcessableTransaction) { + async fn handle_new_transaction( + &mut self, + txn: ProcessableTransaction, + scheduling_permit: &mut Option, + ) { if !self.coordinator.is_transaction_allowed(&txn.mode) { warn!("Dropping transaction due to mode incompatibility"); return; } + // Acquire permit if not already held. This blocks if an external caller + // (e.g., checksum) currently holds it, ensuring mutual exclusion. + if scheduling_permit.is_none() { + let permit = self + .pause_permit + .clone() + .acquire_owned() + .await + .expect("scheduler semaphore can never be closed"); + scheduling_permit.replace(permit); + } // SAFETY: // the caller ensured that executor was ready before invoking this // method so the get_ready_executor should always return Some here let executor = self.coordinator.get_ready_executor().expect( "unreachable: is_ready() guard ensures an executor is available", ); - self.schedule_transaction(executor, TransactionWithId::new(txn)); + self.schedule_transaction(executor, TransactionWithId::new(txn)) + .await; } - fn reschedule_blocked_transactions(&mut self, blocker: ExecutorId) { + async fn reschedule_blocked_transactions(&mut self, blocker: ExecutorId) { let mut executor = Some(blocker); while let Some(exec) = executor.take() { // Try to get next transaction blocked by this executor @@ -193,7 +234,7 @@ impl TransactionScheduler { break; }; - let blocked = self.schedule_transaction(exec, txn); + let blocked = self.schedule_transaction(exec, txn).await; // If blocked by the same executor we're draining, stop to avoid infinite loop if blocked.is_some_and(|b| b == blocker) { @@ -204,7 +245,7 @@ impl TransactionScheduler { } } - fn schedule_transaction( + async fn schedule_transaction( &mut self, executor: ExecutorId, txn: TransactionWithId, @@ -222,11 +263,31 @@ impl TransactionScheduler { (self.slot, index) }; + let msg = txn + .encoded + .as_ref() + .cloned() + .filter(|_| { + matches!(txn.mode, TransactionProcessingMode::Execution(_)) + && self.coordinator.should_replicate() + }) + .map(|payload| { + Message::Transaction(replication::Transaction { + index, + slot, + payload, + }) + }); let txn = IndexedTransaction { slot, index, txn }; - let _ = self.executors[executor as usize].try_send(txn).inspect_err( - |e| error!(executor, error = ?e, "Executor channel send failed"), - ); + let sent = self.executors[executor as usize].try_send(txn).inspect_err( + |error| error!(executor, %error, "Executor channel send failed"), + ).is_ok(); + if let Some(msg) = msg.filter(|_| sent) { + let _ = self.replication_tx.send(msg).await.inspect_err( + |error| error!(executor, %error, "Replication channel send failed"), + ); + } None } diff --git a/magicblock-processor/src/scheduler/state.rs b/magicblock-processor/src/scheduler/state.rs index 03b9a01af..1cbd4e675 100644 --- a/magicblock-processor/src/scheduler/state.rs +++ b/magicblock-processor/src/scheduler/state.rs @@ -9,6 +9,7 @@ use std::sync::{Arc, OnceLock, RwLock}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::link::{ accounts::AccountUpdateTx, + replication::Message, transactions::{ ScheduledTasksTx, SchedulerMode, TransactionStatusTx, TransactionToProcessRx, @@ -32,7 +33,10 @@ use solana_program_runtime::{ }; use solana_pubkey::Pubkey; use solana_svm::transaction_processor::TransactionProcessingEnvironment; -use tokio::sync::mpsc::Receiver; +use tokio::sync::{ + mpsc::{Receiver, Sender}, + Semaphore, +}; use tokio_util::sync::CancellationToken; use crate::{executor::SimpleForkGraph, syscalls::SyscallMatmulI8}; @@ -52,6 +56,9 @@ pub struct TransactionSchedulerState { pub account_update_tx: AccountUpdateTx, pub transaction_status_tx: TransactionStatusTx, pub tasks_tx: ScheduledTasksTx, + pub replication_tx: Sender, + /// Semaphore for pausing scheduling during exclusive DB access. + pub pause_permit: Arc, // === Configuration === pub is_auto_airdrop_lamports_enabled: bool, diff --git a/magicblock-processor/tests/scheduling.rs b/magicblock-processor/tests/scheduling.rs index cb340c63f..633ac34ed 100644 --- a/magicblock-processor/tests/scheduling.rs +++ b/magicblock-processor/tests/scheduling.rs @@ -473,3 +473,65 @@ async fn test_serial_transfer_chain() { async fn test_large_queue_mixed_8_executors() { scenario_stress_test(8).await; } + +/// Tests that wait_for_idle() blocks scheduling while held, and releases when idle. +/// +/// Flow: +/// 1. Acquire permit before scheduler starts +/// 2. Queue transaction +/// 3. Start scheduler - it blocks waiting for permit +/// 4. Verify transaction doesn't execute for 500ms +/// 5. Release permit - transaction executes +/// 6. Reacquire permit - succeeds quickly (scheduler is idle) +#[tokio::test] +async fn test_wait_for_idle_coordination() { + let mut env = ExecutionTestEnv::new_with_config( + ExecutionTestEnv::BASE_FEE, + 1, + true, // defer_startup + ); + + // 1. Acquire permit before scheduler starts (owned to avoid borrow issues) + let permit = env.transaction_scheduler.wait_for_idle().await; + + // 2. Queue a transaction + let accounts = create_accounts(&mut env, 2); + let tx = tx_transfer(&mut env, accounts[0], accounts[1]); + let sig = tx.signatures[0]; + env.schedule_transaction(tx).await; + + // 3. Start scheduler + env.run_scheduler(); + env.advance_slot(); + + // 4. Transaction should NOT execute while we hold the permit + tokio::time::sleep(Duration::from_millis(500)).await; + assert!( + env.get_transaction(sig).is_none(), + "transaction should not execute while permit is held" + ); + + // 5. Release permit - scheduler should now process + drop(permit); + + // Wait for transaction to complete + let status = timeout(TIMEOUT, async { + loop { + match env.dispatch.transaction_status.recv_async().await { + Ok(s) if s.txn.signatures()[0] == sig => break s, + _ => continue, + } + } + }) + .await + .expect("transaction should complete after permit released"); + assert!(status.meta.status.is_ok(), "transaction should succeed"); + + // 6. Reacquire permit - should succeed quickly (scheduler is idle) + let _permit = timeout( + Duration::from_millis(100), + env.transaction_scheduler.wait_for_idle(), + ) + .await + .expect("should acquire permit quickly when scheduler is idle"); +} diff --git a/magicblock-replicator/Cargo.toml b/magicblock-replicator/Cargo.toml index 12457e7fc..4d5328fd8 100644 --- a/magicblock-replicator/Cargo.toml +++ b/magicblock-replicator/Cargo.toml @@ -26,6 +26,7 @@ tokio = { workspace = true, features = [ "io-util", "fs", ] } +tokio-util = { workspace = true } serde = { workspace = true, features = ["derive"] } solana-hash = { workspace = true, features = ["serde"] } solana-transaction = { workspace = true, features = ["serde"] } diff --git a/magicblock-replicator/src/lib.rs b/magicblock-replicator/src/lib.rs index 9a616b463..243658a2b 100644 --- a/magicblock-replicator/src/lib.rs +++ b/magicblock-replicator/src/lib.rs @@ -14,7 +14,6 @@ pub mod error; pub mod nats; -pub mod proto; pub mod service; pub mod watcher; @@ -22,4 +21,4 @@ pub mod watcher; mod tests; pub use error::{Error, Result}; -pub use proto::{Message, TransactionIndex}; +pub use service::Service as ReplicationService; diff --git a/magicblock-replicator/src/nats/consumer.rs b/magicblock-replicator/src/nats/consumer.rs index 47a424f2c..02c05b1c0 100644 --- a/magicblock-replicator/src/nats/consumer.rs +++ b/magicblock-replicator/src/nats/consumer.rs @@ -4,6 +4,7 @@ use async_nats::jetstream::consumer::{ pull::{Config as PullConfig, Stream as MessageStream}, AckPolicy, DeliverPolicy, PullConsumer, }; +use tokio_util::sync::CancellationToken; use tracing::warn; use super::cfg; @@ -58,18 +59,27 @@ impl Consumer { /// /// Use this in a `tokio::select!` loop to process messages as they arrive. /// Messages are fetched in batches for efficiency. - pub async fn messages(&self) -> MessageStream { + pub async fn messages( + &self, + cancel: &CancellationToken, + ) -> Option { loop { - let result = self + let messages = self .inner .stream() .max_messages_per_batch(cfg::BATCH_SIZE) - .messages() - .await; - match result { - Ok(s) => break s, - Err(error) => { - warn!(%error, "failed to create message stream") + .messages(); + tokio::select! { + result = messages => { + match result { + Ok(s) => break Some(s), + Err(error) => { + warn!(%error, "failed to create message stream") + } + } + } + _ = cancel.cancelled() => { + break None; } } } diff --git a/magicblock-replicator/src/nats/lock_watcher.rs b/magicblock-replicator/src/nats/lock_watcher.rs index 303a84926..7f58fb726 100644 --- a/magicblock-replicator/src/nats/lock_watcher.rs +++ b/magicblock-replicator/src/nats/lock_watcher.rs @@ -2,6 +2,7 @@ use async_nats::jetstream::kv::{Operation, Watch}; use futures::StreamExt; +use tokio_util::sync::CancellationToken; use tracing::warn; use super::cfg; @@ -17,8 +18,14 @@ pub struct LockWatcher { impl LockWatcher { /// Creates a new lock watcher. - pub(crate) async fn new(broker: &Broker) -> Self { + pub(crate) async fn new( + broker: &Broker, + cancel: &CancellationToken, + ) -> Option { let watch = loop { + if cancel.is_cancelled() { + return None; + } let store = match broker.ctx.get_key_value(cfg::PRODUCER_LOCK).await { Ok(s) => s, @@ -35,7 +42,7 @@ impl LockWatcher { } } }; - Self { watch } + Some(Self { watch }) } /// Waits for the lock to be deleted or expire. diff --git a/magicblock-replicator/src/nats/mod.rs b/magicblock-replicator/src/nats/mod.rs index 70cc0f511..0c9386a76 100644 --- a/magicblock-replicator/src/nats/mod.rs +++ b/magicblock-replicator/src/nats/mod.rs @@ -18,6 +18,7 @@ use async_nats::Subject; pub use broker::Broker; pub use consumer::Consumer; pub use lock_watcher::LockWatcher; +use magicblock_core::link::replication::Message; pub use producer::Producer; pub use snapshot::Snapshot; @@ -97,4 +98,12 @@ impl Subjects { pub fn superblock() -> Subject { Self::from(Self::SUPERBLOCK) } + + pub(crate) fn from_message(msg: &Message) -> Subject { + match msg { + Message::Transaction(_) => Subjects::transaction(), + Message::Block(_) => Subjects::block(), + Message::SuperBlock(_) => Subjects::superblock(), + } + } } diff --git a/magicblock-replicator/src/service/context.rs b/magicblock-replicator/src/service/context.rs index fcd16d0f7..a9f253a57 100644 --- a/magicblock-replicator/src/service/context.rs +++ b/magicblock-replicator/src/service/context.rs @@ -5,22 +5,25 @@ use std::sync::Arc; use machineid_rs::IdBuilder; use magicblock_accounts_db::AccountsDb; use magicblock_core::{ - link::transactions::{SchedulerMode, TransactionSchedulerHandle}, - Slot, + link::{ + replication::{Block, Message, SuperBlock}, + transactions::{SchedulerMode, TransactionSchedulerHandle}, + }, + Slot, TransactionIndex, }; use magicblock_ledger::Ledger; use tokio::{ fs::File, sync::mpsc::{Receiver, Sender}, }; +use tokio_util::sync::CancellationToken; use tracing::info; use super::{Primary, Standby, CONSUMER_RETRY_DELAY}; use crate::{ nats::{Broker, Consumer, LockWatcher, Producer}, - proto::{self, TransactionIndex}, watcher::SnapshotWatcher, - Error, Message, Result, + Error, Result, }; /// Shared state for both primary and standby roles. @@ -29,6 +32,8 @@ pub struct ReplicationContext { pub id: String, /// NATS broker. pub broker: Broker, + /// Global shutdown signal + pub cancel: CancellationToken, /// Scheduler mode channel. pub mode_tx: Sender, /// Accounts database. @@ -39,7 +44,10 @@ pub struct ReplicationContext { pub scheduler: TransactionSchedulerHandle, /// Current position. pub slot: Slot, + /// Position of the last transaction within slot pub index: TransactionIndex, + /// Whether this node can promote from standby to primary. + pub can_promote: bool, } impl ReplicationContext { @@ -50,6 +58,8 @@ impl ReplicationContext { accountsdb: Arc, ledger: Arc, scheduler: TransactionSchedulerHandle, + cancel: CancellationToken, + can_promote: bool, ) -> Result { let id = IdBuilder::new(machineid_rs::Encryption::SHA256) .add_component(machineid_rs::HWIDComponent::SystemID) @@ -60,16 +70,18 @@ impl ReplicationContext { .get_latest_transaction_position()? .unwrap_or_default(); - info!(%id, slot, index, "context initialized"); + info!(%id, slot, index, can_promote, "context initialized"); Ok(Self { id, broker, + cancel, mode_tx, accountsdb, ledger, scheduler, slot, index, + can_promote, }) } @@ -80,17 +92,16 @@ impl ReplicationContext { } /// Writes block to ledger. - pub async fn write_block(&self, block: &proto::Block) -> Result<()> { + pub async fn write_block(&self, block: &Block) -> Result<()> { self.ledger .write_block(block.slot, block.timestamp, block.hash)?; Ok(()) } /// Verifies superblock checksum. - pub fn verify_checksum(&self, sb: &proto::SuperBlock) -> Result<()> { - let _lock = self.accountsdb.lock_database(); - // SAFETY: Lock acquired above ensures no concurrent modifications - // during checksum computation. + pub async fn verify_checksum(&self, sb: &SuperBlock) -> Result<()> { + let _guard = self.scheduler.wait_for_idle().await; + // SAFETY: Scheduler is paused, no concurrent modifications during checksum. let checksum = unsafe { self.accountsdb.checksum() }; if checksum == sb.checksum { Ok(()) @@ -132,16 +143,25 @@ impl ReplicationContext { self.broker.put_snapshot(slot, file).await } - /// Creates consumer with retry. - pub async fn create_consumer(&self, reset: bool) -> Consumer { + /// Creates consumer with retry, respecting shutdown signal. + /// Returns `None` if shutdown is triggered during creation. + pub async fn create_consumer(&self, reset: bool) -> Option { loop { - match self.broker.create_consumer(&self.id, reset).await { - Ok(c) => return c, - Err(e) => { - tracing::warn!(%e, "consumer creation failed, retrying"); - tokio::time::sleep(CONSUMER_RETRY_DELAY).await; + tokio::select! { + result = self.broker.create_consumer(&self.id, reset) => { + match result { + Ok(c) => return Some(c), + Err(e) => { + tracing::warn!(%e, "consumer creation failed, retrying"); + } + } + } + _ = self.cancel.cancelled() => { + tracing::info!("shutdown during consumer creation"); + return None; } } + tokio::time::sleep(CONSUMER_RETRY_DELAY).await; } } @@ -157,6 +177,7 @@ impl ReplicationContext { } /// Transitions to standby role. + /// Returns `None` if shutdown is triggered during consumer creation. /// reset parameter controls where in the stream the consumption starts: /// true - the last known position that we know /// false - the last known position that message broker tracks for us @@ -164,10 +185,20 @@ impl ReplicationContext { self, messages: Receiver, reset: bool, - ) -> Result { - let consumer = Box::new(self.create_consumer(reset).await); - let watcher = LockWatcher::new(&self.broker).await; + ) -> Result> { + let Some(consumer) = self.create_consumer(reset).await else { + return Ok(None); + }; + let Some(watcher) = LockWatcher::new(&self.broker, &self.cancel).await + else { + return Ok(None); + }; self.enter_replica_mode().await; - Ok(Standby::new(self, consumer, messages, watcher)) + Ok(Some(Standby::new( + self, + Box::new(consumer), + messages, + watcher, + ))) } } diff --git a/magicblock-replicator/src/service/mod.rs b/magicblock-replicator/src/service/mod.rs index 4c89ae5ad..ee3af9d6b 100644 --- a/magicblock-replicator/src/service/mod.rs +++ b/magicblock-replicator/src/service/mod.rs @@ -27,8 +27,9 @@ use std::{sync::Arc, thread::JoinHandle, time::Duration}; pub use context::ReplicationContext; use magicblock_accounts_db::AccountsDb; -use magicblock_core::link::transactions::{ - SchedulerMode, TransactionSchedulerHandle, +use magicblock_core::link::{ + replication::Message, + transactions::{SchedulerMode, TransactionSchedulerHandle}, }; use magicblock_ledger::Ledger; pub use primary::Primary; @@ -37,8 +38,9 @@ use tokio::{ runtime::Builder, sync::mpsc::{Receiver, Sender}, }; +use tokio_util::sync::CancellationToken; -use crate::{nats::Broker, Message, Result}; +use crate::{nats::Broker, Result}; // ============================================================================= // Constants @@ -59,7 +61,11 @@ pub enum Service { } impl Service { - /// Creates service, attempting primary role first. + /// Creates service, attempting primary role first if allowed. + /// + /// When `can_promote` is false (ReplicaOnly mode), skips lock acquisition + /// and goes directly to standby mode. + #[allow(clippy::too_many_arguments)] pub async fn new( broker: Broker, mode_tx: Sender, @@ -67,22 +73,44 @@ impl Service { ledger: Arc, scheduler: TransactionSchedulerHandle, messages: Receiver, + cancel: CancellationToken, reset: bool, - ) -> crate::Result { + can_promote: bool, + ) -> crate::Result> { let ctx = ReplicationContext::new( - broker, mode_tx, accountsdb, ledger, scheduler, + broker, + mode_tx, + accountsdb, + ledger, + scheduler, + cancel, + can_promote, ) .await?; - // Try to become primary. - match ctx.try_acquire_producer().await? { - Some(producer) => { - Ok(Self::Primary(ctx.into_primary(producer, messages).await?)) - } - None => { - let standby = ctx.into_standby(messages, reset).await?; - Ok(Self::Standby(standby)) + // Try to become primary only if promotion is allowed. + if can_promote { + match ctx.try_acquire_producer().await? { + Some(producer) => Ok(Some(Self::Primary( + ctx.into_primary(producer, messages).await?, + ))), + None => { + let Some(standby) = + ctx.into_standby(messages, reset).await? + else { + // Shutdown during consumer creation + return Ok(None); + }; + Ok(Some(Self::Standby(standby))) + } } + } else { + // ReplicaOnly mode: skip lock acquisition, go directly to standby + let Some(standby) = ctx.into_standby(messages, reset).await? else { + // Shutdown during consumer creation + return Ok(None); + }; + Ok(Some(Self::Standby(standby))) } } @@ -90,13 +118,13 @@ impl Service { pub async fn run(mut self) -> Result<()> { loop { self = match self { - Service::Primary(p) => Service::Standby(p.run().await?), - Service::Standby(s) => match s.run().await { - Ok(p) => Service::Primary(p), - Err(error) => { - tracing::error!(%error, "unrecoverable replication failure"); - return Err(error); - } + Service::Primary(p) => match p.run().await? { + Some(s) => Service::Standby(s), + None => return Ok(()), + }, + Service::Standby(s) => match s.run().await? { + Some(p) => Service::Primary(p), + None => return Ok(()), }, }; } @@ -108,6 +136,7 @@ impl Service { pub fn spawn(self) -> JoinHandle> { std::thread::spawn(move || { let runtime = Builder::new_current_thread() + .enable_all() .thread_name("replication-service") .build() .expect("Failed to build replication service runtime"); diff --git a/magicblock-replicator/src/service/primary.rs b/magicblock-replicator/src/service/primary.rs index c2b00a161..5733fe9b5 100644 --- a/magicblock-replicator/src/service/primary.rs +++ b/magicblock-replicator/src/service/primary.rs @@ -1,11 +1,15 @@ //! Primary node: publishes events and holds leader lock. +use magicblock_core::link::replication::Message; use tokio::sync::mpsc::Receiver; use tracing::{error, info, instrument, warn}; use super::{ReplicationContext, LOCK_REFRESH_INTERVAL}; use crate::{ - nats::Producer, service::Standby, watcher::SnapshotWatcher, Message, Result, + nats::{Producer, Subjects}, + service::Standby, + watcher::SnapshotWatcher, + Result, }; /// Primary node: publishes events and holds leader lock. @@ -32,22 +36,15 @@ impl Primary { } } - /// Runs until leadership lost, returns standby on demotion. + /// Runs until leadership lost or shutdown. + /// Returns `Some(Standby)` on demotion, `None` on shutdown. #[instrument(skip(self))] - pub async fn run(mut self) -> Result { + pub async fn run(mut self) -> Result> { let mut lock_tick = tokio::time::interval(LOCK_REFRESH_INTERVAL); loop { tokio::select! { - Some(msg) = self.messages.recv() => { - if let Err(error) = self.publish(msg).await { - // publish should not easily fail, if that happens, it means - // the message broker has become unrecoverably unreacheable - warn!(%error, "failed to publish the message"); - return self.ctx.into_standby(self.messages, true).await; - } - } - + biased; _ = lock_tick.tick() => { let held = match self.producer.refresh().await { Ok(h) => h, @@ -61,12 +58,23 @@ impl Primary { return self.ctx.into_standby(self.messages, true).await; } } - + Some(msg) = self.messages.recv() => { + if let Err(error) = self.publish(msg).await { + // publish should not easily fail, if that happens, it means + // the message broker has become unrecoverably unreacheable + warn!(%error, "failed to publish the message"); + return self.ctx.into_standby(self.messages, true).await; + } + } Some((file, slot)) = self.snapshots.recv() => { if let Err(e) = self.ctx.upload_snapshot(file, slot).await { warn!(%e, "snapshot upload failed"); } } + _ = self.ctx.cancel.cancelled() => { + info!("shutdown received, terminating primary mode"); + return Ok(None); + } } } } @@ -79,7 +87,7 @@ impl Primary { return Ok(()); } }; - let subject = msg.subject(); + let subject = Subjects::from_message(&msg); let (slot, index) = msg.slot_and_index(); let ack = matches!(msg, Message::SuperBlock(_)); diff --git a/magicblock-replicator/src/service/standby.rs b/magicblock-replicator/src/service/standby.rs index 9bd834d1b..c38357efc 100644 --- a/magicblock-replicator/src/service/standby.rs +++ b/magicblock-replicator/src/service/standby.rs @@ -4,9 +4,9 @@ use std::time::{Duration, Instant}; use async_nats::Message as NatsMessage; use futures::StreamExt; -use magicblock_core::{ - link::transactions::{ReplayPosition, WithEncoded}, - Slot, +use magicblock_core::link::{ + replication::{Message, Transaction}, + transactions::{ReplayPosition, WithEncoded}, }; use solana_transaction::versioned::VersionedTransaction; use tokio::sync::mpsc::Receiver; @@ -15,9 +15,8 @@ use tracing::{error, info, warn}; use super::{ReplicationContext, LEADER_TIMEOUT}; use crate::{ nats::{Consumer, LockWatcher}, - proto::TransactionIndex, service::Primary, - Message, Result, + Result, }; /// Standby node: consumes events and watches for leader failure. @@ -27,6 +26,7 @@ pub struct Standby { messages: Receiver, watcher: LockWatcher, last_activity: Instant, + can_promote: bool, } impl Standby { @@ -37,26 +37,48 @@ impl Standby { messages: Receiver, watcher: LockWatcher, ) -> Self { + let can_promote = ctx.can_promote; Self { ctx, consumer, messages, watcher, last_activity: Instant::now(), + can_promote, } } - /// Runs until leadership acquired, returns primary on promotion. - pub async fn run(mut self) -> Result { + /// Runs until leadership acquired or shutdown. + /// Returns `Some(Primary)` on promotion, `None` on shutdown. + pub async fn run(mut self) -> Result> { let mut timeout_check = tokio::time::interval(Duration::from_secs(1)); - let mut stream = self.consumer.messages().await; + let Some(mut stream) = self.consumer.messages(&self.ctx.cancel).await + else { + return Ok(None); + }; loop { tokio::select! { + biased; + _ = self.watcher.wait_for_expiry() => { + if self.can_promote { + info!("leader lock expired, attempting takeover"); + if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { + info!("acquired leadership, promoting"); + return self.ctx.into_primary(producer, self.messages).await.map(Some); + } + } else { + warn!("leader lock expired, but takeover disabled (ReplicaOnly mode)"); + } + } result = stream.next() => { let Some(result) = result else { - stream = self.consumer.messages().await; - continue; + if let Some(s) = self.consumer.messages(&self.ctx.cancel).await { + stream = s; + continue; + } else { + return Ok(None); + }; }; match result { Ok(msg) => { @@ -66,21 +88,20 @@ impl Standby { Err(e) => warn!(%e, "message consumption stream error"), } } - - _ = self.watcher.wait_for_expiry() => { - info!("leader lock expired, attempting takeover"); - if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { - info!("acquired leadership, promoting"); - return self.ctx.into_primary(producer, self.messages).await; - } - } - _ = timeout_check.tick(), if self.last_activity.elapsed() > LEADER_TIMEOUT => { - if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { - info!("acquired leadership via timeout, promoting"); - return self.ctx.into_primary(producer, self.messages).await; + if self.can_promote { + if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { + info!("acquired leadership via timeout, promoting"); + return self.ctx.into_primary(producer, self.messages).await.map(Some); + } + } else { + warn!("leader timeout reached, but takeover disabled (ReplicaOnly mode)"); } } + _ = self.ctx.cancel.cancelled() => { + info!("shutdown received, terminating standby mode"); + return Ok(None); + } } } } @@ -102,12 +123,12 @@ impl Standby { } let result = match message { - Message::Transaction(tx) => { - self.replay_tx(tx.slot, tx.index, tx.payload).await + Message::Transaction(txn) => { + self.replay_tx(txn).await } Message::Block(block) => self.ctx.write_block(&block).await, Message::SuperBlock(sb) => { - self.ctx.verify_checksum(&sb).inspect_err(|error| + self.ctx.verify_checksum(&sb).await.inspect_err(|error| error!(slot, %error, "accountsdb state has diverged") ) } @@ -120,20 +141,16 @@ impl Standby { self.ctx.update_position(slot, index); } - async fn replay_tx( - &self, - slot: Slot, - index: TransactionIndex, - encoded: Vec, - ) -> Result<()> { + async fn replay_tx(&self, msg: Transaction) -> Result<()> { let pos = ReplayPosition { - slot, - index, + slot: msg.slot, + index: msg.index, persist: true, }; - let tx: VersionedTransaction = bincode::deserialize(&encoded)?; - let tx = WithEncoded { txn: tx, encoded }; - self.ctx.scheduler.replay(pos, tx).await?; + let encoded = msg.payload; + let txn: VersionedTransaction = bincode::deserialize(&encoded)?; + let txn = WithEncoded { txn, encoded }; + self.ctx.scheduler.replay(pos, txn).await?; Ok(()) } } diff --git a/programs/magicblock/src/validator.rs b/programs/magicblock/src/validator.rs index 4a4f6e572..919d5ce8f 100644 --- a/programs/magicblock/src/validator.rs +++ b/programs/magicblock/src/validator.rs @@ -1,72 +1,30 @@ -use std::sync::RwLock; +use std::sync::OnceLock; -use lazy_static::lazy_static; use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_signer::Signer; -lazy_static! { - static ref VALIDATOR_AUTHORITY: RwLock> = RwLock::new(None); - static ref VALIDATOR_AUTHORITY_OVERRIDE: RwLock> = - RwLock::new(None); -} +static VALIDATOR_AUTHORITY: OnceLock = OnceLock::new(); +static VALIDATOR_AUTHORITY_OVERRIDE: OnceLock = OnceLock::new(); pub fn validator_authority() -> Keypair { - VALIDATOR_AUTHORITY - .read() - .expect("RwLock VALIDATOR_AUTHORITY poisoned") - .as_ref() - .expect("Validator authority needs to be set on startup") - .insecure_clone() + VALIDATOR_AUTHORITY.wait().insecure_clone() } pub fn validator_authority_id() -> Pubkey { - VALIDATOR_AUTHORITY - .read() - .expect("RwLock VALIDATOR_AUTHORITY poisoned") - .as_ref() - .map(|x| x.pubkey()) - .expect("Validator authority needs to be set on startup") + VALIDATOR_AUTHORITY.wait().pubkey() } pub fn init_validator_authority(keypair: Keypair) { - let mut validator_authority_lock = VALIDATOR_AUTHORITY - .write() - .expect("RwLock VALIDATOR_AUTHORITY poisoned"); - if let Some(validator_authority) = validator_authority_lock.as_ref() { - panic!("Validator authority can only be set once, but was set before to '{}'", validator_authority.pubkey()); - } - validator_authority_lock.replace(keypair); -} - -pub fn init_validator_authority_if_needed(keypair: Keypair) { - let mut validator_authority_lock = VALIDATOR_AUTHORITY - .write() - .expect("RwLock VALIDATOR_AUTHORITY poisoned"); - if validator_authority_lock.as_ref().is_some() { - return; - } - validator_authority_lock.replace(keypair); + let _ = VALIDATOR_AUTHORITY.set(keypair); } pub fn set_validator_authority_override(pubkey: Pubkey) { - let mut lock = VALIDATOR_AUTHORITY_OVERRIDE - .write() - .expect("RwLock VALIDATOR_AUTHORITY_OVERRIDE poisoned"); - lock.replace(pubkey); -} - -pub fn unset_validator_authority_override() { - let mut lock = VALIDATOR_AUTHORITY_OVERRIDE - .write() - .expect("RwLock VALIDATOR_AUTHORITY_OVERRIDE poisoned"); - *lock = None; + let _ = VALIDATOR_AUTHORITY_OVERRIDE.set(pubkey); } pub fn validator_authority_override() -> Option { - *VALIDATOR_AUTHORITY_OVERRIDE - .read() - .expect("RwLock VALIDATOR_AUTHORITY_OVERRIDE poisoned") + VALIDATOR_AUTHORITY_OVERRIDE.get().copied() } pub fn effective_validator_authority_id() -> Pubkey { @@ -74,11 +32,5 @@ pub fn effective_validator_authority_id() -> Pubkey { } pub fn generate_validator_authority_if_needed() { - let mut validator_authority_lock = VALIDATOR_AUTHORITY - .write() - .expect("RwLock VALIDATOR_AUTHORITY poisoned"); - if validator_authority_lock.as_ref().is_some() { - return; - } - validator_authority_lock.replace(Keypair::new()); + VALIDATOR_AUTHORITY.get_or_init(Keypair::new); } diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 76c49d75a..fae223b81 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -453,6 +453,43 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-nats" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df5af9ebfb0a14481d3eaf6101e6391261e4f30d25b26a7635ade8a39482ded0" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-util", + "memchr", + "nkeys", + "nuid", + "once_cell", + "pin-project", + "portable-atomic", + "rand 0.8.5", + "regex", + "ring", + "rustls-native-certs 0.7.3", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-rustls 0.26.4", + "tokio-stream", + "tokio-util 0.7.17", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -910,6 +947,9 @@ name = "bytes" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +dependencies = [ + "serde", +] [[package]] name = "bzip2" @@ -1189,6 +1229,12 @@ dependencies = [ "sha2-const-stable", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "constant_time_eq" version = "0.3.1" @@ -1463,6 +1509,17 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "der" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "der-parser" version = "8.2.0" @@ -1647,7 +1704,16 @@ version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91cff35c70bba8a626e3185d8cd48cc11b5437e1a5bcd15b9b5fa3c64b6dfee7" dependencies = [ - "signature", + "signature 1.6.4", +] + +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature 2.2.0", ] [[package]] @@ -1657,13 +1723,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" dependencies = [ "curve25519-dalek 3.2.0", - "ed25519", + "ed25519 1.5.3", "rand 0.7.3", "serde", "sha2 0.9.9", "zeroize", ] +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek 4.1.3", + "ed25519 2.2.3", + "sha2 0.10.9", + "signature 2.2.0", + "subtle", +] + [[package]] name = "ed25519-dalek-bip32" version = "0.2.0" @@ -1671,7 +1750,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d2be62a4061b872c8c0873ee4fc6f101ce7b889d039f019c5fa2af471a59908" dependencies = [ "derivation-path", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "hmac 0.12.1", "sha2 0.10.9", ] @@ -2115,6 +2194,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "fslock" version = "0.2.1" @@ -2965,6 +3053,26 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "inotify" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" +dependencies = [ + "bitflags 2.10.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.4" @@ -3117,6 +3225,26 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57d8d8ce877200136358e0bbff3a77965875db3af755a11e1fa6b1b3e2df13ea" +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -3413,6 +3541,26 @@ dependencies = [ "libc", ] +[[package]] +name = "machineid-rs" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ceb4d434d69d7199abc3036541ba6ef86767a4356e3077d5a3419f85b70b14" +dependencies = [ + "hex", + "hmac 0.12.1", + "md-5", + "serde", + "serde_json", + "sha-1", + "sha2 0.10.9", + "sysinfo", + "uuid", + "whoami", + "winreg 0.11.0", + "wmi", +] + [[package]] name = "magic-domain-program" version = "0.2.0" @@ -3568,6 +3716,7 @@ dependencies = [ "magicblock-metrics", "magicblock-processor", "magicblock-program", + "magicblock-replicator", "magicblock-services", "magicblock-task-scheduler", "magicblock-validator-admin", @@ -3738,6 +3887,7 @@ name = "magicblock-core" version = "0.8.5" dependencies = [ "bincode", + "bytes", "flume", "magicblock-magic-program-api 0.8.5", "serde", @@ -3972,6 +4122,30 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "magicblock-replicator" +version = "0.8.5" +dependencies = [ + "async-nats", + "bincode", + "bytes", + "futures", + "machineid-rs", + "magicblock-accounts-db", + "magicblock-core", + "magicblock-ledger", + "notify", + "serde", + "solana-hash 2.2.1", + "solana-transaction", + "solana-transaction-error", + "thiserror 1.0.69", + "tokio", + "tokio-util 0.7.17", + "tracing", + "url", +] + [[package]] name = "magicblock-rpc-client" version = "0.8.5" @@ -4019,7 +4193,7 @@ dependencies = [ name = "magicblock-table-mania" version = "0.8.5" dependencies = [ - "ed25519-dalek", + "ed25519-dalek 1.0.1", "magicblock-metrics", "magicblock-rpc-client", "rand 0.9.2", @@ -4113,6 +4287,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "memchr" version = "2.7.6" @@ -4203,6 +4387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", + "log", "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.61.2", ] @@ -4332,6 +4517,21 @@ dependencies = [ "libc", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519 2.2.3", + "ed25519-dalek 2.2.0", + "getrandom 0.2.16", + "log", + "rand 0.8.5", + "signatory", +] + [[package]] name = "no-std-compat" version = "0.4.1" @@ -4360,6 +4560,42 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags 2.10.0", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags 2.10.0", +] + +[[package]] +name = "ntapi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3b335231dfd352ffb0f8017f3b6027a4917f7df785ea2143d8af2adc66980ae" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -4369,6 +4605,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "num" version = "0.2.1" @@ -4716,6 +4961,15 @@ dependencies = [ "base64 0.13.1", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -4833,6 +5087,16 @@ dependencies = [ "solana-address", ] +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -5575,7 +5839,7 @@ dependencies = [ "cfg-if", "libc", "rustix 1.1.2", - "windows", + "windows 0.62.2", ] [[package]] @@ -5666,7 +5930,7 @@ dependencies = [ "wasm-bindgen-futures", "web-sys", "webpki-roots 0.25.4", - "winreg", + "winreg 0.50.0", ] [[package]] @@ -5874,6 +6138,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.2" @@ -5926,7 +6203,7 @@ dependencies = [ "log", "once_cell", "rustls 0.23.35", - "rustls-native-certs", + "rustls-native-certs 0.8.2", "rustls-platform-verifier-android", "rustls-webpki 0.103.8", "security-framework 3.5.1", @@ -5951,6 +6228,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.8" @@ -6248,6 +6535,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -6326,6 +6633,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha1" version = "0.10.6" @@ -6401,12 +6719,34 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature 2.2.0", + "zeroize", +] + [[package]] name = "signature" version = "1.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest 0.10.7", + "rand_core 0.6.4", +] + [[package]] name = "simd-adler32" version = "0.3.8" @@ -7220,7 +7560,7 @@ checksum = "a1feafa1691ea3ae588f99056f4bdd1293212c7ece28243d7da257c443e84753" dependencies = [ "bytemuck", "bytemuck_derive", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "solana-feature-set", "solana-instruction 2.2.1", "solana-precompile-error", @@ -7548,7 +7888,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dbb7042c2e0c561afa07242b2099d55c57bd1b1da3b6476932197d84e15e3e4" dependencies = [ "bs58", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "ed25519-dalek-bip32", "rand 0.7.3", "solana-derivation-path", @@ -8803,7 +9143,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47d251c8f3dc015f320b4161daac7f108156c837428e5a8cc61136d25beb11d6" dependencies = [ "bs58", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "rand 0.8.5", "serde", "serde-big-array", @@ -9728,6 +10068,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "spl-associated-token-account" version = "6.0.0" @@ -10226,6 +10576,21 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "sysinfo" +version = "0.29.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd727fc423c2060f6c92d9534cef765c65a6ed3f428a03d7def74a8c4348e666" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -10428,6 +10793,7 @@ dependencies = [ "solana-signature", "solana-signer", "solana-transaction", + "solana-transaction-error", "solana-transaction-status-client-types", "tempfile", "tokio", @@ -10788,6 +11154,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-sink", + "http 1.4.0", + "httparse", + "rand 0.8.5", + "ring", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.4", + "tokio-util 0.7.17", + "webpki-roots 0.26.11", +] + [[package]] name = "toml" version = "0.5.11" @@ -10890,7 +11277,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.13.5", - "rustls-native-certs", + "rustls-native-certs 0.8.2", "rustls-pemfile 2.2.0", "socket2 0.5.10", "tokio", @@ -11084,6 +11471,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tungstenite" version = "0.20.1" @@ -11343,6 +11740,12 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.106" @@ -11475,6 +11878,17 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" @@ -11531,6 +11945,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-implement 0.48.0", + "windows-interface 0.48.0", + "windows-targets 0.48.5", +] + [[package]] name = "windows" version = "0.62.2" @@ -11558,8 +11983,8 @@ version = "0.62.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ - "windows-implement", - "windows-interface", + "windows-implement 0.60.2", + "windows-interface 0.59.3", "windows-link", "windows-result", "windows-strings", @@ -11576,6 +12001,17 @@ dependencies = [ "windows-threading", ] +[[package]] +name = "windows-implement" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e2ee588991b9e7e6c8338edf3333fbe4da35dc72092643958ebb43f0ab2c49c" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -11587,6 +12023,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "windows-interface" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6fb8df20c9bcaa8ad6ab513f7b40104840c8867d5751126e4df3b08388d0cc7" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "windows-interface" version = "0.59.3" @@ -11947,6 +12394,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a1a57ff50e9b408431e8f97d5456f2807f8eb2a2cd79b06068fc87f8ecf189" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "winreg" version = "0.50.0" @@ -11963,6 +12420,20 @@ version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +[[package]] +name = "wmi" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daffb44abb7d2e87a1233aa17fdbde0d55b890b32a23a1f908895b87fa6f1a00" +dependencies = [ + "chrono", + "futures", + "log", + "serde", + "thiserror 1.0.69", + "windows 0.48.0", +] + [[package]] name = "writeable" version = "0.6.2" diff --git a/test-kit/Cargo.toml b/test-kit/Cargo.toml index 925e3456a..fa00e9359 100644 --- a/test-kit/Cargo.toml +++ b/test-kit/Cargo.toml @@ -22,6 +22,7 @@ solana-rpc-client = { workspace = true } solana-signature = { workspace = true } solana-signer = { workspace = true } solana-transaction = { workspace = true } +solana-transaction-error = { workspace = true } solana-transaction-status-client-types = { workspace = true } tempfile = { workspace = true } diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index eaedfa801..efa899771 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -15,8 +15,7 @@ use magicblock_core::{ link, transactions::{ ReplayPosition, SanitizeableTransaction, SchedulerMode, - TransactionResult, TransactionSchedulerHandle, - TransactionSimulationResult, + TransactionSchedulerHandle, TransactionSimulationResult, }, DispatchEndpoints, }, @@ -37,6 +36,7 @@ use solana_program::{ use solana_signature::Signature; pub use solana_signer::Signer; use solana_transaction::Transaction; +use solana_transaction_error::TransactionResult; use solana_transaction_status_client_types::TransactionStatusMeta; use tempfile::TempDir; use tokio::sync::mpsc::Sender; @@ -167,10 +167,12 @@ impl ExecutionTestEnv { transaction_status_tx: validator_channels.transaction_status, txn_to_process_rx: validator_channels.transaction_to_process, tasks_tx: validator_channels.tasks_service, + replication_tx: validator_channels.replication_messages, environment, is_auto_airdrop_lamports_enabled: false, shutdown: Default::default(), mode_rx, + pause_permit: validator_channels.pause_permit, }; // Pre-send the target mode so the scheduler picks it up once running. @@ -330,7 +332,7 @@ impl ExecutionTestEnv { pub async fn execute_transaction( &self, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { self.transaction_scheduler.execute(txn).await.inspect_err( |err| error!(error = ?err, "Transaction execution failed"), ) @@ -372,7 +374,7 @@ impl ExecutionTestEnv { &self, persist: bool, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { let position = ReplayPosition { slot: 0, index: 0,