diff --git a/.gitignore b/.gitignore index ad0f686..2c4127a 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,6 @@ target/* payer*.json -combined.pem \ No newline at end of file +combined.pem + +.env \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 73d26d3..55f5a94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -18,6 +18,7 @@ version = "0.1.0" dependencies = [ "adrena-abi", "anchor-client", + "anchor-lang", "anchor-spl", "anyhow", "backoff", @@ -36,6 +37,7 @@ dependencies = [ "openssl", "postgres-openssl", "rust_decimal", + "serde", "serde_json", "sha2 0.10.8", "solana-account-decoder", diff --git a/Cargo.toml b/Cargo.toml index b17c419..4b3f5a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ hex = "0.4.3" log = "0.4.17" maplit = "1.0.2" rust_decimal = { version = "1.32", features = ["tokio-pg"] } +serde = "1" serde_json = "1.0.86" solana-sdk = "2.0.18" solana-transaction-status = "2.0.18" @@ -41,6 +42,7 @@ adrena-abi = { git = "https://github.com/AdrenaFoundation/adrena-abi.git", rev = anchor-client = { git = "https://github.com/coral-xyz/anchor.git", rev = "04536725c2ea16329e84bcfe3200afd47eeeb464", features = [ "async", ] } +anchor-lang = { git = "https://github.com/coral-xyz/anchor.git", rev = "04536725c2ea16329e84bcfe3200afd47eeeb464" } anchor-spl = { git = "https://github.com/coral-xyz/anchor.git", rev = "04536725c2ea16329e84bcfe3200afd47eeeb464" } spl-associated-token-account = { version = "6.0.0", features = [ "no-entrypoint", diff --git a/README.md b/README.md index 9b4a6b8..b2f6ae3 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,168 @@ # MrOracle -OpenSource rust client (Keeper) handling oracle and alp price updates onchain. +OpenSource Rust keeper handling oracle and ALP price updates on-chain for the Adrena protocol. + +MrOracle reads the latest ChaosLabs oracle prices from a PostgreSQL database, formats them into `ChaosLabsBatchPrices`, and pushes them on-chain via the `updatePoolAum` instruction every 5 seconds (longer during RPC fallback). A multi-layer RPC fallback chain (primary → backup → public) ensures price updates continue even if primary endpoints go down. + +## Table of Contents + +- [Architecture](#architecture) +- [Build](#build) +- [Configuration](#configuration) +- [RPC Fallback Architecture](#rpc-fallback-architecture) +- [Running](#running) +- [Troubleshooting](#troubleshooting) + +--- + +## Architecture + +``` +ChaosLabs → adrena-data cron → PostgreSQL + │ + │ every 5s + ▼ + +------------+ + | MrOracle | + +-----+------+ + │ + │ updatePoolAum TX + ▼ + Primary RPC → Backup RPC → Public RPC + │ + ▼ + Solana Blockchain +``` + +Each cycle: +1. Fetch latest prices from PostgreSQL (`assets_price` table) +2. Format into ChaosLabs batch format +3. Build `updatePoolAum` instruction +4. Sign + send + confirm on primary RPC (falls through to backup → public on failure) + +--- ## Build -`$> cargo build` -`$> cargo build --release` +```bash +cargo build # debug +cargo build --release # production +``` + +The binary is at `target/release/mroracle`. + +--- + +## Configuration + +### CLI Arguments + +| Flag | Required | Default | Description | +|------|----------|---------|-------------| +| `--rpc` | No | `http://127.0.0.1:10000` | Primary Solana JSON-RPC endpoint (with auth key in URL path) | +| `--rpc-backup` | No | — | Backup RPC endpoint (used when primary fails) | +| `--rpc-public` | No | `https://api.mainnet-beta.solana.com` | Last-resort public RPC | +| `--payer-keypair` | Yes | — | Path to funded payer keypair JSON | +| `--db-string` | Yes | — | PostgreSQL connection string for price DB | +| `--combined-cert` | Yes | — | Path to combined certificate file (for DB TLS) | +| `--commitment` | No | `processed` | Solana commitment level | + +### Log Levels + +Control via `RUST_LOG` environment variable: + +```bash +RUST_LOG=info ./target/release/mroracle ... +RUST_LOG=debug ./target/release/mroracle ... +``` + +--- + +## RPC Fallback Architecture + +Every JSON-RPC call in MrOracle goes through a 3-layer fallback chain: + +``` +primary (--rpc) → backup (--rpc-backup) → public (--rpc-public) +``` + +Each layer has a 5-second timeout. On failure, the next layer is tried. Per endpoint: get blockhash → sign → send → confirm (4 polls @ 500ms, up to 2s). On-chain errors short-circuit the chain (no retry on backup/public). + +**What's covered by fallback:** +- Pool account fetch at startup +- Priority fee polling (every 5s) +- Oracle price update transaction signing + sending + confirmation + +**RPC fallback is stateless** — each RPC call starts fresh from primary. If primary recovers, the very next call hits it first. No circuit breaker, no stickiness. + +**Log prefixes:** `[Pool Fetch]`, `[Priority Fees]`, `[Price Update]`. Failures show as: + +``` +ERROR [Price Update] PRIMARY RPC failed: +ERROR [Price Update] BACKUP RPC failed: +WARN [Price Update] TX confirmed via PUBLIC RPC fallback: +``` + +On total failure: +``` +ERROR [Price Update] All RPCs failed. Please handle ASAP. Critical Priority. +``` + +--- + +## Running + +### Local + +```bash +./target/release/mroracle \ + --rpc "https://your-primary-rpc.com/" \ + --rpc-backup "https://your-backup-rpc.com/" \ + --rpc-public "https://api.mainnet-beta.solana.com" \ + --payer-keypair payer.json \ + --commitment processed \ + --db-string "postgresql://user:pass@host/db" \ + --combined-cert /path/to/combined.pem +``` + +### On Render + +```bash +./target/release/mroracle \ + --payer-keypair /etc/secrets/mr_oracle.json \ + --rpc "https://primary-rpc.example.com/" \ + --rpc-backup "https://backup-rpc.example.com/" \ + --rpc-public "https://api.mainnet-beta.solana.com" \ + --commitment processed \ + --db-string "postgresql://adrena:@.singapore-postgres.render.com/transaction_db_celf" \ + --combined-cert /etc/secrets/combined.pem +``` + +Without `--rpc-backup`: the fallback chain is primary → public. With it: primary → backup → public. + +--- + +## Troubleshooting + +### All RPCs failing / "Critical Priority" log + +``` +ERROR [Price Update] All RPCs failed. Please handle ASAP. Critical Priority. +``` + +All 3 RPC layers failed. Check the individual layer errors above the summary line. Common causes: +- All three URLs misconfigured +- Network outage on your side +- Solana mainnet congestion causing public RPC to throttle (100 req/10s limit) + +### Pool fetch crashes at startup + +If `[Pool Fetch] All RPCs failed` fires at startup, the service can't boot. Primary, backup, and public RPCs all failed to return the pool account. Verify URLs and auth keys. -## Run +### Priority Fees stale -`$> cargo run -- --payer-keypair payer.json --endpoint https://adrena.rpcpool.com/xxx --commitment finalized --db-string "postgresql://adrena:YYY.singapore-postgres.render.com/transaction_db_celf" --combined-cert /etc/secrets/combined.pem` +If priority fees can't be fetched (e.g., public RPC rate-limited), MrOracle keeps the last known value (0 at startup if never fetched) and continues. Transactions still land but may be slow to confirm during congestion. -Or on Render +### DB connection errors -`./target/release/mroracle --payer-keypair /etc/secrets/mroracle.json --endpoint https://adrena.rpcpool.com/xxx--x-token xxx --commitment finalized --db-string "postgresql://adrena:YYY.singapore-postgres.render.com/transaction_db_celf" --combined-cert /etc/secrets/combined.pem` -Ideally run that on a Render instance. +MrOracle depends on adrena-data writing prices to the PostgreSQL `assets_price` table. If the DB is unreachable, MrOracle retries with exponential backoff (50ms → 100ms → 200ms, 3 attempts) before logging an error and moving to the next cycle. diff --git a/src/client.rs b/src/client.rs index 681bee4..aefbd52 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,20 +1,25 @@ use { adrena_abi::{self, Pool}, - anchor_client::{solana_sdk::signer::keypair::read_keypair_file, Client, Cluster}, clap::Parser, openssl::ssl::{SslConnector, SslMethod}, postgres_openssl::MakeTlsConnector, priority_fees::fetch_mean_priority_fee, - solana_sdk::{instruction::AccountMeta, pubkey::Pubkey}, + solana_sdk::{ + instruction::AccountMeta, + pubkey::Pubkey, + signer::keypair::read_keypair_file, + }, std::{env, sync::Arc, thread::sleep, time::Duration}, - tokio::{sync::Mutex, task::JoinHandle, time::interval, time::Instant}, + tokio::{sync::Mutex, time::interval, time::Instant}, }; pub mod db; pub mod handlers; pub mod priority_fees; +pub mod rpc_fallback; pub mod utils; +use rpc_fallback::RpcFallback; use utils::format_chaos_labs_oracle_entry_to_params::format_chaos_labs_oracle_entry_to_params; const DEFAULT_ENDPOINT: &str = "http://127.0.0.1:10000"; @@ -30,15 +35,25 @@ enum ArgsCommitment { Finalized, } +impl From for solana_sdk::commitment_config::CommitmentConfig { + fn from(c: ArgsCommitment) -> Self { + use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; + CommitmentConfig { + commitment: match c { + ArgsCommitment::Processed => CommitmentLevel::Processed, + ArgsCommitment::Confirmed => CommitmentLevel::Confirmed, + ArgsCommitment::Finalized => CommitmentLevel::Finalized, + }, + } + } +} + #[derive(Debug, Clone, Parser)] #[clap(author, version, about)] struct Args { - #[clap(short, long, default_value_t = String::from(DEFAULT_ENDPOINT))] - /// Service endpoint - endpoint: String, - - #[clap(long)] - x_token: Option, + #[clap(long, default_value_t = String::from(DEFAULT_ENDPOINT))] + /// Primary Solana JSON-RPC endpoint + rpc: String, /// Commitment level: processed, confirmed or finalized #[clap(long)] @@ -55,6 +70,14 @@ struct Args { /// Combined certificate #[clap(long)] combined_cert: String, + + /// Backup RPC endpoint URL (optional, used as fallback) + #[clap(long)] + rpc_backup: Option, + + /// Public RPC endpoint URL (last-resort fallback) + #[clap(long, default_value_t = String::from(rpc_fallback::DEFAULT_PUBLIC_RPC))] + rpc_public: String, } #[tokio::main] @@ -67,25 +90,19 @@ async fn main() -> Result<(), anyhow::Error> { let args = Args::parse(); - let args = args.clone(); - let mut periodical_priority_fees_fetching_task: Option>> = - None; - - // In case it errored out, abort the fee task (will be recreated) - if let Some(t) = periodical_priority_fees_fetching_task.take() { - t.abort(); - } + let commitment: solana_sdk::commitment_config::CommitmentConfig = + args.commitment.unwrap_or_default().into(); let payer = read_keypair_file(args.payer_keypair.clone()).unwrap(); let payer = Arc::new(payer); - let client = Client::new( - Cluster::Custom(args.endpoint.clone(), args.endpoint.clone()), - Arc::clone(&payer), - ); - let program = client - .program(adrena_abi::ID) - .map_err(|e| anyhow::anyhow!("Failed to get program: {:?}", e))?; + let rpc_fallback = Arc::new(RpcFallback::new( + &args.rpc, + args.rpc_backup.as_deref(), + &args.rpc_public, + commitment, + Arc::clone(&payer), + )); // //////////////////////////////////////////////////////////////// // DB CONNECTION POOL @@ -119,33 +136,31 @@ async fn main() -> Result<(), anyhow::Error> { let median_priority_fee = Arc::new(Mutex::new(0u64)); // Spawn a task to poll priority fees every 5 seconds log::info!("3 - Spawn a task to poll priority fees every 5 seconds..."); - #[allow(unused_assignments)] { - periodical_priority_fees_fetching_task = Some({ - let median_priority_fee = Arc::clone(&median_priority_fee); - tokio::spawn(async move { - let mut fee_refresh_interval = interval(PRIORITY_FEE_REFRESH_INTERVAL); - loop { - fee_refresh_interval.tick().await; - if let Ok(fee) = - fetch_mean_priority_fee(&client, MEAN_PRIORITY_FEE_PERCENTILE).await - { - let mut fee_lock = median_priority_fee.lock().await; - *fee_lock = fee; - log::debug!( - " <> Updated median priority fee 30th percentile to : {} µLamports / cu", - fee - ); - } + let median_priority_fee = Arc::clone(&median_priority_fee); + let rpc_fallback_clone = Arc::clone(&rpc_fallback); + tokio::spawn(async move { + let mut fee_refresh_interval = interval(PRIORITY_FEE_REFRESH_INTERVAL); + loop { + fee_refresh_interval.tick().await; + if let Ok(fee) = + fetch_mean_priority_fee(&rpc_fallback_clone, MEAN_PRIORITY_FEE_PERCENTILE).await + { + let mut fee_lock = median_priority_fee.lock().await; + *fee_lock = fee; + log::debug!( + " <> Updated mean priority fee to: {} µLamports / cu", + fee + ); } - }) + } }); } - let pool = program - .account::(adrena_abi::MAIN_POOL_ID) + let pool = rpc_fallback + .get_account::(&adrena_abi::MAIN_POOL_ID, "Pool Fetch") .await - .map_err(|e| anyhow::anyhow!("Failed to get pool from DB: {:?}", e))?; + .map_err(|e| anyhow::anyhow!("Failed to get pool: {:?}", e))?; let mut custodies_accounts: Vec = vec![]; for key in pool.custodies.iter() { @@ -173,10 +188,14 @@ async fn main() -> Result<(), anyhow::Error> { match last_trading_prices { Ok(last_trading_prices) => { + // Copy the fee out of the lock BEFORE awaiting the TX send, + // otherwise the MutexGuard is held across the await and blocks + // the priority-fee refresher task for the duration of the send. + let fee = *median_priority_fee.lock().await; // ignore errors on call since we want to keep executing IX let _ = handlers::update_pool_aum( - &program, - *median_priority_fee.lock().await, + &rpc_fallback, + fee, last_trading_prices, remaining_accounts.clone(), ) diff --git a/src/handlers/update_pool_aum.rs b/src/handlers/update_pool_aum.rs index 5222758..c8ff943 100644 --- a/src/handlers/update_pool_aum.rs +++ b/src/handlers/update_pool_aum.rs @@ -1,68 +1,49 @@ use { - crate::{handlers::create_update_pool_aum_ix, UPDATE_AUM_CU_LIMIT}, + crate::{handlers::create_update_pool_aum_ix, rpc_fallback::RpcFallback, UPDATE_AUM_CU_LIMIT}, adrena_abi::oracle::ChaosLabsBatchPrices, - anchor_client::Program, + anchor_lang::{InstructionData, ToAccountMetas}, solana_client::rpc_config::RpcSendTransactionConfig, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, instruction::AccountMeta, signature::Keypair, + compute_budget::ComputeBudgetInstruction, + instruction::{AccountMeta, Instruction}, }, - std::{sync::Arc, time::Duration}, - tokio::time::timeout, }; pub async fn update_pool_aum( - program: &Program>, + rpc_fallback: &RpcFallback, median_priority_fee: u64, last_trading_prices: ChaosLabsBatchPrices, remaining_accounts: Vec, ) -> Result<(), anyhow::Error> { log::info!(" <*> Updating AUM"); - let (update_pool_aum_params, update_pool_aum_accounts) = - create_update_pool_aum_ix(&program.payer(), Some(last_trading_prices)); + let (params, accounts) = + create_update_pool_aum_ix(&rpc_fallback.payer_pubkey(), Some(last_trading_prices)); - let tx = timeout( - Duration::from_secs(2), // 2 second timeout for handling getBlockHash hanging - program - .request() - .instruction(ComputeBudgetInstruction::set_compute_unit_price( - median_priority_fee, - )) - .instruction(ComputeBudgetInstruction::set_compute_unit_limit( - UPDATE_AUM_CU_LIMIT, - )) - .args(update_pool_aum_params) - .accounts(update_pool_aum_accounts) - // Remaining accounts - .accounts(remaining_accounts) - .signed_transaction(), - ) - .await - .map_err(|_| { - log::error!(" <> Transaction generation timed out after 10 seconds"); - anyhow::anyhow!("Transaction generation timed out") - })? - .map_err(|e| { - log::error!(" <> Transaction generation failed with error: {:?}", e); - anyhow::anyhow!("Transaction generation failed with error: {:?}", e) - })?; + let mut account_metas = accounts.to_account_metas(None); + account_metas.extend(remaining_accounts); - let rpc_client = program.rpc(); + let instructions = vec![ + ComputeBudgetInstruction::set_compute_unit_price(median_priority_fee), + ComputeBudgetInstruction::set_compute_unit_limit(UPDATE_AUM_CU_LIMIT), + Instruction { + program_id: adrena_abi::ID, + accounts: account_metas, + data: InstructionData::data(¶ms), + }, + ]; - let tx_hash = rpc_client - .send_transaction_with_config( - &tx, + let tx_hash = rpc_fallback + .sign_and_send( + instructions, RpcSendTransactionConfig { skip_preflight: true, max_retries: Some(0), ..Default::default() }, + "Price Update", ) - .await - .map_err(|e| { - log::error!(" <> Transaction sending failed with error: {:?}", e); - anyhow::anyhow!("Transaction sending failed with error: {:?}", e) - })?; + .await?; log::info!(" <> TX sent: {:#?}", tx_hash.to_string()); diff --git a/src/priority_fees.rs b/src/priority_fees.rs index e1e1dc2..79d571c 100644 --- a/src/priority_fees.rs +++ b/src/priority_fees.rs @@ -1,10 +1,12 @@ use { - adrena_abi::ADRENA_PROGRAM_ID, - anchor_client::Client, + crate::rpc_fallback::RpcFallback, serde_json, - solana_client::rpc_response::RpcPrioritizationFee, - solana_sdk::{pubkey::Pubkey, signature::Keypair}, - std::{error::Error, sync::Arc}, + solana_client::{ + rpc_request::RpcRequest, + rpc_response::RpcPrioritizationFee, + }, + solana_sdk::pubkey::Pubkey, + std::error::Error, }; pub struct GetRecentPrioritizationFeesByPercentileConfig { @@ -14,7 +16,7 @@ pub struct GetRecentPrioritizationFeesByPercentileConfig { } pub async fn fetch_mean_priority_fee( - client: &Client>, + rpc_fallback: &RpcFallback, percentile: u64, ) -> Result { let config = GetRecentPrioritizationFeesByPercentileConfig { @@ -22,13 +24,13 @@ pub async fn fetch_mean_priority_fee( fallback: false, locked_writable_accounts: vec![], //adrena_abi::MAIN_POOL_ID, adrena_abi::CORTEX_ID], }; - get_mean_prioritization_fee_by_percentile(client, &config, None) + get_mean_prioritization_fee_by_percentile(rpc_fallback, &config, None) .await .map_err(|e| anyhow::anyhow!("Failed to fetch mean priority fee: {:?}", e)) } pub async fn get_recent_prioritization_fees_by_percentile( - client: &Client>, + rpc_fallback: &RpcFallback, config: &GetRecentPrioritizationFeesByPercentileConfig, slots_to_return: Option, ) -> Result, Box> { @@ -37,18 +39,15 @@ pub async fn get_recent_prioritization_fees_by_percentile( .iter() .map(|key| key.to_string()) .collect(); - let mut args = vec![serde_json::to_value(accounts)?]; + // Only send the accounts array — the percentile param is a Triton/rpcpool + // extension and not supported by standard RPCs (e.g. api.mainnet-beta.solana.com) + let args = vec![serde_json::to_value(accounts)?]; - if let Some(percentile) = config.percentile { - args.push(serde_json::to_value(vec![percentile])?); - } - - let response: Vec = client - .program(ADRENA_PROGRAM_ID)? - .rpc() - .send( - solana_client::rpc_request::RpcRequest::GetRecentPrioritizationFees, + let response: Vec = rpc_fallback + .send_rpc_request( + RpcRequest::GetRecentPrioritizationFees, serde_json::Value::from(args), + "Priority Fees", ) .await?; @@ -64,24 +63,28 @@ pub async fn get_recent_prioritization_fees_by_percentile( } pub async fn get_mean_prioritization_fee_by_percentile( - client: &Client>, + rpc_fallback: &RpcFallback, config: &GetRecentPrioritizationFeesByPercentileConfig, slots_to_return: Option, ) -> Result> { let recent_prioritization_fees = - get_recent_prioritization_fees_by_percentile(client, config, slots_to_return).await?; + get_recent_prioritization_fees_by_percentile(rpc_fallback, config, slots_to_return).await?; if recent_prioritization_fees.is_empty() { return Err("No prioritization fees retrieved".into()); } - let sum: u64 = recent_prioritization_fees + // Client-side percentile computation (the server-side percentile param is a + // Triton-only extension that doesn't work on public RPCs). We take the fee + // value at the given percentile (basis points, 10000 = 100%) from a sorted + // ascending list. Higher percentile = higher fee = higher priority. + let mut fees: Vec = recent_prioritization_fees .iter() .map(|fee| fee.prioritization_fee) - .sum(); - - let mean = (sum + recent_prioritization_fees.len() as u64 - 1) - / recent_prioritization_fees.len() as u64; + .collect(); + fees.sort_unstable(); - Ok(mean) + let percentile_bps = config.percentile.unwrap_or(5000); + let idx = ((fees.len() as u64).saturating_sub(1) * percentile_bps / 10_000) as usize; + Ok(fees[idx.min(fees.len() - 1)]) } diff --git a/src/rpc_fallback.rs b/src/rpc_fallback.rs new file mode 100644 index 0000000..a56fafe --- /dev/null +++ b/src/rpc_fallback.rs @@ -0,0 +1,264 @@ +use { + anchor_lang::AccountDeserialize, + serde::de::DeserializeOwned, + solana_client::{ + nonblocking::rpc_client::RpcClient, + rpc_config::RpcSendTransactionConfig, + rpc_request::RpcRequest, + }, + solana_sdk::{ + commitment_config::CommitmentConfig, + instruction::Instruction, + pubkey::Pubkey, + signature::{Keypair, Signature}, + signer::Signer, + transaction::Transaction, + }, + std::{sync::Arc, time::Duration}, + tokio::time::timeout, +}; + +const RPC_TIMEOUT: Duration = Duration::from_secs(5); +const CONFIRM_POLLS: u32 = 4; +const CONFIRM_INTERVAL: Duration = Duration::from_millis(500); + +pub const DEFAULT_PUBLIC_RPC: &str = "https://api.mainnet-beta.solana.com"; + +struct RpcEndpoint { + label: &'static str, + client: RpcClient, +} + +pub struct RpcFallback { + endpoints: Vec, + payer: Arc, +} + +impl RpcFallback { + pub fn new( + primary_url: &str, + backup_url: Option<&str>, + public_url: &str, + commitment: CommitmentConfig, + payer: Arc, + ) -> Self { + let make_client = |url: &str| { + RpcClient::new_with_timeout_and_commitment(url.to_string(), RPC_TIMEOUT, commitment) + }; + + let mut endpoints = vec![RpcEndpoint { + label: "primary", + client: make_client(primary_url), + }]; + + if let Some(url) = backup_url { + endpoints.push(RpcEndpoint { + label: "backup", + client: make_client(url), + }); + } + + endpoints.push(RpcEndpoint { + label: "public", + client: make_client(public_url), + }); + + Self { endpoints, payer } + } + + pub fn payer_pubkey(&self) -> solana_sdk::pubkey::Pubkey { + self.payer.pubkey() + } + + /// Build, sign, send, and confirm a transaction with RPC fallback. + /// Tries each RPC in order (primary -> backup -> public). + /// Per endpoint: get blockhash -> sign -> send -> confirm. + /// Only moves to the next endpoint if send or confirm fails. + pub async fn sign_and_send( + &self, + instructions: Vec, + config: RpcSendTransactionConfig, + operation: &str, + ) -> Result { + for endpoint in &self.endpoints { + let label_upper = endpoint.label.to_uppercase(); + + // 1. Get blockhash + let blockhash = match timeout(RPC_TIMEOUT, endpoint.client.get_latest_blockhash()).await + { + Ok(Ok(h)) => h, + Ok(Err(e)) => { + log::error!("[{}] {} RPC failed: {}", operation, label_upper, e); + continue; + } + Err(_) => { + log::error!("[{}] {} RPC failed: timed out", operation, label_upper); + continue; + } + }; + + // 2. Sign + let tx = Transaction::new_signed_with_payer( + &instructions, + Some(&self.payer.pubkey()), + &[self.payer.as_ref()], + blockhash, + ); + + // 3. Send + let sig = match timeout( + RPC_TIMEOUT, + endpoint.client.send_transaction_with_config(&tx, config.clone()), + ) + .await + { + Ok(Ok(sig)) => sig, + Ok(Err(e)) => { + log::error!("[{}] {} RPC failed: {}", operation, label_upper, e); + continue; + } + Err(_) => { + log::error!("[{}] {} RPC failed: timed out", operation, label_upper); + continue; + } + }; + + // 4. Confirm — poll for on-chain status (each poll wrapped in timeout for predictable worst-case timing) + let mut confirmed = false; + for _ in 0..CONFIRM_POLLS { + tokio::time::sleep(CONFIRM_INTERVAL).await; + let status_result = timeout( + RPC_TIMEOUT, + endpoint.client.get_signature_statuses(&[sig]), + ) + .await; + match status_result { + Ok(Ok(response)) => { + if let Some(Some(status)) = response.value.first() { + if let Some(err) = &status.err { + // TX landed but failed on-chain. The error is deterministic + // — retrying on backup/public RPCs will hit the same error + // and just burn more priority fees. Return immediately. + log::error!( + "[{}] {} RPC tx failed on-chain: {:?}", + operation, label_upper, err + ); + return Err(anyhow::anyhow!( + "[{}] tx {} failed on-chain: {:?}", + operation, sig, err + )); + } + confirmed = true; + break; + } + // None = not yet visible, keep polling + } + Ok(Err(_)) => { + // RPC error, keep polling + } + Err(_) => { + // Timed out, keep polling + } + } + } + + if confirmed { + if endpoint.label != "primary" { + log::warn!( + "[{}] TX confirmed via {} RPC fallback: {}", + operation, label_upper, sig + ); + } + return Ok(sig); + } + + log::error!( + "[{}] {} RPC tx not confirmed after {}ms", + operation, label_upper, CONFIRM_POLLS * CONFIRM_INTERVAL.as_millis() as u32 + ); + } + + log::error!("[{}] All RPCs failed. Please handle ASAP. Critical Priority.", operation); + + Err(anyhow::anyhow!( + "[{}] All RPCs failed. Please handle ASAP. Critical Priority.", + operation + )) + } + + /// Fetch and deserialize a typed account with RPC fallback. + /// Tries each RPC in order. Deserialization errors are NOT retried + /// (data is the same on all RPCs). + pub async fn get_account( + &self, + pubkey: &Pubkey, + operation: &str, + ) -> Result { + for endpoint in &self.endpoints { + let label_upper = endpoint.label.to_uppercase(); + + let account = match timeout(RPC_TIMEOUT, endpoint.client.get_account(pubkey)).await { + Ok(Ok(account)) => account, + Ok(Err(e)) => { + log::error!("[{}] {} RPC failed: {}", operation, label_upper, e); + continue; + } + Err(_) => { + log::error!("[{}] {} RPC failed: timed out", operation, label_upper); + continue; + } + }; + + let mut data: &[u8] = &account.data; + return T::try_deserialize(&mut data) + .map_err(|e| anyhow::anyhow!("[{}] deserialization failed: {}", operation, e)); + } + + log::error!( + "[{}] All RPCs failed. Please handle ASAP. Critical Priority.", + operation + ); + Err(anyhow::anyhow!( + "[{}] All RPCs failed. Please handle ASAP. Critical Priority.", + operation + )) + } + + /// Send a raw RPC request with fallback. + pub async fn send_rpc_request( + &self, + request: RpcRequest, + params: serde_json::Value, + operation: &str, + ) -> Result { + for endpoint in &self.endpoints { + let label_upper = endpoint.label.to_uppercase(); + + match timeout( + RPC_TIMEOUT, + endpoint.client.send::(request, params.clone()), + ) + .await + { + Ok(Ok(result)) => return Ok(result), + Ok(Err(e)) => { + log::error!("[{}] {} RPC failed: {}", operation, label_upper, e); + continue; + } + Err(_) => { + log::error!("[{}] {} RPC failed: timed out", operation, label_upper); + continue; + } + } + } + + log::error!( + "[{}] All RPCs failed. Please handle ASAP. Critical Priority.", + operation + ); + Err(anyhow::anyhow!( + "[{}] All RPCs failed. Please handle ASAP. Critical Priority.", + operation + )) + } +}