Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ magicblock-test-storage/

# AI related
**/CLAUDE.md
CODEBASE_MAP.md
config.json
config.toml
AGENTS.md
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async-nats = "0.46"
async-trait = "0.1.77"
base64 = "0.21.7"
bincode = "1.3.3"
blake3 = "1.8"
bytes = { version = "1.0", features = ["serde"] }
borsh = { version = "1.5.1", features = ["derive", "unstable__schema"] }
bs58 = "0.5.1"
Expand Down
7 changes: 7 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ reset = false
# Env: MBV_LEDGER__BLOCK_TIME
block-time = "400ms"

# The number of slots included in a superblock.
# The accountsdb checksum/snapshot is taken at the superblock boundary.
#
# Default: 72000
# Env: MBV_LEDGER__SUPERBLOCK_SIZE
superblock-size = 72000

# Upper hard threshold for the max size of the ledger (in bytes).
# Ledger truncation logic kicks in when disk usage approaches this limit.
# Default: 536_870_912 (512 MB)
Expand Down
6 changes: 6 additions & 0 deletions magicblock-accounts-db/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ impl SnapshotManager {
tar.append_dir_all(".", snapshot_dir)
.log_err(|| "Failed to append directory to tar")?;
tar.finish().log_err(|| "Failed to finalize tar archive")?;
let enc = tar
.into_inner()
.log_err(|| "Failed to recover gzip encoder from tar builder")?;
let file = enc.finish().log_err(|| "Failed to finish gzip archive")?;
file.sync_all()
.log_err(|| "Failed to sync archive to disk")?;

// Atomically rename to final path
fs::rename(&tmp_path, &archive_path).log_err(|| {
Expand Down
16 changes: 8 additions & 8 deletions magicblock-aperture/src/geyser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{
use json::{JsonValueTrait, Value};
use libloading::{Library, Symbol};
use magicblock_core::link::{
accounts::AccountWithSlot, blocks::BlockUpdate,
transactions::TransactionStatus,
accounts::AccountWithSlot, transactions::TransactionStatus,
};
use magicblock_ledger::LatestBlockInner;
use solana_account::ReadableAccount;
use solana_transaction_status::RewardsAndNumPartitions;

Expand Down Expand Up @@ -152,19 +152,19 @@ impl GeyserPluginManager {

pub fn notify_block(
&self,
block: &BlockUpdate,
block: &LatestBlockInner,
) -> Result<(), GeyserPluginError> {
check_if_enabled!(self);
let block = ReplicaBlockInfoV4 {
slot: block.meta.slot,
parent_slot: block.meta.slot.saturating_sub(1),
blockhash: &block.hash.to_string(),
block_height: Some(block.meta.slot),
slot: block.slot,
parent_slot: block.slot.saturating_sub(1),
blockhash: &block.blockhash.to_string(),
block_height: Some(block.slot),
rewards: &RewardsAndNumPartitions {
rewards: Vec::new(),
num_partitions: None,
},
block_time: Some(block.meta.time),
block_time: Some(block.clock.unix_timestamp),
// TODO(bmuddha): register proper values with the new ledger
parent_blockhash: "11111111111111111111111111111111",
executed_transaction_count: 0,
Expand Down
25 changes: 17 additions & 8 deletions magicblock-aperture/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use magicblock_core::link::{
accounts::AccountUpdateRx, blocks::BlockUpdateRx,
transactions::TransactionStatusRx, DispatchEndpoints,
};
use magicblock_ledger::LatestBlockInner;
use tokio_util::sync::CancellationToken;
use tracing::{info, instrument, warn};

Expand Down Expand Up @@ -48,8 +49,8 @@ pub(crate) struct EventProcessor {
account_update_rx: AccountUpdateRx,
/// A receiver for transaction status events, sourced from the `TransactionExecutor`.
transaction_status_rx: TransactionStatusRx,
/// A receiver for new block events.
block_update_rx: BlockUpdateRx,
/// A receiver for block update events from the ledger.
block_update_rx: BlockUpdateRx<LatestBlockInner>,
/// An entry point for communicating with loaded geyser plugins
geyser: Arc<GeyserPluginManager>,
}
Expand All @@ -61,13 +62,17 @@ impl EventProcessor {
state: &SharedState,
geyser: Arc<GeyserPluginManager>,
) -> ApertureResult<Self> {
let latest_block = state.ledger.latest_block().clone();
// Subscribe to block updates immediately to ensure we don't miss any
// notifications that might be sent before the `run()` method is polled.
let block_update_rx = latest_block.subscribe();
Ok(Self {
subscriptions: state.subscriptions.clone(),
transactions: state.transactions.clone(),
blocks: state.blocks.clone(),
account_update_rx: channels.account_update.clone(),
transaction_status_rx: channels.transaction_status.clone(),
block_update_rx: channels.block_update.clone(),
block_update_rx,
geyser,
})
}
Expand Down Expand Up @@ -107,24 +112,28 @@ impl EventProcessor {
#[instrument(skip(self, cancel), fields(processor_id = id))]
async fn run(self, id: usize, cancel: CancellationToken) {
info!("Event processor started");
let mut block_update_rx = self.block_update_rx;
loop {
tokio::select! {
biased;

// Process a new block.
Ok(latest) = self.block_update_rx.recv_async() => {
// Process a new block. We use `recv()` which returns `Ok(())` on
// success or `Err(Lagged)` if we fell behind. In either case, we
// want to update with the latest block. Only `Err(Closed)` should
// stop us, but that's handled by the cancel token.
Ok(latest) = block_update_rx.recv() => {
// Notify subscribers waiting on slot updates.
self.subscriptions.send_slot(latest.meta.slot);
self.subscriptions.send_slot(latest.slot);
// Notify registered geyser plugins (if any) of the latest slot.
let _ = self.geyser.notify_slot(latest.meta.slot).inspect_err(|e| {
let _ = self.geyser.notify_slot(latest.slot).inspect_err(|e| {
warn!(error = ?e, "Geyser slot update failed");
});
// Notify listening geyser plugins
let _ = self.geyser.notify_block(&latest).inspect_err(|e| {
warn!(error = ?e, "Geyser block update failed");
});
// Update the global blocks cache with the latest block.
self.blocks.set_latest(latest);
self.blocks.set_latest(&latest);
}

// Process a new account state update.
Expand Down
20 changes: 10 additions & 10 deletions magicblock-aperture/src/state/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{ops::Deref, sync::Arc, time::Duration};

use arc_swap::ArcSwapAny;
use magicblock_core::{
link::blocks::{BlockHash, BlockMeta, BlockUpdate},
Slot,
};
use magicblock_core::{link::blocks::BlockHash, Slot};
use magicblock_ledger::LatestBlockInner;
use solana_rpc_client_api::response::RpcBlockhash;

use super::ExpiringCache;
Expand All @@ -26,7 +24,7 @@ pub(crate) struct BlocksCache {
/// Latest observed block (updated whenever the ledger transitions to new slot)
latest: ArcSwapAny<Arc<LastCachedBlock>>,
/// An underlying time-based cache for storing `BlockHash` to `BlockMeta` mappings.
cache: ExpiringCache<BlockHash, BlockMeta>,
cache: ExpiringCache<BlockHash, Slot>,
}

/// Last produced block that has been put into cache. We need to keep this separately,
Expand All @@ -38,7 +36,7 @@ pub(crate) struct LastCachedBlock {
}

impl Deref for BlocksCache {
type Target = ExpiringCache<BlockHash, BlockMeta>;
type Target = ExpiringCache<BlockHash, Slot>;
fn deref(&self) -> &Self::Target {
&self.cache
}
Expand All @@ -61,6 +59,8 @@ impl BlocksCache {
let blocktime_ratio = SOLANA_BLOCK_TIME / blocktime as f64;
let block_validity = blocktime_ratio * MAX_VALID_BLOCKHASH_SLOTS;
let cache = ExpiringCache::new(BLOCK_CACHE_TTL);
// Add the initial blockhash to the cache so it's recognized as valid
cache.push(latest.blockhash, latest.slot);
Self {
latest: ArcSwapAny::new(latest.into()),
block_validity: block_validity as u64,
Expand All @@ -69,14 +69,14 @@ impl BlocksCache {
}

/// Updates the latest block information in the cache.
pub(crate) fn set_latest(&self, latest: BlockUpdate) {
pub(crate) fn set_latest(&self, latest: &LatestBlockInner) {
let last = LastCachedBlock {
blockhash: latest.hash,
slot: latest.meta.slot,
blockhash: latest.blockhash,
slot: latest.slot,
};

// Register the block in the expiring cache
self.cache.push(latest.hash, latest.meta);
self.cache.push(latest.blockhash, latest.slot);
// And mark it as latest observed
self.latest.swap(last.into());
}
Expand Down
1 change: 0 additions & 1 deletion magicblock-aperture/tests/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ async fn test_get_account_info() {
.get_account_with_commitment(&Pubkey::new_unique(), Default::default())
.await
.expect("rpc request for non-existent account failed");
assert_eq!(nonexistent.context.slot, env.latest_slot());
assert_eq!(nonexistent.value, None, "account should not exist");
}

Expand Down
Loading
Loading