diff --git a/magicblock-ledger/src/lib.rs b/magicblock-ledger/src/lib.rs index c9a2776dc..41e584baf 100644 --- a/magicblock-ledger/src/lib.rs +++ b/magicblock-ledger/src/lib.rs @@ -10,7 +10,7 @@ use solana_hash::Hash; pub use store::api::{Ledger, SignatureInfosForAddress}; use tokio::sync::broadcast; -#[derive(Default)] +#[derive(Default, Clone)] pub struct LatestBlockInner { pub slot: u64, pub blockhash: Hash, diff --git a/magicblock-magic-program-api/Cargo.toml b/magicblock-magic-program-api/Cargo.toml index 3e580380e..1df57c231 100644 --- a/magicblock-magic-program-api/Cargo.toml +++ b/magicblock-magic-program-api/Cargo.toml @@ -10,6 +10,6 @@ edition.workspace = true [dependencies] solana-program = ">=1.16, <3.0.0" -solana-signature = { workspace = true, default-features = false, features = ["serde"] } +solana-signature = { workspace = true, features = ["serde"] } bincode = "^1.3.3" serde = { version = "^1.0.228", features = ["derive"] } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index e9eb98e6b..2a0f3f60b 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -1,10 +1,11 @@ use std::{ cmp::Ordering, + collections::BTreeMap, ops::Deref, sync::{Arc, RwLock}, }; -use magicblock_accounts_db::{AccountsDb, GlobalSyncLock}; +use magicblock_accounts_db::AccountsDb; use magicblock_core::{ link::{ accounts::AccountUpdateTx, @@ -30,13 +31,15 @@ use tokio::{ runtime::Builder, sync::mpsc::{Receiver, Sender}, }; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; use crate::{ builtins::BUILTINS, scheduler::{locks::ExecutorId, state::TransactionSchedulerState}, }; +const BLOCK_HISTORY_SIZE: usize = 32; + pub(crate) struct IndexedTransaction { pub(crate) slot: Slot, pub(crate) index: u32, @@ -58,7 +61,7 @@ pub(super) struct TransactionExecutor { accountsdb: Arc, ledger: Arc, block: LatestBlock, - sync: GlobalSyncLock, + block_history: BTreeMap, // SVM Components processor: TransactionBatchProcessor, @@ -103,14 +106,19 @@ impl TransactionExecutor { ..Default::default() }); + let block = state.ledger.latest_block(); + let initial_block = LatestBlockInner::clone(&*block.load()); + + let mut block_history = BTreeMap::new(); + block_history.insert(initial_block.slot, initial_block.clone()); + let this = Self { id, - sync: state.accountsdb.write_lock(), processor, accountsdb: state.accountsdb.clone(), ledger: state.ledger.clone(), config, - block: state.ledger.latest_block().clone(), + block: block.clone(), environment: state.environment.clone(), rx, ready_tx, @@ -119,6 +127,7 @@ impl TransactionExecutor { tasks_tx: state.tasks_tx.clone(), is_auto_airdrop_lamports_enabled: state .is_auto_airdrop_lamports_enabled, + block_history, }; this.processor.fill_missing_sysvar_cache_entries(&this); @@ -155,7 +164,8 @@ impl TransactionExecutor { #[allow(clippy::await_holding_lock)] #[instrument(skip(self), fields(executor_id = self.id))] async fn run(mut self) { - let mut guard = self.sync.read(); + let sync = self.accountsdb.write_lock(); + let mut guard = sync.read(); let mut block_updated = self.block.subscribe(); loop { @@ -163,6 +173,9 @@ impl TransactionExecutor { biased; txn = self.rx.recv() => { let Some(transaction) = txn else { break }; + if transaction.slot != self.processor.slot { + self.transition_to_slot(transaction.slot); + } match transaction.txn.mode { TransactionProcessingMode::Execution(_) => { self.execute(transaction, None); @@ -177,10 +190,11 @@ impl TransactionExecutor { let _ = self.ready_tx.try_send(self.id); } _ = block_updated.recv() => { - // Unlock to allow global ops (snapshots), then update slot + // Unlock to allow global ops (snapshots), then + // register the new block for future transactions RwLockReadGuard::unlock_fair(guard); - self.transition_to_new_slot(); - guard = self.sync.read(); + self.register_new_block(); + guard = sync.read(); } else => break, } @@ -188,11 +202,23 @@ impl TransactionExecutor { info!("Executor terminated"); } - fn transition_to_new_slot(&mut self) { - let block = self.block.load(); + fn register_new_block(&mut self) { + let block = LatestBlockInner::clone(&*self.block.load()); + while self.block_history.len() >= BLOCK_HISTORY_SIZE { + self.block_history.pop_first(); + } + self.block_history.insert(block.slot, block); + } + + fn transition_to_slot(&mut self, slot: Slot) { + let Some(block) = self.block_history.get(&slot) else { + // should never happen in practice + warn!(slot, "tried to transition to slot which wasn't registered"); + return; + }; self.environment.blockhash = block.blockhash; self.processor.slot = block.slot; - self.set_sysvars(&block); + self.set_sysvars(block); } /// Updates cache and persists slot hashes.