Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions magicblock-accounts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ magicblock-accounts-db = { workspace = true }
magicblock-chainlink = { workspace = true }
magicblock-committor-service = { workspace = true }
magicblock-core = { workspace = true }
magicblock-metrics = { workspace = true }
magicblock-program = { workspace = true }
solana-hash = { workspace = true }
solana-pubkey = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions magicblock-accounts/src/scheduled_commits_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use magicblock_committor_service::{
intent_executor::ExecutionOutput, BaseIntentCommittor, CommittorService,
};
use magicblock_core::link::transactions::TransactionSchedulerHandle;
use magicblock_metrics::metrics;
use magicblock_program::{
magic_scheduled_base_intent::ScheduledIntentBundle,
register_scheduled_commit_sent, SentCommit, TransactionScheduler,
Expand Down Expand Up @@ -275,6 +276,7 @@ impl ScheduledCommitsProcessor for ScheduledCommitsProcessorImpl {
if intent_bundles.is_empty() {
return Ok(());
}
metrics::inc_committor_intents_count_by(intent_bundles.len() as u64);

// Add metas for intent we schedule
let pubkeys_being_undelegated = {
Expand Down
1 change: 1 addition & 0 deletions magicblock-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition.workspace = true
anyhow = { workspace = true }
borsh = "1.5.3"
fd-lock = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }

magic-domain-program = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions magicblock-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod errors;
mod fund_account;
mod genesis_utils;
pub mod ledger;
mod magic_sys_adapter;
pub mod magic_validator;
mod slot;
mod tickers;
86 changes: 86 additions & 0 deletions magicblock-api/src/magic_sys_adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::{collections::HashMap, error::Error, sync::Arc};

use magicblock_committor_service::{BaseIntentCommittor, CommittorService};
use magicblock_core::{intent::CommittedAccount, traits::MagicSys};
use magicblock_ledger::Ledger;
use solana_instruction::error::InstructionError;
use solana_pubkey::Pubkey;
use tracing::{enabled, error, trace, Level};

#[derive(Clone)]
pub struct MagicSysAdapter {
ledger: Arc<Ledger>,
committor_service: Option<Arc<CommittorService>>,
}

impl MagicSysAdapter {
const RECV_ERR: u32 = 0xE000_0000;
const FETCH_ERR: u32 = 0xE001_0000;
Comment thread
taco-paco marked this conversation as resolved.
Outdated
const NO_COMMITTOR_ERR: u32 = 0xE002_0000;
Comment thread
taco-paco marked this conversation as resolved.
Outdated

pub fn new(
ledger: Arc<Ledger>,
committor_service: Option<Arc<CommittorService>>,
) -> Self {
Self {
ledger,
committor_service,
}
}
}

impl MagicSys for MagicSysAdapter {
fn persist(&self, id: u64, data: Vec<u8>) -> Result<(), Box<dyn Error>> {
trace!(id, data_len = data.len(), "Persisting data");
self.ledger.write_account_mod_data(id, &data.into())?;
Comment thread
taco-paco marked this conversation as resolved.
Ok(())
}

fn load(&self, id: u64) -> Result<Option<Vec<u8>>, Box<dyn Error>> {
let data = self.ledger.read_account_mod_data(id)?.map(|x| x.data);
if enabled!(Level::TRACE) {
if let Some(data) = &data {
trace!(id, data_len = data.len(), "Loading data");
} else {
trace!(id, found = false, "Loading data");
}
}
Ok(data)
}

fn fetch_current_commit_nonces(
&self,
commits: &[CommittedAccount],
) -> Result<HashMap<Pubkey, u64>, InstructionError> {
let committor_service =
if let Some(committor_service) = &self.committor_service {
Ok(committor_service)
} else {
Err(InstructionError::Custom(Self::NO_COMMITTOR_ERR))
}?;
Comment thread
taco-paco marked this conversation as resolved.

let min_context_slot = commits
.iter()
.map(|account| account.remote_slot)
.max()
.unwrap_or(0);
let pubkeys: Vec<_> =
commits.iter().map(|account| account.pubkey).collect();

let receiver = committor_service
.fetch_current_commit_nonces(&pubkeys, min_context_slot);
// Tx execution is sync and runs on a tokio worker thread. handle.block_on
// would panic (nested runtime). futures::executor::block_on parks this
// thread independently of tokio — safe because the thread is already
// committed to this tx until execution completes.
futures::executor::block_on(receiver)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
.inspect_err(|err| {
error!(error = ?err, "Failed to receive nonces from CommittorService")
})
.map_err(|_| InstructionError::Custom(Self::RECV_ERR))?
Comment thread
taco-paco marked this conversation as resolved.
Outdated
.inspect_err(|err| {
error!(error = ?err, "Failed to fetch current commit nonces")
})
.map_err(|_| InstructionError::Custom(Self::FETCH_ERR))
Comment thread
taco-paco marked this conversation as resolved.
}
}
9 changes: 7 additions & 2 deletions magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use magicblock_processor::{
scheduler::{state::TransactionSchedulerState, TransactionScheduler},
};
use magicblock_program::{
init_persister,
init_magic_sys,
validator::{self, validator_authority},
TransactionScheduler as ActionTransactionScheduler,
};
Expand Down Expand Up @@ -91,6 +91,7 @@ use crate::{
self, read_validator_keypair_from_ledger, validator_keypair_path,
write_validator_keypair_to_ledger,
},
magic_sys_adapter::MagicSysAdapter,
slot::advance_slot_and_update_ledger,
tickers::{init_slot_ticker, init_system_metrics_ticker},
};
Expand Down Expand Up @@ -209,6 +210,11 @@ impl MagicValidator {
let step_start = Instant::now();
let committor_service = Self::init_committor_service(&config).await?;
log_timing("startup", "committor_service_init", step_start);
init_magic_sys(Arc::new(MagicSysAdapter::new(
ledger.clone(),
committor_service.clone(),
)));

let step_start = Instant::now();
let chainlink = Arc::new(
Self::init_chainlink(
Expand Down Expand Up @@ -474,7 +480,6 @@ impl MagicValidator {
) -> ApiResult<(Arc<Ledger>, Slot)> {
let (ledger, last_slot) = ledger::init(storage, ledger_config)?;
let ledger_shared = Arc::new(ledger);
init_persister(ledger_shared.clone());
Ok((ledger_shared, last_slot))
}

Expand Down
2 changes: 2 additions & 0 deletions magicblock-committor-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ lru = { workspace = true }
magicblock-committor-program = { workspace = true, features = [
"no-entrypoint",
] }
magicblock-core = { workspace = true }
magicblock-delegation-program = { workspace = true, features = [
"no-entrypoint",
] }
Expand Down Expand Up @@ -56,6 +57,7 @@ tokio-util = { workspace = true }

[dev-dependencies]
solana-signature = { workspace = true, features = ["rand"] }
serde_json = { workspace = true }
lazy_static = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
Expand Down
25 changes: 24 additions & 1 deletion magicblock-committor-service/src/committor_processor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::HashSet, path::Path, sync::Arc};
use std::{
collections::{HashMap, HashSet},
path::Path,
sync::Arc,
};

use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle;
use magicblock_rpc_client::MagicblockRpcClient;
Expand All @@ -16,6 +20,9 @@ use crate::{
intent_execution_manager::{
db::DummyDB, BroadcastedIntentExecutionResult, IntentExecutionManager,
},
intent_executor::task_info_fetcher::{
CacheTaskInfoFetcher, TaskInfoFetcher, TaskInfoFetcherResult,
},
persist::{
CommitStatusRow, IntentPersister, IntentPersisterImpl,
MessageSignatures,
Expand All @@ -28,6 +35,7 @@ pub(crate) struct CommittorProcessor {
pub(crate) authority: Keypair,
persister: IntentPersisterImpl,
commits_scheduler: IntentExecutionManager<DummyDB>,
task_info_fetcher: Arc<CacheTaskInfoFetcher>,
}

impl CommittorProcessor {
Expand Down Expand Up @@ -58,9 +66,12 @@ impl CommittorProcessor {
let persister = IntentPersisterImpl::try_new(persist_file)?;

// Create commit scheduler
let task_info_fetcher =
Arc::new(CacheTaskInfoFetcher::new(magic_block_rpc_client.clone()));
let commits_scheduler = IntentExecutionManager::new(
magic_block_rpc_client.clone(),
DummyDB::new(),
task_info_fetcher.clone(),
Some(persister.clone()),
table_mania.clone(),
chain_config.compute_budget_config.clone(),
Expand All @@ -72,6 +83,7 @@ impl CommittorProcessor {
table_mania,
commits_scheduler,
persister,
task_info_fetcher,
})
}

Expand Down Expand Up @@ -149,4 +161,15 @@ impl CommittorProcessor {
) -> broadcast::Receiver<BroadcastedIntentExecutionResult> {
self.commits_scheduler.subscribe_for_results()
}

/// Fetches current commit nonces
pub async fn fetch_current_commit_nonces(
&self,
pubkeys: &[Pubkey],
min_context_slot: u64,
) -> TaskInfoFetcherResult<HashMap<Pubkey, u64>> {
self.task_info_fetcher
.fetch_current_commit_nonces(pubkeys, min_context_slot)
.await
}
}
8 changes: 7 additions & 1 deletion magicblock-committor-service/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use solana_signature::Signature;
use solana_transaction_error::TransactionError;
use thiserror::Error;

use crate::intent_execution_manager::IntentExecutionManagerError;
use crate::{
intent_execution_manager::IntentExecutionManagerError,
intent_executor::task_info_fetcher::TaskInfoFetcherError,
};

pub type CommittorServiceResult<T, E = CommittorServiceError> = Result<T, E>;

Expand All @@ -26,6 +29,9 @@ pub enum CommittorServiceError {
#[error("IntentExecutionManagerError: {0} ({0:?})")]
IntentExecutionManagerError(#[from] IntentExecutionManagerError),

#[error("TaskInfoFetcherError: {0} ({0:?})")]
TaskInfoFetcherError(#[from] TaskInfoFetcherError),

#[error(
"Failed send and confirm transaction to {0} on chain: {1} ({1:?})"
)]
Expand Down
5 changes: 2 additions & 3 deletions magicblock-committor-service/src/intent_execution_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,18 @@ impl<D: DB> IntentExecutionManager<D> {
pub fn new<P: IntentPersister>(
rpc_client: MagicblockRpcClient,
db: D,
task_info_fetcher: Arc<CacheTaskInfoFetcher>,
intent_persister: Option<P>,
table_mania: TableMania,
compute_budget_config: ComputeBudgetConfig,
) -> Self {
let db = Arc::new(db);

let commit_id_tracker =
Arc::new(CacheTaskInfoFetcher::new(rpc_client.clone()));
let executor_factory = IntentExecutorFactoryImpl {
rpc_client,
table_mania,
compute_budget_config,
commit_id_tracker,
task_info_fetcher,
};

let (sender, receiver) = mpsc::channel(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ mod edge_cases_test {

#[cfg(test)]
mod complete_error_test {
use magicblock_program::magic_scheduled_base_intent::CommittedAccount;
use magicblock_core::intent::CommittedAccount;
use solana_account::Account;
use solana_pubkey::pubkey;

Expand Down Expand Up @@ -854,8 +854,9 @@ pub(crate) fn create_test_intent(
pubkeys: &[Pubkey],
is_undelegate: bool,
) -> ScheduledIntentBundle {
use magicblock_core::intent::CommittedAccount;
use magicblock_program::magic_scheduled_base_intent::{
CommitAndUndelegate, CommitType, CommittedAccount, MagicIntentBundle,
CommitAndUndelegate, CommitType, MagicIntentBundle,
ScheduledIntentBundle, UndelegateType,
};
use solana_account::Account;
Expand Down Expand Up @@ -903,8 +904,9 @@ pub(crate) fn create_test_intent_bundle(
commit_pubkeys: &[Pubkey],
commit_and_undelegate_pubkeys: &[Pubkey],
) -> ScheduledIntentBundle {
use magicblock_core::intent::CommittedAccount;
use magicblock_program::magic_scheduled_base_intent::{
CommitAndUndelegate, CommitType, CommittedAccount, MagicIntentBundle,
CommitAndUndelegate, CommitType, MagicIntentBundle,
ScheduledIntentBundle, UndelegateType,
};
use solana_account::Account;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct IntentExecutorFactoryImpl {
pub rpc_client: MagicblockRpcClient,
pub table_mania: TableMania,
pub compute_budget_config: ComputeBudgetConfig,
pub commit_id_tracker: Arc<CacheTaskInfoFetcher>,
pub task_info_fetcher: Arc<CacheTaskInfoFetcher>,
}

impl IntentExecutorFactory for IntentExecutorFactoryImpl {
Expand All @@ -39,7 +39,7 @@ impl IntentExecutorFactory for IntentExecutorFactoryImpl {
IntentExecutorImpl::<TransactionPreparatorImpl, CacheTaskInfoFetcher>::new(
self.rpc_client.clone(),
transaction_preparator,
self.commit_id_tracker.clone(),
self.task_info_fetcher.clone(),
)
}
}
2 changes: 1 addition & 1 deletion magicblock-committor-service/src/intent_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ where
.reset(ResetType::Specific(committed_pubkeys));
let commit_ids = self
.task_info_fetcher
.fetch_next_commit_ids(committed_pubkeys, min_context_slot)
.fetch_next_commit_nonces(committed_pubkeys, min_context_slot)
.await
.map_err(TaskBuilderError::CommitTasksBuildError)?;

Expand Down
Loading