diff --git a/.gitignore b/.gitignore index 6c2b3a026..466de6e37 100644 --- a/.gitignore +++ b/.gitignore @@ -15,11 +15,12 @@ target/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ +.vscode # Ledger test-ledger/ test-ledger-magicblock/ -magicblock-test-storage/ +magicblock-test-storage* # Mac **/.DS_Store @@ -29,9 +30,11 @@ magicblock-test-storage/ .github/packages/npm-package/node_modules .github/packages/npm-package/package-lock.json +# Configs +config.json +config.toml + # AI related **/CLAUDE.md CODEBASE_MAP.md -config.json -config.toml AGENTS.md diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index b8c2ea980..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "workbench.colorCustomizations": { - "activityBar.background": "#442427", - "titleBar.activeBackground": "#5F3337", - "titleBar.activeForeground": "#FCF9F9" - } -} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index f5c937639..59ff2ee30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3801,6 +3801,7 @@ dependencies = [ "futures", "machineid-rs", "magicblock-accounts-db", + "magicblock-chainlink", "magicblock-core", "magicblock-ledger", "notify", diff --git a/Cargo.toml b/Cargo.toml index 3432e1e5c..00195b3c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,7 +100,9 @@ magicblock-accounts = { path = "./magicblock-accounts" } magicblock-accounts-db = { path = "./magicblock-accounts-db" } magicblock-aperture = { path = "./magicblock-aperture" } magicblock-api = { path = "./magicblock-api" } -magicblock-chainlink = { path = "./magicblock-chainlink" } +magicblock-chainlink = { path = "./magicblock-chainlink", features = [ + "dev-context", +] } magicblock-committor-program = { path = "./magicblock-committor-program", features = [ "no-entrypoint", ] } diff --git a/magicblock-accounts-db/src/lib.rs 
b/magicblock-accounts-db/src/lib.rs index 20041bd82..c748f1c06 100644 --- a/magicblock-accounts-db/src/lib.rs +++ b/magicblock-accounts-db/src/lib.rs @@ -408,7 +408,7 @@ impl AccountsDb { } pub fn database_directory(&self) -> &Path { - self.snapshot_manager.database_path() + &self.snapshot_manager.snapshots_dir } } @@ -434,7 +434,7 @@ impl AccountsBank for AccountsDb { /// Removes accounts matching a predicate. fn remove_where( &self, - predicate: impl Fn(&Pubkey, &AccountSharedData) -> bool, + mut predicate: impl FnMut(&Pubkey, &AccountSharedData) -> bool, ) -> AccountsDbResult { let to_remove = self .iter_all() diff --git a/magicblock-accounts-db/src/snapshot.rs b/magicblock-accounts-db/src/snapshot.rs index 655548e44..093a70731 100644 --- a/magicblock-accounts-db/src/snapshot.rs +++ b/magicblock-accounts-db/src/snapshot.rs @@ -66,7 +66,7 @@ impl SnapshotStrategy { #[derive(Debug)] pub struct SnapshotManager { db_path: PathBuf, - snapshots_dir: PathBuf, + pub(crate) snapshots_dir: PathBuf, strategy: SnapshotStrategy, /// Ordered registry of archive paths (oldest to newest). 
registry: Mutex>, @@ -129,7 +129,9 @@ impl SnapshotManager { let archive_path = snapshot_dir.with_extension(ARCHIVE_EXT); let tmp_path = archive_path.with_extension("tmp"); - info!(archive_path = %archive_path.display(), "Archiving snapshot"); + if let Some(archive) = archive_path.file_name() { + info!(?archive, "Archiving snapshot"); + } // Write to temporary file first let file = File::create(&tmp_path).log_err(|| { @@ -146,6 +148,7 @@ impl SnapshotManager { let file = enc.finish().log_err(|| "Failed to finish gzip archive")?; file.sync_all() .log_err(|| "Failed to sync archive to disk")?; + drop(file); // Atomically rename to final path fs::rename(&tmp_path, &archive_path).log_err(|| { @@ -178,7 +181,9 @@ impl SnapshotManager { current_slot: u64, ) -> AccountsDbResult { // Validate archive structure - Self::validate_archive(archive_bytes)?; + Self::validate_archive(archive_bytes).log_err(|| { + format!("snapshot archive bytes are corrupted, slot: {slot}") + })?; let archive_path = self.slot_to_archive_path(slot); if archive_path.exists() { @@ -331,7 +336,6 @@ impl SnapshotManager { /// Registers an archive in the registry. 
fn register_archive(&self, archive_path: PathBuf) { - info!(archive_path = %archive_path.display(), "Snapshot registered"); self.registry.lock().push_back(archive_path); } diff --git a/magicblock-accounts-db/src/traits.rs b/magicblock-accounts-db/src/traits.rs index 7cb92b6e8..6531fb925 100644 --- a/magicblock-accounts-db/src/traits.rs +++ b/magicblock-accounts-db/src/traits.rs @@ -8,7 +8,7 @@ pub trait AccountsBank: Send + Sync + 'static { fn remove_account(&self, pubkey: &Pubkey); fn remove_where( &self, - predicate: impl Fn(&Pubkey, &AccountSharedData) -> bool, + predicate: impl FnMut(&Pubkey, &AccountSharedData) -> bool, ) -> AccountsDbResult; fn remove_account_conditionally( diff --git a/magicblock-aperture/src/lib.rs b/magicblock-aperture/src/lib.rs index acafdf15c..7d05fccbd 100644 --- a/magicblock-aperture/src/lib.rs +++ b/magicblock-aperture/src/lib.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use error::{ApertureError, RpcError}; use magicblock_config::config::aperture::ApertureConfig; use magicblock_core::link::DispatchEndpoints; @@ -17,10 +19,12 @@ pub async fn initialize_aperture( dispatch: &DispatchEndpoints, cancel: CancellationToken, ) -> ApertureResult { - // Start up an event processor tasks, which will handle forwarding of any validator - // originating event to client subscribers, or use them to update server's caches - EventProcessor::start(config, &state, dispatch, cancel.clone())?; - let server = JsonRpcServer::new(config, state, dispatch, cancel).await?; + let server = + JsonRpcServer::new(config, state.clone(), dispatch, cancel.clone()) + .await?; + // Start event processors only after the server has bound its sockets so a + // bind failure cannot leak background tasks during retries in tests/startup. 
+ EventProcessor::start(config, &state, dispatch, cancel)?; Ok(server) } @@ -28,6 +32,8 @@ pub async fn initialize_aperture( pub struct JsonRpcServer { http: HttpServer, websocket: WebsocketServer, + http_addr: SocketAddr, + ws_addr: SocketAddr, } impl JsonRpcServer { @@ -39,10 +45,21 @@ impl JsonRpcServer { cancel: CancellationToken, ) -> ApertureResult { // try to bind to socket before spawning anything (handy in tests) - let mut addr = config.listen.0; - let http = TcpListener::bind(addr).await.map_err(RpcError::internal)?; - addr.set_port(addr.port() + 1); - let ws = TcpListener::bind(addr).await.map_err(RpcError::internal)?; + let http = TcpListener::bind(config.listen.0) + .await + .map_err(RpcError::internal)?; + let http_addr = http.local_addr().map_err(RpcError::internal)?; + + let mut ws_addr = http_addr; + if config.listen.0.port() == 0 { + ws_addr.set_port(0); + } else { + ws_addr.set_port(config.listen.0.port() + 1); + } + let ws = TcpListener::bind(ws_addr) + .await + .map_err(RpcError::internal)?; + let ws_addr = ws.local_addr().map_err(RpcError::internal)?; // initialize HTTP and Websocket servers let websocket = { @@ -50,7 +67,20 @@ impl JsonRpcServer { WebsocketServer::new(ws, &state, cancel).await? 
}; let http = HttpServer::new(http, state, cancel, dispatch).await?; - Ok(Self { http, websocket }) + Ok(Self { + http, + websocket, + http_addr, + ws_addr, + }) + } + + pub fn http_addr(&self) -> SocketAddr { + self.http_addr + } + + pub fn ws_addr(&self) -> SocketAddr { + self.ws_addr } /// Run JSON-RPC server indefinitely, until cancel token is used to signal shut down diff --git a/magicblock-aperture/src/requests/http/request_airdrop.rs b/magicblock-aperture/src/requests/http/request_airdrop.rs index 0618cf12e..715998362 100644 --- a/magicblock-aperture/src/requests/http/request_airdrop.rs +++ b/magicblock-aperture/src/requests/http/request_airdrop.rs @@ -1,5 +1,3 @@ -use magicblock_core::link::transactions::with_encoded; - use super::prelude::*; impl HttpDispatcher { @@ -10,39 +8,37 @@ impl HttpDispatcher { /// the faucet is not enabled on the node. pub(crate) async fn request_airdrop( &self, - request: &mut JsonRequest, + _request: &mut JsonRequest, ) -> HandlerResult { // Airdrops are only supported if a faucet keypair is configured. 
// Which is never the case with *ephemeral* running mode of the validator - let Some(ref faucet) = self.context.faucet else { - return Err(RpcError::invalid_request( - "free airdrop faucet is disabled", - )); - }; + Err(RpcError::invalid_request("free airdrop faucet is disabled")) + // TODO(bmuddha): allow free airdrops when other modes are fully reintroduced + // https://github.com/magicblock-labs/magicblock-validator/issues/1093 - let (pubkey, lamports) = - parse_params!(request.params()?, Serde32Bytes, u64); - let pubkey = some_or_err!(pubkey); - let lamports = some_or_err!(lamports); - if lamports == 0 { - return Err(RpcError::invalid_params("lamports must be > 0")); - } + // let (pubkey, lamports) = + // parse_params!(request.params()?, Serde32Bytes, u64); + // let pubkey = some_or_err!(pubkey); + // let lamports = some_or_err!(lamports); + // if lamports == 0 { + // return Err(RpcError::invalid_params("lamports must be > 0")); + // } - // Build and execute the airdrop transfer transaction. - let txn = solana_system_transaction::transfer( - faucet, - &pubkey, - lamports, - self.blocks.get_latest().hash, - ); - // we just signed the transaction, it must have a signature - let signature = - SerdeSignature(txn.signatures.first().cloned().unwrap_or_default()); + // // Build and execute the airdrop transfer transaction. + // let txn = solana_system_transaction::transfer( + // faucet, + // &pubkey, + // lamports, + // self.blocks.get_latest().hash, + // ); + // // we just signed the transaction, it must have a signature + // let signature = + // SerdeSignature(txn.signatures.first().cloned().unwrap_or_default()); - self.transactions_scheduler - .execute(with_encoded(txn)?) - .await?; + // self.transactions_scheduler + // .execute(with_encoded(txn)?) 
+ // .await?; - Ok(ResponsePayload::encode_no_context(&request.id, signature)) + // Ok(ResponsePayload::encode_no_context(&request.id, signature)) } } diff --git a/magicblock-aperture/src/state/mod.rs b/magicblock-aperture/src/state/mod.rs index 8e1263562..e74e76ba9 100644 --- a/magicblock-aperture/src/state/mod.rs +++ b/magicblock-aperture/src/state/mod.rs @@ -14,7 +14,6 @@ use magicblock_chainlink::{ }; use magicblock_ledger::Ledger; use solana_feature_set::FeatureSet; -use solana_keypair::Keypair; use solana_pubkey::Pubkey; use subscriptions::SubscriptionsDb; use transactions::TransactionsCache; @@ -31,6 +30,7 @@ pub type ChainlinkImpl = Chainlink< /// This struct aggregates thread-safe handles (`Arc`) and concurrently accessible /// components (caches, databases) that need to be available across various parts /// of the application, such as RPC handlers and event processors. +#[derive(Clone)] pub struct SharedState { /// The public key of the validator node. pub(crate) context: NodeContext, @@ -51,12 +51,10 @@ pub struct SharedState { } /// Holds the core configuration and runtime parameters that define the node's operational context. -#[derive(Default)] +#[derive(Default, Clone)] pub struct NodeContext { /// The public key of the validator node. pub identity: Pubkey, - /// The keypair for the optional faucet, used to airdrop tokens. - pub faucet: Option, /// Base fee charged for transaction execution per signature. 
pub base_fee: u64, /// Runtime features activated for this node (used to compute fees) diff --git a/magicblock-aperture/src/tests.rs b/magicblock-aperture/src/tests.rs index 48bc7ef92..2e4cbcc82 100644 --- a/magicblock-aperture/src/tests.rs +++ b/magicblock-aperture/src/tests.rs @@ -42,14 +42,8 @@ fn ws_channel() -> (WsConnectionChannel, Receiver) { fn chainlink(accounts_db: &Arc) -> ChainlinkImpl { let cfg = ChainLinkConfig::default(); - ChainlinkImpl::try_new( - accounts_db, - None, - Pubkey::new_unique(), - Pubkey::new_unique(), - &cfg, - ) - .expect("Failed to create Chainlink") + ChainlinkImpl::try_new(accounts_db, None, Pubkey::new_unique(), &cfg) + .expect("Failed to create Chainlink") } mod event_processor { diff --git a/magicblock-aperture/tests/mocked.rs b/magicblock-aperture/tests/mocked.rs index f96cfdf57..e1e37bc3b 100644 --- a/magicblock-aperture/tests/mocked.rs +++ b/magicblock-aperture/tests/mocked.rs @@ -158,13 +158,13 @@ async fn test_get_epoch_info() { .expect("get_epoch_info request failed"); assert_eq!(epoch_info.epoch, 0, "epoch should be 0"); - // The absolute_slot should be at most 1 behind the current slot - // due to auto-advancement timing + // The absolute_slot should be at most 3 behind the current slot + // due to auto-advancement timing (50ms block time) let current_slot = env.latest_slot(); assert!( epoch_info.absolute_slot <= current_slot - && epoch_info.absolute_slot >= current_slot.saturating_sub(1), - "absolute_slot {} should be within 1 of current slot {}", + && epoch_info.absolute_slot >= current_slot.saturating_sub(3), + "absolute_slot {} should be within 3 of current slot {}", epoch_info.absolute_slot, current_slot ); diff --git a/magicblock-aperture/tests/setup.rs b/magicblock-aperture/tests/setup.rs index 39c91b9e2..a14158689 100644 --- a/magicblock-aperture/tests/setup.rs +++ b/magicblock-aperture/tests/setup.rs @@ -64,7 +64,6 @@ fn chainlink(accounts_db: &Arc) -> Arc { accounts_db, None, Pubkey::new_unique(), - 
Pubkey::new_unique(), &ChainLinkConfig::default(), ) .expect("Failed to create Chainlink"), @@ -90,51 +89,38 @@ impl RpcTestEnv { const BLOCK_TIME_MS: u64 = 50; let execution = ExecutionTestEnv::new(); - // Wait for the scheduler to be ready and in primary mode - execution.wait_for_scheduler_ready().await; + execution.advance_slot(); let faucet = Keypair::new(); execution.fund_account(faucet.pubkey(), Self::INIT_ACCOUNT_BALANCE); - // Try to find a free port, this is handy when using nextest - // where each test needs to run in a separate process. - let (server, port) = loop { - let port: u16 = rand::random_range(7000..u16::MAX - 1); - let node_context = NodeContext { - identity: execution.get_payer().pubkey, - faucet: Some(faucet.insecure_clone()), - base_fee: Self::BASE_FEE, - featureset: Default::default(), - blocktime: BLOCK_TIME_MS, - }; - let state = SharedState::new( - node_context, - execution.accountsdb.clone(), - execution.ledger.clone(), - chainlink(&execution.accountsdb), - ); - let cancel = CancellationToken::new(); - let listen = format!("127.0.0.1:{port}").parse().unwrap(); - let config = ApertureConfig { - listen, - ..Default::default() - }; - let server = initialize_aperture( - &config, - state, - &execution.dispatch, - cancel, - ) - .await; - if let Ok(server) = server { - break (server, port); - } + let node_context = NodeContext { + identity: execution.get_payer().pubkey, + base_fee: Self::BASE_FEE, + + featureset: Default::default(), + blocktime: BLOCK_TIME_MS, }; + let state = SharedState::new( + node_context, + execution.accountsdb.clone(), + execution.ledger.clone(), + chainlink(&execution.accountsdb), + ); + let cancel = CancellationToken::new(); + let config = ApertureConfig { + listen: "127.0.0.1:0".parse().unwrap(), + ..Default::default() + }; + let server = + initialize_aperture(&config, state, &execution.dispatch, cancel) + .await + .expect("failed to initialize aperture test server"); - tokio::spawn(server.run()); + let rpc_url = 
format!("http://{}", server.http_addr()); + let pubsub_url = format!("ws://{}", server.ws_addr()); - let rpc_url = format!("http://127.0.0.1:{port}"); - let pubsub_url = format!("ws://127.0.0.1:{}", port + 1); + tokio::spawn(server.run()); let rpc = RpcClient::new(rpc_url); let pubsub = PubsubClient::new(&pubsub_url) diff --git a/magicblock-aperture/tests/transactions.rs b/magicblock-aperture/tests/transactions.rs index d67b3a45b..cfab32622 100644 --- a/magicblock-aperture/tests/transactions.rs +++ b/magicblock-aperture/tests/transactions.rs @@ -298,24 +298,10 @@ async fn test_request_airdrop() { env.execution.fund_account(recipient, 1); // Start with 1 lamport let airdrop_amount = RpcTestEnv::INIT_ACCOUNT_BALANCE / 10; - let signature = env - .rpc - .request_airdrop(&recipient, airdrop_amount) - .await - .expect("request_airdrop failed"); + let result = env.rpc.request_airdrop(&recipient, airdrop_amount).await; - let meta = env - .execution - .get_transaction(signature) - .expect("airdrop transaction should have been persisted"); - assert!(meta.status.is_ok(), "airdrop transaction should succeed"); - - let account = env.execution.accountsdb.get_account(&recipient).unwrap(); - assert_eq!( - account.lamports(), - airdrop_amount + 1, - "airdrop was not credited correctly" - ); + // TODO: restore the behavior, once the airdrops are enabled again + assert!(result.is_err(), "airdrop transaction should have failed"); } /// Verifies that `get_fee_for_message` returns the correct fee based on the number of signatures. 
diff --git a/magicblock-api/src/fund_account.rs b/magicblock-api/src/fund_account.rs index 3e9e4e2f0..00d7ad14b 100644 --- a/magicblock-api/src/fund_account.rs +++ b/magicblock-api/src/fund_account.rs @@ -1,18 +1,9 @@ -use std::path::Path; - use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_magic_program_api as magic_program; use magicblock_program::MagicContext; use solana_account::{AccountSharedData, WritableAccount}; -use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_rent::Rent; -use solana_signer::Signer; - -use crate::{ - errors::ApiResult, - ledger::{read_faucet_keypair_from_ledger, write_faucet_keypair_to_ledger}, -}; pub(crate) fn fund_account( accountsdb: &AccountsDb, @@ -45,27 +36,6 @@ pub(crate) fn init_validator_identity( let _ = accountsdb.insert_account(validator_id, &authority); } -/// Funds the faucet account. -/// If the [create_new] is `false` then the faucet keypair will be read from the -/// existing ledger and an error is raised if it is not found. -/// Otherwise, a new faucet keypair will be created and saved to the ledger. 
-pub(crate) fn funded_faucet( - accountsdb: &AccountsDb, - ledger_path: &Path, -) -> ApiResult { - let faucet_keypair = match read_faucet_keypair_from_ledger(ledger_path) { - Ok(faucet_keypair) => faucet_keypair, - Err(_) => { - let faucet_keypair = Keypair::new(); - write_faucet_keypair_to_ledger(ledger_path, &faucet_keypair)?; - faucet_keypair - } - }; - - fund_account(accountsdb, &faucet_keypair.pubkey(), u64::MAX / 2); - Ok(faucet_keypair) -} - pub(crate) fn fund_magic_context(accountsdb: &AccountsDb) { const CONTEXT_LAMPORTS: u64 = u64::MAX; diff --git a/magicblock-api/src/genesis_utils.rs b/magicblock-api/src/genesis_utils.rs index c931df0fd..ffc278231 100644 --- a/magicblock-api/src/genesis_utils.rs +++ b/magicblock-api/src/genesis_utils.rs @@ -8,11 +8,9 @@ use solana_feature_gate_interface::{create_account, Feature}; use solana_feature_set::FeatureSet; use solana_fee_calculator::FeeRateGovernor; use solana_genesis_config::{ClusterType, GenesisConfig}; -use solana_keypair::Keypair; use solana_native_token::LAMPORTS_PER_SOL; use solana_pubkey::Pubkey; use solana_rent::Rent; -use solana_signer::Signer; // Default amount received by the validator const VALIDATOR_LAMPORTS: u64 = u64::MAX / 2; @@ -23,15 +21,10 @@ pub struct GenesisConfigInfo { } pub fn create_genesis_config_with_leader( - mint_lamports: u64, validator_pubkey: &Pubkey, lamports_per_signature: u64, ) -> GenesisConfigInfo { - let mint_keypair = Keypair::new(); - let genesis_config = create_genesis_config_with_leader_ex( - mint_lamports, - &mint_keypair.pubkey(), validator_pubkey, VALIDATOR_LAMPORTS, FeeRateGovernor { @@ -77,18 +70,12 @@ pub fn activate_feature( #[allow(clippy::too_many_arguments)] pub fn create_genesis_config_with_leader_ex( - mint_lamports: u64, - mint_pubkey: &Pubkey, validator_pubkey: &Pubkey, validator_lamports: u64, fee_rate_governor: FeeRateGovernor, rent: Rent, mut initial_accounts: Vec<(Pubkey, AccountSharedData)>, ) -> GenesisConfig { - initial_accounts.push(( - 
*mint_pubkey, - AccountSharedData::new(mint_lamports, 0, &Pubkey::default()), - )); initial_accounts.push(( *validator_pubkey, AccountSharedData::new(validator_lamports, 0, &Pubkey::default()), diff --git a/magicblock-api/src/ledger.rs b/magicblock-api/src/ledger.rs index c471c9c64..b6607b395 100644 --- a/magicblock-api/src/ledger.rs +++ b/magicblock-api/src/ledger.rs @@ -66,50 +66,6 @@ pub fn lock_ledger<'lock>( }) } -// ----------------- -// Faucet -// ----------------- -fn faucet_keypair_path(ledger_path: &Path) -> ApiResult { - let parent = ledger_parent_dir(ledger_path)?; - Ok(parent.join("faucet-keypair.json")) -} - -pub(crate) fn read_faucet_keypair_from_ledger( - ledger_path: &Path, -) -> ApiResult { - let keypair_path = faucet_keypair_path(ledger_path)?; - if fs::exists(keypair_path.as_path()).unwrap_or_default() { - let keypair = - Keypair::read_from_file(keypair_path.as_path()).map_err(|err| { - ApiError::LedgerInvalidFaucetKeypair( - keypair_path.display().to_string(), - err.to_string(), - ) - })?; - Ok(keypair) - } else { - Err(ApiError::LedgerIsMissingFaucetKeypair( - keypair_path.display().to_string(), - )) - } -} - -pub(crate) fn write_faucet_keypair_to_ledger( - ledger_path: &Path, - keypair: &Keypair, -) -> ApiResult<()> { - let keypair_path = faucet_keypair_path(ledger_path)?; - keypair - .write_to_file(keypair_path.as_path()) - .map_err(|err| { - ApiError::LedgerCouldNotWriteFaucetKeypair( - keypair_path.display().to_string(), - err.to_string(), - ) - })?; - Ok(()) -} - // ----------------- // Validator Keypair // ----------------- diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index a6945e11b..ca7d3ea1d 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -90,8 +90,7 @@ use crate::{ domain_registry_manager::DomainRegistryManager, errors::{ApiError, ApiResult}, fund_account::{ - fund_ephemeral_vault, fund_magic_context, funded_faucet, - 
init_validator_identity, + fund_ephemeral_vault, fund_magic_context, init_validator_identity, }, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, ledger::{ @@ -154,7 +153,6 @@ impl MagicValidator { genesis_config, validator_pubkey, } = create_genesis_config_with_leader( - u64::MAX, &validator_pubkey, config.validator.basefee, ); @@ -176,7 +174,6 @@ impl MagicValidator { log_timing("startup", "sync_validator_keypair", step_start); let latest_block = ledger.latest_block().load(); - let step_start = Instant::now(); let mut accountsdb = AccountsDb::new(&config.accountsdb, &config.storage, last_slot)?; @@ -224,6 +221,28 @@ impl MagicValidator { let accountsdb = Arc::new(accountsdb); let (mut dispatch, validator_channels) = link(); + let step_start = Instant::now(); + let committor_service = + Self::init_committor_service(&config, ledger.latest_block()) + .await?; + log_timing("startup", "committor_service_init", step_start); + init_magic_sys(Arc::new(MagicSysAdapter::new( + committor_service.clone(), + ))); + + let step_start = Instant::now(); + let chainlink = Arc::new( + Self::init_chainlink( + &config, + committor_service.clone(), + &dispatch.transaction_scheduler, + &ledger.latest_block().clone(), + &accountsdb, + ) + .await?, + ); + log_timing("startup", "chainlink_init", step_start); + let replication_service = if let Some((broker, is_fresh_start)) = broker { let messages_rx = dispatch.replication_messages.take().expect( @@ -239,6 +258,7 @@ impl MagicValidator { mode_tx.clone(), accountsdb.clone(), ledger.clone(), + chainlink.stub(), dispatch.transaction_scheduler.clone(), messages_rx, token.clone(), @@ -249,7 +269,6 @@ impl MagicValidator { } else { None }; - log_timing("startup", "accountsdb_init", step_start); for (pubkey, account) in genesis_config.accounts { if accountsdb.get_account(&pubkey).is_some() { continue; @@ -268,9 +287,6 @@ impl MagicValidator { fund_magic_context(&accountsdb); fund_ephemeral_vault(&accountsdb); - let 
faucet_keypair = - funded_faucet(&accountsdb, ledger.ledger_path().as_path())?; - let step_start = Instant::now(); let metrics_service = magicblock_metrics::try_start_metrics_service( config.metrics.address.0, @@ -288,40 +304,6 @@ impl MagicValidator { ); log_timing("startup", "system_metrics_ticker_start", step_start); - let step_start = Instant::now(); - info!("Starting committor service"); - let committor_service = - Self::init_committor_service(&config, ledger.latest_block()) - .await?; - info!( - duration_ms = step_start.elapsed().as_millis() as u64, - enabled = committor_service.is_some(), - "Committor service started" - ); - log_timing("startup", "committor_service_init", step_start); - init_magic_sys(Arc::new(MagicSysAdapter::new( - committor_service.clone(), - ))); - - let step_start = Instant::now(); - info!("Starting chainlink"); - let chainlink = Arc::new( - Self::init_chainlink( - &config, - committor_service.clone(), - &dispatch.transaction_scheduler, - &ledger.latest_block().clone(), - &accountsdb, - faucet_keypair.pubkey(), - ) - .await?, - ); - info!( - duration_ms = step_start.elapsed().as_millis() as u64, - "Chainlink started" - ); - log_timing("startup", "chainlink_init", step_start); - let scheduled_commits_processor = committor_service.as_ref().map(|committor_service| { Arc::new(ScheduledCommitsProcessorImpl::new( @@ -371,11 +353,8 @@ impl MagicValidator { // Faucet keypair is only used for airdrops, which are not allowed in // the Ephemeral mode by setting the faucet to None in node context // (used by the RPC implementation), we effectively disable airdrops - let faucet = (config.lifecycle != LifecycleMode::Ephemeral) - .then_some(faucet_keypair); let node_context = NodeContext { identity: validator_pubkey, - faucet, base_fee, featureset: txn_scheduler_state.environment.feature_set.clone(), blocktime: config.ledger.block_time_ms(), @@ -530,7 +509,6 @@ impl MagicValidator { transaction_scheduler: &TransactionSchedulerHandle, latest_block: 
&LatestBlock, accountsdb: &Arc, - faucet_pubkey: Pubkey, ) -> ApiResult { let endpoints = Endpoints::try_from(config.remotes.as_slice()) .map_err(|e| { @@ -578,7 +556,6 @@ impl MagicValidator { &accounts_bank, &cloner, config.validator.keypair.insecure_clone(), - faucet_pubkey, chainlink_config, &config.chainlink, ) @@ -632,7 +609,6 @@ impl MagicValidator { } let accountsdb_slot = self.accountsdb.slot(); let ledger_slot = self.ledger.latest_block().load().slot; - // If we have accountsdb state, which is at least as new as the last state state // transition in the ledger then there's no need to run any kind of ledger replay if accountsdb_slot >= ledger_slot { diff --git a/magicblock-chainlink/src/accounts_bank.rs b/magicblock-chainlink/src/accounts_bank.rs index 2403129a1..f8d8e0b28 100644 --- a/magicblock-chainlink/src/accounts_bank.rs +++ b/magicblock-chainlink/src/accounts_bank.rs @@ -84,8 +84,7 @@ pub mod mock { pub fn dump_account_keys(&self, include_blacklisted: bool) -> String { let mut output = String::new(); output.push_str("AccountsBank {\n"); - let blacklisted_accounts = - blacklisted_accounts(&Pubkey::default(), &Pubkey::default()); + let blacklisted_accounts = blacklisted_accounts(&Pubkey::default()); for pubkey in self.accounts.lock().unwrap().keys() { if !include_blacklisted && blacklisted_accounts.contains(pubkey) { @@ -111,7 +110,7 @@ pub mod mock { } fn remove_where( &self, - predicate: impl Fn(&Pubkey, &AccountSharedData) -> bool, + mut predicate: impl FnMut(&Pubkey, &AccountSharedData) -> bool, ) -> AccountsDbResult { let mut accounts = self.accounts.lock().unwrap(); let initial_len = accounts.len(); diff --git a/magicblock-chainlink/src/chainlink/blacklisted_accounts.rs b/magicblock-chainlink/src/chainlink/blacklisted_accounts.rs index 963debe48..3e429fa0d 100644 --- a/magicblock-chainlink/src/chainlink/blacklisted_accounts.rs +++ b/magicblock-chainlink/src/chainlink/blacklisted_accounts.rs @@ -5,15 +5,12 @@ use solana_pubkey::Pubkey; use 
solana_sdk_ids::{ address_lookup_table, bpf_loader, bpf_loader_deprecated, bpf_loader_upgradeable, compute_budget, config, ed25519_program, - incinerator, loader_v4, native_loader, secp256k1_program, stake, - system_program, vote, + incinerator, loader_v4, native_loader, secp256k1_program, + secp256r1_program, stake, system_program, vote, }; use solana_sysvar; -pub fn blacklisted_accounts( - validator_id: &Pubkey, - faucet_id: &Pubkey, -) -> HashSet { +pub fn blacklisted_accounts(validator_id: &Pubkey) -> HashSet { // This is buried in the accounts_db::native_mint module and we don't // want to take a dependency on that crate just for this ID which won't change const NATIVE_SOL_ID: Pubkey = @@ -31,7 +28,6 @@ pub fn blacklisted_accounts( blacklisted_accounts.insert(magic_program::MAGIC_CONTEXT_PUBKEY); blacklisted_accounts.insert(magic_program::EPHEMERAL_VAULT_PUBKEY); blacklisted_accounts.insert(*validator_id); - blacklisted_accounts.insert(*faucet_id); blacklisted_accounts } @@ -66,6 +62,7 @@ pub fn native_program_accounts() -> HashSet { blacklisted_programs.insert(loader_v4::ID); blacklisted_programs.insert(native_loader::ID); blacklisted_programs.insert(secp256k1_program::ID); + blacklisted_programs.insert(secp256r1_program::ID); blacklisted_programs.insert(stake::ID); blacklisted_programs.insert(system_program::ID); blacklisted_programs.insert(vote::ID); diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index b68f5e276..99049818a 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -132,13 +132,11 @@ where accounts_bank: &Arc, cloner: &Arc, validator_keypair: Keypair, - faucet_pubkey: Pubkey, subscription_updates_rx: mpsc::Receiver, allowed_programs: Option>, ) -> Arc { let validator_pubkey = validator_keypair.pubkey(); - let blacklisted_accounts = - blacklisted_accounts(&validator_pubkey, &faucet_pubkey); + 
let blacklisted_accounts = blacklisted_accounts(&validator_pubkey); let allowed_programs = allowed_programs.map(|programs| { programs.iter().map(|p| p.id).collect::>() }); diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs index 3a1c00164..1dfddf84b 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs @@ -124,8 +124,6 @@ where { init_logger(); - let faucet_pubkey = Pubkey::new_unique(); - // Setup mock RPC client with the accounts and clock sysvar let accounts_map: HashMap = accounts.into_iter().collect(); let rpc_client = ChainRpcClientMockBuilder::new() @@ -161,7 +159,6 @@ where remote_account_provider.clone(), &accounts_bank, validator_keypair, - faucet_pubkey, ); FetcherTestCtx { @@ -239,7 +236,6 @@ fn init_fetch_cloner( >, bank: &Arc, validator_keypair: Keypair, - faucet_pubkey: Pubkey, ) -> TestFetchClonerResult { let (subscription_tx, subscription_rx) = mpsc::channel(100); let cloner = Arc::new(ClonerStub::new(bank.clone())); @@ -248,7 +244,6 @@ fn init_fetch_cloner( bank, &cloner, validator_keypair, - faucet_pubkey, subscription_rx, None, ); @@ -1764,7 +1759,6 @@ async fn test_allowed_programs_filters_programs() { &accounts_bank, &cloner, validator_keypair.insecure_clone(), - random_pubkey(), subscription_rx, allowed_programs, ); @@ -1837,7 +1831,6 @@ async fn test_allowed_programs_none_allows_all() { &accounts_bank, &cloner, validator_keypair.insecure_clone(), - random_pubkey(), subscription_rx, None, // No restriction ); @@ -1909,7 +1902,6 @@ async fn test_allowed_programs_empty_allows_all() { &accounts_bank, &cloner, validator_keypair.insecure_clone(), - random_pubkey(), subscription_rx, allowed_programs, ); diff --git a/magicblock-chainlink/src/chainlink/mod.rs b/magicblock-chainlink/src/chainlink/mod.rs index 67063c3f3..6900e1a68 100644 --- a/magicblock-chainlink/src/chainlink/mod.rs +++ 
b/magicblock-chainlink/src/chainlink/mod.rs @@ -1,10 +1,4 @@ -use std::{ - collections::HashSet, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, -}; +use std::{collections::HashSet, sync::Arc}; use dlp_api::pda::ephemeral_balance_pda_from_payer; use errors::ChainlinkResult; @@ -14,9 +8,9 @@ use magicblock_config::config::ChainLinkConfig; use magicblock_metrics::metrics::AccountFetchOrigin; use solana_account::{AccountSharedData, ReadableAccount}; use solana_commitment_config::CommitmentConfig; -use solana_feature_set; use solana_keypair::Keypair; use solana_pubkey::Pubkey; +use solana_sdk_ids::feature; use solana_signer::Signer; use solana_transaction::sanitized::SanitizedTransaction; use tokio::{sync::mpsc, task}; @@ -28,10 +22,12 @@ use crate::{ fetch_cloner::FetchAndCloneResult, filters::is_noop_system_transfer, remote_account_provider::{ + chain_pubsub_client::mock::ChainPubsubClientMock, chain_updates_client::ChainUpdatesClient, ChainPubsubClient, ChainRpcClient, ChainRpcClientImpl, Endpoints, RemoteAccountProvider, }, submux::SubMuxClient, + testing::{cloner_stub::ClonerStub, rpc_client_mock::ChainRpcClientMock}, }; mod account_still_undelegating_on_chain; @@ -42,6 +38,10 @@ pub mod fetch_cloner; pub use blacklisted_accounts::*; +/// A type alias for chainlink with only accountsdb being real impl +pub type StubbedChainlink = + Chainlink; + // ----------------- // Chainlink // ----------------- @@ -61,7 +61,6 @@ pub struct Chainlink< removed_accounts_sub: Option>, validator_id: Pubkey, - faucet_id: Pubkey, /// If > 0, automatically airdrop this many lamports to feepayers when they are new/empty auto_airdrop_lamports: u64, @@ -77,7 +76,6 @@ impl accounts_bank: &Arc, fetch_cloner: Option>>, validator_pubkey: Pubkey, - faucet_pubkey: Pubkey, config: &ChainLinkConfig, ) -> ChainlinkResult { let removed_accounts_sub = if let Some(fetch_cloner) = &fetch_cloner { @@ -95,7 +93,6 @@ impl fetch_cloner, removed_accounts_sub, validator_id: validator_pubkey, - 
faucet_id: faucet_pubkey, auto_airdrop_lamports: config.auto_airdrop_lamports, remove_confined_accounts: config.remove_confined_accounts, }) @@ -115,7 +112,6 @@ impl accounts_bank: &Arc, cloner: &Arc, validator_keypair: Keypair, - faucet_pubkey: Pubkey, config: ChainlinkConfig, chainlink_config: &ChainLinkConfig, ) -> ChainlinkResult< @@ -139,7 +135,6 @@ impl accounts_bank, cloner, validator_keypair, - faucet_pubkey, rx, chainlink_config.allowed_programs.clone(), ); @@ -152,7 +147,6 @@ impl accounts_bank, fetch_cloner, validator_pubkey, - faucet_pubkey, chainlink_config, ) } @@ -162,18 +156,17 @@ impl /// when resuming an existing ledger to guarantee that we don't hold /// accounts that might be stale. pub fn reset_accounts_bank(&self) -> AccountsDbResult<()> { - let blacklisted_accounts = - blacklisted_accounts(&self.validator_id, &self.faucet_id); + let blacklisted_accounts = blacklisted_accounts(&self.validator_id); - let delegated_only = AtomicU64::new(0); - let undelegating = AtomicU64::new(0); - let blacklisted = AtomicU64::new(0); - let remaining = AtomicU64::new(0); - let remaining_empty = AtomicU64::new(0); + let mut delegated_only = 0; + let mut kept_ephemeral = 0; + let mut undelegating = 0; + let mut blacklisted = 0; + let mut remaining = 0u32; let removed = self.accounts_bank.remove_where(|pubkey, account| { if blacklisted_accounts.contains(pubkey) { - blacklisted.fetch_add(1, Ordering::Relaxed); + blacklisted += 1; return false; } if self.remove_confined_accounts && account.confined() { @@ -182,48 +175,37 @@ impl // Undelegating accounts are normally also delegated, but if that ever changes // we want to make sure we never remove an account of which we aren't sure // if the undelegation completed on chain or not. 
- if account.delegated() || account.undelegating() { - if account.undelegating() { - undelegating.fetch_add(1, Ordering::Relaxed); - } else { - delegated_only.fetch_add(1, Ordering::Relaxed); - } - return false; - } - if tracing::enabled!(tracing::Level::TRACE) { - let account_fmt = format!("{:#?}", account); + let should_remove = if account.undelegating() { + undelegating += 1; + false + } else if account.ephemeral() { + kept_ephemeral += 1; + false + } else if account.delegated() { + delegated_only += 1; + false + } else { + *account.owner() != feature::ID + }; + if should_remove { trace!( pubkey = %pubkey, - account = %account_fmt, + account=%format!("{account:#?}"), "Removing non-delegated, non-DLP-owned account" ); + } else { + remaining += 1; } - remaining.fetch_add(1, Ordering::Relaxed); - if account.owner().as_ref() != solana_feature_set::ID.as_ref() { - remaining_empty.fetch_add(1, Ordering::Relaxed); - } - true + should_remove })?; - let non_empty = remaining - .load(Ordering::Relaxed) - .saturating_sub(remaining_empty.load(Ordering::Relaxed)); - - let delegated_only = delegated_only.into_inner(); - let undelegating = undelegating.into_inner(); - let remaining_empty_count = remaining_empty.into_inner(); - let kept_delegated = delegated_only; - let kept_blacklisted = blacklisted.into_inner(); - let total_removed = removed; - info!( - total_removed, - non_empty, - empty = remaining_empty_count, + total_removed = removed, delegated_not_undelegating = delegated_only, delegated_and_undelegating = undelegating, - kept_delegated, - kept_blacklisted, + kept_delegated = delegated_only, + kept_blacklisted = blacklisted, + kept_ephemeral, "Removed accounts from bank" ); Ok(()) @@ -518,6 +500,22 @@ impl .map(|provider| provider.is_watching(pubkey)) .unwrap_or(false) } + + /// A temporary hacky method to clone chainlink with accountsdb only, + /// for it's used by the replication service to clean up accountsdb + /// + /// TODO(bmuddha): + /// remove all accountsdb 
management from chainlink, after accountsdb refactoring + pub fn stub(&self) -> StubbedChainlink { + Chainlink { + accounts_bank: self.accounts_bank.clone(), + fetch_cloner: None, + removed_accounts_sub: None, + validator_id: self.validator_id, + auto_airdrop_lamports: 0, + remove_confined_accounts: self.remove_confined_accounts, + } + } } // ----------------- diff --git a/magicblock-chainlink/tests/utils/test_context.rs b/magicblock-chainlink/tests/utils/test_context.rs index 6d7746016..e2c7f7b01 100644 --- a/magicblock-chainlink/tests/utils/test_context.rs +++ b/magicblock-chainlink/tests/utils/test_context.rs @@ -99,7 +99,6 @@ impl TestContext { &bank, &cloner, validator_keypair.insecure_clone(), - faucet_pubkey, rx, None, )), @@ -116,7 +115,6 @@ impl TestContext { &bank, fetch_cloner, validator_pubkey, - faucet_pubkey, &ChainLinkConfig::default(), ) .unwrap(); diff --git a/magicblock-ledger/src/blockstore_processor/mod.rs b/magicblock-ledger/src/blockstore_processor/mod.rs index 37c4b9ebd..9398854d8 100644 --- a/magicblock-ledger/src/blockstore_processor/mod.rs +++ b/magicblock-ledger/src/blockstore_processor/mod.rs @@ -12,7 +12,7 @@ use tracing::{Level, *}; use crate::{ errors::{LedgerError, LedgerResult}, - Ledger, + LatestBlockInner, Ledger, }; #[derive(Debug)] @@ -115,9 +115,11 @@ async fn replay_blocks( "Block has no timestamp, {block:?}", ))); }; - ledger - .latest_block() - .store(block.slot, block.blockhash, timestamp); + { + let block = + LatestBlockInner::new(block.slot, block.blockhash, timestamp); + ledger.latest_block().store(block); + } // Transactions are stored in the ledger ordered by most recent to latest // such to replay them in the order they executed we need to reverse them for txn in block.transactions.into_iter().rev() { diff --git a/magicblock-ledger/src/lib.rs b/magicblock-ledger/src/lib.rs index 35c747234..2828c0fa2 100644 --- a/magicblock-ledger/src/lib.rs +++ b/magicblock-ledger/src/lib.rs @@ -31,15 +31,13 @@ pub struct LatestBlock 
{ /// readers automatically get access to the latest version of the block inner: Arc>>, /// Notification mechanism to signal that the block has been modified, - /// the actual state is not sent via channel, as it can be accessed any - /// time with `load` method, only the fact of production is communicated notifier: broadcast::Sender, } impl LatestBlockInner { - fn new(slot: u64, blockhash: Hash, timestamp: i64) -> Self { + pub fn new(slot: u64, blockhash: Hash, timestamp: i64) -> Self { let clock = Clock { - slot, + slot: slot + 1, unix_timestamp: timestamp, ..Default::default() }; @@ -68,8 +66,7 @@ impl LatestBlock { /// Atomically updates the latest block information and notifies all subscribers. /// This is the "writer" method for the single-writer, multi-reader pattern. - pub fn store(&self, slot: u64, blockhash: Hash, timestamp: i64) { - let block = LatestBlockInner::new(slot, blockhash, timestamp); + pub fn store(&self, block: LatestBlockInner) { self.inner.store(block.clone().into()); // Broadcast the update. It's okay if there are no active listeners. let _ = self.notifier.send(block); diff --git a/magicblock-ledger/src/store/api.rs b/magicblock-ledger/src/store/api.rs index 967a8959a..8c77e143f 100644 --- a/magicblock-ledger/src/store/api.rs +++ b/magicblock-ledger/src/store/api.rs @@ -42,7 +42,7 @@ use crate::{ errors::{LedgerError, LedgerResult}, metrics::LedgerRpcApiMetrics, store::utils::adjust_ulimit_nofile, - LatestBlock, + LatestBlock, LatestBlockInner, }; #[derive(Default, Debug)] @@ -168,7 +168,8 @@ impl Ledger { }; let (slot, blockhash) = ledger.get_max_blockhash()?; let time = ledger.get_block_time(slot)?.unwrap_or_default(); - ledger.latest_block.store(slot, blockhash, time); + let block = LatestBlockInner::new(slot, blockhash, time); + ledger.latest_block.store(block); ledger.initialize_lowest_cleanup_slot()?; Ok(ledger) @@ -366,18 +367,14 @@ impl Ledger { // NOTE: we kept the term block time even tough we don't produce blocks. 
// As far as we are concerned these are just the time when we advanced to // a specific slot. - pub fn write_block( - &self, - slot: Slot, - timestamp: UnixTimestamp, - blockhash: Hash, - ) -> LedgerResult<()> { - self.blocktime_cf.put(slot, ×tamp)?; + pub fn write_block(&self, block: LatestBlockInner) -> LedgerResult<()> { + self.blocktime_cf + .put(block.slot, &block.clock.unix_timestamp)?; self.blocktime_cf.try_increase_entry_counter(1); - self.blockhash_cf.put(slot, &blockhash)?; + self.blockhash_cf.put(block.slot, &block.blockhash)?; self.blockhash_cf.try_increase_entry_counter(1); - self.latest_block.store(slot, blockhash, timestamp); + self.latest_block.store(block); Ok(()) } @@ -1798,7 +1795,11 @@ mod tests { ) .is_ok()); assert!(store - .write_block(slot_uno, block_time_uno, block_hash_uno) + .write_block(LatestBlockInner::new( + slot_uno, + block_hash_uno, + block_time_uno + )) .is_ok()); // Get first transaction by signature providing high enough slot @@ -1830,7 +1831,11 @@ mod tests { ) .is_ok()); assert!(store - .write_block(slot_dos, block_time_dos, block_hash_dos) + .write_block(LatestBlockInner::new( + slot_dos, + block_hash_dos, + block_time_dos + )) .is_ok()); // Get second transaction by signature providing slot at which it was stored @@ -1850,7 +1855,11 @@ mod tests { // 1. 
Add some transaction statuses let (signature_uno, slot_uno) = (Signature::new_unique(), 10); store - .write_block(slot_uno, 0, BlockHash::new_unique()) + .write_block(LatestBlockInner::new( + slot_uno, + BlockHash::new_unique(), + 0, + )) .unwrap(); let (read_uno, write_uno) = { @@ -1873,7 +1882,11 @@ mod tests { let (signature_dos, slot_dos) = (Signature::new_unique(), 20); store - .write_block(slot_dos, 0, BlockHash::new_unique()) + .write_block(LatestBlockInner::new( + slot_dos, + BlockHash::new_unique(), + 0, + )) .unwrap(); let signature_dos_2 = Signature::new_unique(); let (read_dos, write_dos) = { @@ -1917,7 +1930,11 @@ mod tests { let (signature_tres, slot_tres) = (Signature::new_unique(), 30); store - .write_block(slot_tres, 0, BlockHash::new_unique()) + .write_block(LatestBlockInner::new( + slot_tres, + BlockHash::new_unique(), + 0, + )) .unwrap(); let (_read_tres, _write_tres) = { let (meta, mut writable_keys, mut readonly_keys) = @@ -1944,7 +1961,11 @@ mod tests { let (signature_cuatro, slot_cuatro) = (Signature::new_unique(), 31); store - .write_block(slot_cuatro, 0, BlockHash::new_unique()) + .write_block(LatestBlockInner::new( + slot_cuatro, + BlockHash::new_unique(), + 0, + )) .unwrap(); let (read_cuatro, _write_cuatro) = { let (meta, writable_keys, readonly_keys) = @@ -1966,7 +1987,11 @@ mod tests { let (signature_cinco, slot_cinco) = (Signature::new_unique(), 31); store - .write_block(slot_cinco, 0, BlockHash::new_unique()) + .write_block(LatestBlockInner::new( + slot_cinco, + BlockHash::new_unique(), + 0, + )) .unwrap(); let (_read_cinco, _write_cinco) = { let (meta, writable_keys, readonly_keys) = @@ -1988,7 +2013,11 @@ mod tests { let (signature_seis, slot_seis) = (Signature::new_unique(), 32); store - .write_block(slot_seis, 0, BlockHash::new_unique()) + .write_block(LatestBlockInner::new( + slot_seis, + BlockHash::new_unique(), + 0, + )) .unwrap(); let (_read_seis, _write_seis) = { let (meta, mut writable_keys, mut readonly_keys) = @@ 
-2030,14 +2059,40 @@ mod tests { // read_seis | write_seis : signature_seis // 2. Fill in block times - assert!(store.write_block(slot_uno, 1, Hash::new_unique()).is_ok()); - assert!(store.write_block(slot_dos, 2, Hash::new_unique()).is_ok()); - assert!(store.write_block(slot_tres, 3, Hash::new_unique()).is_ok()); assert!(store - .write_block(slot_cuatro, 4, Hash::new_unique()) + .write_block(LatestBlockInner::new(slot_uno, Hash::new_unique(), 1)) + .is_ok()); + assert!(store + .write_block(LatestBlockInner::new(slot_dos, Hash::new_unique(), 2)) + .is_ok()); + assert!(store + .write_block(LatestBlockInner::new( + slot_tres, + Hash::new_unique(), + 3 + )) + .is_ok()); + assert!(store + .write_block(LatestBlockInner::new( + slot_cuatro, + Hash::new_unique(), + 4 + )) + .is_ok()); + assert!(store + .write_block(LatestBlockInner::new( + slot_cinco, + Hash::new_unique(), + 5 + )) + .is_ok()); + assert!(store + .write_block(LatestBlockInner::new( + slot_seis, + Hash::new_unique(), + 6 + )) .is_ok()); - assert!(store.write_block(slot_cinco, 5, Hash::new_unique()).is_ok()); - assert!(store.write_block(slot_seis, 6, Hash::new_unique()).is_ok()); // 3. 
Find signatures for address with default limits let res = store @@ -2240,9 +2295,27 @@ mod tests { let (meta, writable_keys, readonly_keys) = create_transaction_status_meta(5); let read_uno = readonly_keys[0]; - assert!(store.write_block(slot1, 1, Hash::new_unique()).is_ok()); - assert!(store.write_block(slot2, 2, Hash::new_unique()).is_ok()); - assert!(store.write_block(slot3, 3, Hash::new_unique()).is_ok()); + assert!(store + .write_block(LatestBlockInner::new( + slot1, + Hash::new_unique(), + 1 + )) + .is_ok()); + assert!(store + .write_block(LatestBlockInner::new( + slot2, + Hash::new_unique(), + 2 + )) + .is_ok()); + assert!(store + .write_block(LatestBlockInner::new( + slot3, + Hash::new_unique(), + 3 + )) + .is_ok()); for (slot, signature) in &[ (slot1, sig1), (slot1, sig2), @@ -2487,7 +2560,11 @@ mod tests { .is_ok()); assert!(store - .write_block(slot_uno, 100, Hash::new_unique()) + .write_block(LatestBlockInner::new( + slot_uno, + Hash::new_unique(), + 100 + )) .is_ok()); assert!(store @@ -2515,7 +2592,11 @@ mod tests { ) .is_ok()); assert!(store - .write_block(slot_dos, 100, Hash::new_unique()) + .write_block(LatestBlockInner::new( + slot_dos, + Hash::new_unique(), + 100 + )) .is_ok()); assert!(store .write_transaction_memos( @@ -2608,7 +2689,9 @@ mod tests { meta, ) .unwrap(); - store.write_block(slot, 100, Hash::new_unique()).unwrap(); + store + .write_block(LatestBlockInner::new(slot, Hash::new_unique(), 100)) + .unwrap(); // Verify a properly signed transaction returns Some(true) let result = store.verify_transaction_signature(&signature).unwrap(); @@ -2675,7 +2758,9 @@ mod tests { meta, ) .unwrap(); - store.write_block(slot, 100, Hash::new_unique()).unwrap(); + store + .write_block(LatestBlockInner::new(slot, Hash::new_unique(), 100)) + .unwrap(); // The corrupted signature should fail verification let result = store.verify_transaction_signature(&bad_sig).unwrap(); diff --git a/magicblock-ledger/tests/get_block.rs 
b/magicblock-ledger/tests/get_block.rs index 0bb91a50a..7fa743c3b 100644 --- a/magicblock-ledger/tests/get_block.rs +++ b/magicblock-ledger/tests/get_block.rs @@ -1,5 +1,6 @@ mod common; +use magicblock_ledger::LatestBlockInner; use solana_hash::Hash; use test_kit::init_logger; @@ -20,9 +21,15 @@ fn test_get_block_meta() { let slot_1_hash = Hash::new_unique(); let slot_2_hash = Hash::new_unique(); - assert!(ledger.write_block(0, slot_0_time, slot_0_hash).is_ok()); - assert!(ledger.write_block(1, slot_1_time, slot_1_hash).is_ok()); - assert!(ledger.write_block(2, slot_2_time, slot_2_hash).is_ok()); + assert!(ledger + .write_block(LatestBlockInner::new(0, slot_0_hash, slot_0_time)) + .is_ok()); + assert!(ledger + .write_block(LatestBlockInner::new(1, slot_1_hash, slot_1_time)) + .is_ok()); + assert!(ledger + .write_block(LatestBlockInner::new(2, slot_2_hash, slot_2_time)) + .is_ok()); let slot_0_block = get_block(&ledger, 0); let slot_1_block = get_block(&ledger, 1); @@ -48,7 +55,11 @@ fn test_get_block_transactions() { let slot_41_block_time = 410; let slot_41_block_hash = Hash::new_unique(); ledger - .write_block(41, slot_41_block_time, slot_41_block_hash) + .write_block(LatestBlockInner::new( + 41, + slot_41_block_hash, + slot_41_block_time, + )) .unwrap(); let (slot_42_tx1, _) = write_dummy_transaction(&ledger, 42, 0); @@ -57,7 +68,11 @@ fn test_get_block_transactions() { let slot_42_block_time = 420; let slot_42_block_hash = Hash::new_unique(); ledger - .write_block(42, slot_42_block_time, slot_42_block_hash) + .write_block(LatestBlockInner::new( + 42, + slot_42_block_hash, + slot_42_block_time, + )) .unwrap(); let block_41 = get_block(&ledger, 41); diff --git a/magicblock-ledger/tests/test_ledger_truncator.rs b/magicblock-ledger/tests/test_ledger_truncator.rs index 4528843d6..6cffd3671 100644 --- a/magicblock-ledger/tests/test_ledger_truncator.rs +++ b/magicblock-ledger/tests/test_ledger_truncator.rs @@ -1,7 +1,9 @@ mod common; use std::{sync::Arc, 
time::Duration}; -use magicblock_ledger::{ledger_truncator::LedgerTruncator, Ledger}; +use magicblock_ledger::{ + ledger_truncator::LedgerTruncator, LatestBlockInner, Ledger, +}; use solana_hash::Hash; use solana_signature::Signature; use test_kit::init_logger; @@ -55,7 +57,9 @@ async fn test_truncator_not_purged_size() { for i in 0..NUM_TRANSACTIONS { write_dummy_transaction(&ledger, i, 0); - ledger.write_block(i, 0, Hash::new_unique()).unwrap() + ledger + .write_block(LatestBlockInner::new(i, Hash::new_unique(), 0)) + .unwrap() } let signatures = (0..NUM_TRANSACTIONS) .map(|i| { @@ -86,7 +90,9 @@ async fn test_truncator_non_empty_ledger() { let signatures = (0..FINAL_SLOT + 20) .map(|i| { let (_, signature) = write_dummy_transaction(&ledger, i, 0); - ledger.write_block(i, 0, Hash::new_unique()).unwrap(); + ledger + .write_block(LatestBlockInner::new(i, Hash::new_unique(), 0)) + .unwrap(); signature }) .collect::>(); @@ -129,7 +135,9 @@ async fn transaction_spammer( for _ in 0..tx_per_operation { let slot = signatures.len() as u64; let (_, signature) = write_dummy_transaction(&ledger, slot, 0); - ledger.write_block(slot, 0, Hash::new_unique()).unwrap(); + ledger + .write_block(LatestBlockInner::new(slot, Hash::new_unique(), 0)) + .unwrap(); signatures.push(signature); } @@ -187,7 +195,9 @@ async fn test_with_1gb_db() { } write_dummy_transaction(&ledger, slot, 0); - ledger.write_block(slot, 0, Hash::new_unique()).unwrap(); + ledger + .write_block(LatestBlockInner::new(slot, Hash::new_unique(), 0)) + .unwrap(); slot += 1 } diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs index e43131f17..b62fc9e31 100644 --- a/magicblock-processor/src/scheduler/mod.rs +++ b/magicblock-processor/src/scheduler/mod.rs @@ -43,6 +43,7 @@ //! 5. **Update program cache** - prune stale programs, re-root to new slot //! 6. 
**Update sysvars** - Clock and SlotHashes accounts +use core::matches; use std::{ sync::{Arc, RwLock}, thread::JoinHandle, @@ -371,7 +372,7 @@ impl TransactionScheduler { (self.slot, index) } }; - if is_execution { + if !matches!(txn.mode, TransactionProcessingMode::Simulation(_)) { self.hasher.update(txn.transaction.signature().as_ref()); } @@ -407,7 +408,7 @@ impl TransactionScheduler { block: Option, ) -> u64 { let block = self.prepare_block(block).await; - self.finalize_block(&block).await; + self.finalize_block(block.clone()).await; self.update_sysvars(&block); metrics::set_slot(block.slot); block.slot @@ -431,26 +432,17 @@ impl TransactionScheduler { /// Prepares block as primary: computes blockhash and broadcasts to replicas. async fn prepare_block_as_primary(&mut self) -> LatestBlockInner { - let blockhash: [u8; 32] = *self.hasher.finalize().as_bytes(); + let blockhash = (*self.hasher.finalize().as_bytes()).into(); // NOTE: // As we have a single node network, we have no // option but to use the time from host machine - let unix_timestamp = SystemTime::now() + let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() // NOTE: since we can tick very frequently, a lot // of blocks might have identical timestamps .as_secs() as i64; - let clock = Clock { - slot: self.slot + 1, - unix_timestamp, - ..Default::default() - }; - let block = LatestBlockInner { - slot: self.slot, - blockhash: blockhash.into(), - clock, - }; + let block = LatestBlockInner::new(self.slot, blockhash, timestamp); let msg = Message::Block(replication::Block { slot: block.slot, hash: block.blockhash, @@ -478,20 +470,13 @@ impl TransactionScheduler { } /// Finalizes the block: persists to ledger and updates accountsdb slot. 
- async fn finalize_block(&self, block: &LatestBlockInner) { + async fn finalize_block(&self, block: LatestBlockInner) { + self.accountsdb.set_slot(block.slot); if self.coordinator.is_primary() { - let _ = self - .ledger - .write_block( - block.slot, - block.clock.unix_timestamp, - block.blockhash, - ) - .inspect_err(|error| { - error!(%error, "failed to write block to the ledger") - }); + let _ = self.ledger.write_block(block).inspect_err( + |error| error!(%error, "failed to write block to the ledger"), + ); } - self.accountsdb.set_slot(block.slot); } /// Updates sysvars and program cache for the new slot. diff --git a/magicblock-processor/src/scheduler/state.rs b/magicblock-processor/src/scheduler/state.rs index 1d71ebc04..2fc857416 100644 --- a/magicblock-processor/src/scheduler/state.rs +++ b/magicblock-processor/src/scheduler/state.rs @@ -9,7 +9,7 @@ use std::{ time::Duration, }; -use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; +use magicblock_accounts_db::AccountsDb; use magicblock_core::link::{ accounts::AccountUpdateTx, replication::Message, @@ -131,8 +131,6 @@ impl TransactionSchedulerState { let slot_hashes = SlotHashes::new(&[(block.slot, block.blockhash); MAX_ENTRIES]); - // Remove first to avoid "account already exists" errors - self.accountsdb.remove_account(&sysvar::slot_hashes::ID); self.ensure_sysvar(&sysvar::slot_hashes::ID, &slot_hashes); // Immutable/Static sysvars (initialized once) diff --git a/magicblock-processor/tests/scheduling.rs b/magicblock-processor/tests/scheduling.rs index 633ac34ed..70e1896e3 100644 --- a/magicblock-processor/tests/scheduling.rs +++ b/magicblock-processor/tests/scheduling.rs @@ -177,7 +177,6 @@ async fn scenario_parallel_transfers(executors: u32) { let mut txs = Vec::with_capacity(pairs); for i in 0..pairs { - env.advance_slot(); txs.push(tx_transfer(&mut env, accounts[i * 2], accounts[i * 2 + 1])); } @@ -216,7 +215,6 @@ async fn scenario_conflicting_transfers(executors: u32) { let mut txs = 
Vec::with_capacity(senders.len()); for sender in &senders { - env.advance_slot(); txs.push(tx_transfer(&mut env, *sender, recipient)); } @@ -245,7 +243,6 @@ async fn scenario_readonly_parallel(executors: u32) { let mut txs = Vec::with_capacity(count); for i in 0..count { - env.advance_slot(); txs.push(tx_read( &mut env, &[accounts[i % 10], accounts[(i + 1) % 10]], @@ -272,7 +269,6 @@ async fn scenario_mixed_workload(executors: u32) { txs.push(tx_transfer(&mut env, accs[2], accs[3])); // C->D txs.push(tx_transfer(&mut env, accs[1], accs[0])); // B->A (conflict T1) txs.push(tx_transfer(&mut env, accs[4], accs[5])); // E->F - env.advance_slot(); } let sigs = schedule(&mut env, txs).await; @@ -307,7 +303,6 @@ async fn scenario_conflicting_writes(executors: u32) { let mut txs = Vec::with_capacity(count); for i in 1..=count { - env.advance_slot(); txs.push(tx_write(&mut env, acc, i as u8)); } @@ -330,7 +325,6 @@ async fn scenario_serial_conflicting_writes(executors: u32) { let mut txs = Vec::with_capacity(count); for i in 0..count { - env.advance_slot(); txs.push(tx_write(&mut env, acc, i as u8)); } @@ -359,7 +353,6 @@ async fn scenario_serial_transfer_chain(executors: u32) { let mut txs = Vec::with_capacity(count); for i in 0..count { - env.advance_slot(); txs.push(tx_transfer(&mut env, accs[i], accs[i + 1])); } @@ -399,7 +392,6 @@ async fn scenario_stress_test(executors: u32) { let mut txs = Vec::with_capacity(count); for i in 0..count { - env.advance_slot(); let idx = i % num_accs; let tx = match i % 4 { 0 => { diff --git a/magicblock-replicator/Cargo.toml b/magicblock-replicator/Cargo.toml index 4d5328fd8..285ffd5dc 100644 --- a/magicblock-replicator/Cargo.toml +++ b/magicblock-replicator/Cargo.toml @@ -13,6 +13,7 @@ bincode = { workspace = true } bytes = { workspace = true } futures = { workspace = true } magicblock-accounts-db = { workspace = true } +magicblock-chainlink = { workspace = true } magicblock-core = { workspace = true } magicblock-ledger = { workspace = 
true } machineid-rs = { workspace = true } diff --git a/magicblock-replicator/src/error.rs b/magicblock-replicator/src/error.rs index 2fc69d7bc..0890d4a82 100644 --- a/magicblock-replicator/src/error.rs +++ b/magicblock-replicator/src/error.rs @@ -2,6 +2,7 @@ use std::fmt::{Debug, Display}; +use magicblock_accounts_db::error::AccountsDbError; use magicblock_ledger::errors::LedgerError; use solana_transaction_error::TransactionError; @@ -20,6 +21,10 @@ pub enum Error { #[error("serialization error: {0}")] SerDe(#[from] bincode::Error), + /// AccountsDB access error. + #[error("accountsdb access error: {0}")] + AccountsDb(#[from] AccountsDbError), + /// Ledger access error. #[error("ledger access error: {0}")] Ledger(#[from] LedgerError), diff --git a/magicblock-replicator/src/nats/broker.rs b/magicblock-replicator/src/nats/broker.rs index dd1e07f9a..febde5c82 100644 --- a/magicblock-replicator/src/nats/broker.rs +++ b/magicblock-replicator/src/nats/broker.rs @@ -167,6 +167,8 @@ impl Broker { tokio::spawn(async move { if let Err(error) = store.put(meta, &mut file).await { error!(%error, "snapshot upload failed"); + } else { + info!(slot, "uploaded accountsdb snapshot"); } }); diff --git a/magicblock-replicator/src/nats/consumer.rs b/magicblock-replicator/src/nats/consumer.rs index 264813137..7740fa898 100644 --- a/magicblock-replicator/src/nats/consumer.rs +++ b/magicblock-replicator/src/nats/consumer.rs @@ -45,7 +45,6 @@ impl Consumer { durable_name: Some(id.into()), ack_policy: AckPolicy::All, ack_wait: cfg::ACK_WAIT, - max_ack_pending: cfg::MAX_ACK_PENDING, deliver_policy, ..Default::default() }, diff --git a/magicblock-replicator/src/nats/mod.rs b/magicblock-replicator/src/nats/mod.rs index 5ec8a46de..dcb700fa5 100644 --- a/magicblock-replicator/src/nats/mod.rs +++ b/magicblock-replicator/src/nats/mod.rs @@ -39,9 +39,9 @@ mod cfg { pub const META_SLOT: &str = "slot"; pub const META_SEQUENCE: &str = "sequence"; - // Size limits (256 GB stream, 512 GB snapshots) + 
// Size limits (256 GB stream, 1 GB snapshots) pub const STREAM_BYTES: i64 = 256 * 1024 * 1024 * 1024; - pub const SNAPSHOT_BYTES: i64 = 512 * 1024 * 1024 * 1024; + pub const SNAPSHOT_BYTES: i64 = 1024 * 1024 * 1024; // Timeouts pub const TTL_STREAM: Duration = Duration::from_secs(24 * 60 * 60); @@ -55,7 +55,6 @@ mod cfg { pub const RECONNECT_MAX_MS: u64 = 5000; // Backpressure - pub const MAX_ACK_PENDING: i64 = 512; pub const MAX_ACK_INFLIGHT: usize = 2048; pub const BATCH_SIZE: usize = 512; } diff --git a/magicblock-replicator/src/service/context.rs b/magicblock-replicator/src/service/context.rs index 5ecfab156..a1f57fc4f 100644 --- a/magicblock-replicator/src/service/context.rs +++ b/magicblock-replicator/src/service/context.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use machineid_rs::IdBuilder; use magicblock_accounts_db::AccountsDb; +use magicblock_chainlink::StubbedChainlink; use magicblock_core::{ link::{ replication::{Block, Message, SuperBlock}, @@ -11,7 +12,7 @@ use magicblock_core::{ }, Slot, TransactionIndex, }; -use magicblock_ledger::Ledger; +use magicblock_ledger::{LatestBlockInner, Ledger}; use tokio::{ fs::File, sync::mpsc::{Receiver, Sender}, @@ -38,6 +39,10 @@ pub struct ReplicationContext { pub mode_tx: Sender, /// Accounts database. pub accountsdb: Arc, + /// Mocked chainlink to reset accountsdb + /// TODO(bmuddha): this is a temporary hack, which will be removed + /// once the accounts management is moved to the accountsdb + pub chainlink: StubbedChainlink, /// Transaction ledger. pub ledger: Arc, /// Transaction scheduler. @@ -52,11 +57,13 @@ pub struct ReplicationContext { impl ReplicationContext { /// Creates context from ledger state. 
+ #[allow(clippy::too_many_arguments)] pub async fn new( broker: Broker, mode_tx: Sender, accountsdb: Arc, ledger: Arc, + chainlink: StubbedChainlink, scheduler: TransactionSchedulerHandle, cancel: CancellationToken, can_promote: bool, @@ -66,21 +73,20 @@ impl ReplicationContext { .build("magicblock") .map_err(|e| Error::Internal(e.to_string()))?; - let (slot, index) = ledger - .get_latest_transaction_position()? - .unwrap_or_default(); + let slot = accountsdb.slot(); - info!(%id, slot, index, can_promote, "context initialized"); + info!(%id, slot, can_promote, "context initialized"); Ok(Self { id, broker, cancel, mode_tx, accountsdb, + chainlink, ledger, scheduler, slot, - index, + index: 0, can_promote, }) } @@ -95,8 +101,9 @@ impl ReplicationContext { pub async fn write_block(&self, block: &Block) -> Result<()> { // wait for the scheduler to accept all of the previous block transactions let _guard = self.scheduler.wait_for_idle().await; - self.ledger - .write_block(block.slot, block.timestamp, block.hash)?; + let block = + LatestBlockInner::new(block.slot, block.hash, block.timestamp); + self.ledger.write_block(block)?; Ok(()) } @@ -109,7 +116,7 @@ impl ReplicationContext { Ok(()) } else { let msg = format!( - "accountsdb state mismatch at {}, expected {checksum}, got {}", + "accountsdb state mismatch at {}, expected {}, got {checksum}", sb.slot, sb.checksum ); Err(Error::Internal(msg)) @@ -174,6 +181,9 @@ impl ReplicationContext { messages: Receiver, ) -> Result { let snapshots = self.create_snapshot_watcher()?; + // TODO(bmuddha): remove dependency on the chainlink + let _guard = self.scheduler.wait_for_idle().await; + self.chainlink.reset_accounts_bank()?; self.enter_primary_mode().await; Ok(Primary::new(self, producer, messages, snapshots)) } diff --git a/magicblock-replicator/src/service/mod.rs b/magicblock-replicator/src/service/mod.rs index ee3af9d6b..423cc71a0 100644 --- a/magicblock-replicator/src/service/mod.rs +++ 
b/magicblock-replicator/src/service/mod.rs @@ -27,6 +27,7 @@ use std::{sync::Arc, thread::JoinHandle, time::Duration}; pub use context::ReplicationContext; use magicblock_accounts_db::AccountsDb; +use magicblock_chainlink::StubbedChainlink; use magicblock_core::link::{ replication::Message, transactions::{SchedulerMode, TransactionSchedulerHandle}, @@ -47,7 +48,7 @@ use crate::{nats::Broker, Result}; // ============================================================================= pub(crate) const LOCK_REFRESH_INTERVAL: Duration = Duration::from_secs(1); -pub(crate) const LEADER_TIMEOUT: Duration = Duration::from_secs(10); +pub(crate) const LEADER_TIMEOUT: Duration = Duration::from_secs(5); const CONSUMER_RETRY_DELAY: Duration = Duration::from_secs(1); // ============================================================================= @@ -71,6 +72,7 @@ impl Service { mode_tx: Sender, accountsdb: Arc, ledger: Arc, + chainlink: StubbedChainlink, scheduler: TransactionSchedulerHandle, messages: Receiver, cancel: CancellationToken, @@ -82,6 +84,7 @@ impl Service { mode_tx, accountsdb, ledger, + chainlink, scheduler, cancel, can_promote, diff --git a/magicblock-replicator/src/service/primary.rs b/magicblock-replicator/src/service/primary.rs index 7f812d44a..b7aa9b295 100644 --- a/magicblock-replicator/src/service/primary.rs +++ b/magicblock-replicator/src/service/primary.rs @@ -40,6 +40,7 @@ impl Primary { /// Returns `Some(Standby)` on demotion, `None` on shutdown. 
#[instrument(skip(self))] pub async fn run(mut self) -> Result> { + info!("entering primary replication mode"); let mut lock_tick = tokio::time::interval(LOCK_REFRESH_INTERVAL); loop { diff --git a/magicblock-replicator/src/service/standby.rs b/magicblock-replicator/src/service/standby.rs index a47614a4c..8b33cce7a 100644 --- a/magicblock-replicator/src/service/standby.rs +++ b/magicblock-replicator/src/service/standby.rs @@ -2,7 +2,7 @@ use std::time::{Duration, Instant}; -use async_nats::Message as NatsMessage; +use async_nats::jetstream::Message as NatsMessage; use futures::StreamExt; use magicblock_core::link::{ replication::{Message, Transaction}, @@ -51,6 +51,7 @@ impl Standby { /// Runs until leadership acquired or shutdown. /// Returns `Some(Primary)` on promotion, `None` on shutdown. pub async fn run(mut self) -> Result> { + info!("entering standby replication mode"); let mut timeout_check = tokio::time::interval(Duration::from_secs(1)); let Some(mut stream) = self.consumer.messages(&self.ctx.cancel).await else { @@ -91,8 +92,11 @@ impl Standby { Err(e) => warn!(%e, "message consumption stream error"), } } - _ = timeout_check.tick(), if self.last_activity.elapsed() > LEADER_TIMEOUT => { - if !self.can_promote { + _ = timeout_check.tick() => { + if self.last_activity.elapsed() < LEADER_TIMEOUT { + continue; + } + if !self.can_promote { warn!("leader timeout reached, but takeover disabled (ReplicaOnly mode)"); continue; } @@ -130,22 +134,24 @@ impl Standby { } let result = match message { - Message::Transaction(txn) => { - self.replay_tx(txn).await - } + Message::Transaction(txn) => self.replay_tx(txn).await, Message::Block(block) => self.ctx.write_block(&block).await, - Message::SuperBlock(sb) => { - self.ctx.verify_checksum(&sb).await.inspect_err(|error| - error!(slot, %error, "accountsdb state has diverged") - ) - } + Message::SuperBlock(sb) => self.ctx.verify_checksum(&sb).await, }; if let Err(error) = result { - warn!(slot, index, %error, "message 
processing error"); + error!(slot, index, %error, "message processing error"); return; } self.ctx.update_position(slot, index); + // NOTE: + // for performance reasons we batch messages from NATS and ack the + // entire batch on slot boundaries, instead of on every message + if current_slot < self.ctx.slot { + if let Err(error) = msg.ack().await { + warn!(%error, "failed to ack nats message"); + } + } } /// Check whether consumer has any undelivered messages in the stream diff --git a/magicblock-replicator/src/tests.rs b/magicblock-replicator/src/tests.rs index 2b3247f05..4dbc5ca79 100644 --- a/magicblock-replicator/src/tests.rs +++ b/magicblock-replicator/src/tests.rs @@ -56,6 +56,35 @@ async fn test_watcher_ignores_non_snapshots() { assert_eq!(contents, test_data); } +#[tokio::test] +async fn test_watcher_ignores_older_slots() { + let temp_dir = TempDir::new().unwrap(); + let mut watcher = SnapshotWatcher::new(temp_dir.path()).unwrap(); + + let newer_path = temp_dir.path().join("snapshot-000000000010.tar.gz"); + std::fs::File::create(&newer_path) + .unwrap() + .write_all(b"newer") + .unwrap(); + + let (_, slot) = + tokio::time::timeout(Duration::from_secs(2), watcher.recv()) + .await + .expect("Timeout waiting for snapshot") + .expect("Channel closed"); + assert_eq!(slot, 10); + + let older_path = temp_dir.path().join("snapshot-000000000009.tar.gz"); + std::fs::File::create(&older_path) + .unwrap() + .write_all(b"older") + .unwrap(); + + tokio::time::timeout(Duration::from_millis(300), watcher.recv()) + .await + .expect_err("older snapshot should be ignored"); +} + #[test] fn test_parse_slot() { assert_eq!( diff --git a/magicblock-replicator/src/watcher.rs b/magicblock-replicator/src/watcher.rs index d7c962923..0b398cc2d 100644 --- a/magicblock-replicator/src/watcher.rs +++ b/magicblock-replicator/src/watcher.rs @@ -3,7 +3,10 @@ //! Monitors a directory for new `.tar.gz` snapshot files and yields them //!
as open [`tokio::fs::File`] handles via a channel for tokio::select compatibility. -use std::path::{Path, PathBuf}; +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::{fs::File, sync::mpsc}; @@ -13,6 +16,8 @@ use crate::Result; const SNAPSHOT_EXTENSION: &str = "tar.gz"; const SNAPSHOT_PREFIX: &str = "snapshot-"; +const OPEN_RETRY_DELAY: Duration = Duration::from_millis(50); +const OPEN_RETRIES: usize = 8; /// Extracts the slot number from a snapshot filename. /// @@ -34,6 +39,7 @@ pub fn parse_slot(path: &Path) -> Option { pub struct SnapshotWatcher { _watcher: RecommendedWatcher, rx: mpsc::Receiver, + last_slot: Option, } impl SnapshotWatcher { @@ -51,17 +57,14 @@ impl SnapshotWatcher { let mut watcher = notify::recommended_watcher(move |res: notify::Result| { - match res { - Ok(event) => { - if let Some(path) = Self::process_event(&event) { - if let Err(e) = tx.blocking_send(path) { - error!("Failed to send snapshot event: {}", e); - } - } - } - Err(e) => { - error!("Watch error: {}", e); - } + let Ok(event) = res else { + return; + }; + let Some(path) = Self::process_event(&event) else { + return; + }; + if let Err(e) = tx.blocking_send(path) { + error!("Failed to send snapshot event: {}", e); } })?; @@ -71,6 +74,7 @@ impl SnapshotWatcher { Ok(Self { _watcher: watcher, rx, + last_slot: None, }) } @@ -82,7 +86,6 @@ impl SnapshotWatcher { for path in &event.paths { if Self::is_snapshot_file(path) { - info!(path = %path.display(), "Detected new snapshot"); return Some(path.clone()); } } @@ -110,10 +113,28 @@ impl SnapshotWatcher { let Some(slot) = parse_slot(&path) else { continue; }; - let Ok(file) = File::open(&path).await else { + + if self.last_slot.is_some_and(|last| slot <= last) { + continue; + } + + let Some(file) = Self::open_with_retry(&path).await else { continue; }; + self.last_slot = Some(slot); break Some((file, slot)); } } + + async fn open_with_retry(path: 
&Path) -> Option { + for _ in 0..OPEN_RETRIES { + if let Ok(file) = File::open(path).await { + return Some(file); + } + + tokio::time::sleep(OPEN_RETRY_DELAY).await; + } + + None + } } diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index f84a86c2f..6a1c50fa3 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -4133,6 +4133,7 @@ dependencies = [ "futures", "machineid-rs", "magicblock-accounts-db", + "magicblock-chainlink", "magicblock-core", "magicblock-ledger", "notify", diff --git a/test-integration/test-chainlink/src/ixtest_context.rs b/test-integration/test-chainlink/src/ixtest_context.rs index 6fad665e6..aefdc8d6a 100644 --- a/test-integration/test-chainlink/src/ixtest_context.rs +++ b/test-integration/test-chainlink/src/ixtest_context.rs @@ -73,7 +73,6 @@ impl IxtestContext { pub async fn init_with_config(config: ChainlinkConfig) -> Self { let validator_kp = Keypair::from_bytes(&TEST_AUTHORITY[..]).unwrap(); - let faucet_kp = Keypair::new(); let commitment = CommitmentConfig::confirmed(); let bank = Arc::::default(); @@ -126,7 +125,6 @@ impl IxtestContext { &bank, &cloner, validator_kp.insecure_clone(), - faucet_kp.pubkey(), rx, None, )), @@ -143,7 +141,6 @@ impl IxtestContext { &bank, fetch_cloner, validator_kp.pubkey(), - faucet_kp.pubkey(), &ChainLinkConfig::default(), ) .unwrap(); diff --git a/test-integration/test-chainlink/src/test_context.rs b/test-integration/test-chainlink/src/test_context.rs index 71128cc06..0610a3df7 100644 --- a/test-integration/test-chainlink/src/test_context.rs +++ b/test-integration/test-chainlink/src/test_context.rs @@ -68,7 +68,6 @@ impl TestContext { let cloner = Arc::new(ClonerStub::new(bank.clone())); let validator_keypair = Keypair::new(); let validator_pubkey = validator_keypair.pubkey(); - let faucet_pubkey = Pubkey::new_unique(); let chain_slot = Arc::new(AtomicU64::new(slot)); let (fetch_cloner, remote_account_provider) = { let (tx, rx) = 
tokio::sync::mpsc::channel(100); @@ -102,7 +101,6 @@ impl TestContext { &bank, &cloner, validator_keypair.insecure_clone(), - faucet_pubkey, rx, None, )), @@ -119,7 +117,6 @@ impl TestContext { &bank, fetch_cloner, validator_pubkey, - faucet_pubkey, &ChainLinkConfig::default(), ) .unwrap(); diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index eb2fe9023..92de2b073 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -21,7 +21,7 @@ use magicblock_core::{ }, Slot, }; -use magicblock_ledger::Ledger; +use magicblock_ledger::{LatestBlockInner, Ledger}; use magicblock_processor::{ build_svm_env, loader::load_upgradeable_programs, @@ -255,6 +255,43 @@ impl ExecutionTestEnv { tokio::time::sleep(std::time::Duration::from_millis(10)).await; } + /// Waits for the scheduler to advance to the next slot. + pub fn wait_for_next_slot(&self) { + let initial_slot = self.ledger.latest_block().load().slot; + let start = std::time::Instant::now(); + while self.ledger.latest_block().load().slot <= initial_slot { + if start.elapsed() > std::time::Duration::from_secs(5) { + panic!( + "Timed out waiting for slot to advance: slot {}", + initial_slot + ); + } + std::thread::sleep(std::time::Duration::from_millis(1)); + } + } + + /// Advances the slot and writes a new block to the ledger. + pub fn advance_slot(&self) -> Slot { + let block = self.ledger.latest_block(); + let b = block.load(); + let slot = b.slot + 1; + let hash = { + let mut hasher = Hasher::default(); + hasher.hash(b.blockhash.as_ref()); + hasher.hash(&b.slot.to_le_bytes()); + hasher.result() + }; + let time = slot as i64; + self.ledger + .write_block(LatestBlockInner::new(slot, hash, time)) + .expect("failed to write new block to the ledger"); + self.accountsdb.set_slot(slot); + + // Yield to allow other tasks (like the executor) to process the slot change. + thread::yield_now(); + slot + } + /// Creates a new account with the specified properties. 
/// Note: This helper automatically marks the account as `delegated`. pub fn create_account_with_config( @@ -305,31 +342,6 @@ impl ExecutionTestEnv { .map(|(_, m)| m) } - /// Simulates the production of a new block. - /// - /// This advances the slot, calculates a new blockhash, writes the block to the - /// ledger, and broadcasts a `BlockUpdate` notification. - pub fn advance_slot(&self) -> Slot { - let block = self.ledger.latest_block(); - let b = block.load(); - let slot = b.slot + 1; - let hash = { - let mut hasher = Hasher::default(); - hasher.hash(b.blockhash.as_ref()); - hasher.hash(&b.slot.to_le_bytes()); - hasher.result() - }; - let time = slot as i64; - self.ledger - .write_block(slot, time, hash) - .expect("failed to write new block to the ledger"); - self.accountsdb.set_slot(slot); - - // Yield to allow other tasks (like the executor) to process the slot change. - thread::yield_now(); - slot - } - /// Builds a transaction with the given instructions, signed by the default payer. pub fn build_transaction(&self, ixs: &[Instruction]) -> Transaction { let payer = {