diff --git a/javascript/Cargo.lock b/javascript/Cargo.lock index 2525d50..e63a480 100644 --- a/javascript/Cargo.lock +++ b/javascript/Cargo.lock @@ -1299,7 +1299,7 @@ dependencies = [ [[package]] name = "laserstream-napi" -version = "0.2.9" +version = "0.3.1" dependencies = [ "base64 0.21.7", "bs58", diff --git a/rust/examples/account_sub.rs b/rust/examples/account_sub.rs index 32241f7..410286b 100644 --- a/rust/examples/account_sub.rs +++ b/rust/examples/account_sub.rs @@ -1,5 +1,4 @@ -use helius_laserstream::{subscribe, LaserstreamConfig}; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts}; +use helius_laserstream::{subscribe, LaserstreamConfig, grpc::{SubscribeRequest, SubscribeRequestFilterAccounts}}; use futures::StreamExt; use std::env; @@ -32,10 +31,10 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/accounts_data_slice_sub.rs b/rust/examples/accounts_data_slice_sub.rs index 81d7111..136eb43 100644 --- a/rust/examples/accounts_data_slice_sub.rs +++ b/rust/examples/accounts_data_slice_sub.rs @@ -1,7 +1,6 @@ -use helius_laserstream::{subscribe, LaserstreamConfig}; -use yellowstone_grpc_proto::geyser::{ +use helius_laserstream::{subscribe, LaserstreamConfig, grpc::{ SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestAccountsDataSlice -}; +}}; use futures::StreamExt; use std::env; @@ -41,10 +40,10 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/basic_usage.rs b/rust/examples/basic_usage.rs index de87e62..780fb33 100644 --- a/rust/examples/basic_usage.rs +++ b/rust/examples/basic_usage.rs @@ -48,10 +48,10 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); } Err(e) => { - eprintln!("Stream error: {}", e); + eprintln!("Stream error: {e}"); } } } diff --git a/rust/examples/block_meta_sub.rs b/rust/examples/block_meta_sub.rs index e578f7f..b9c8db7 100644 --- a/rust/examples/block_meta_sub.rs +++ b/rust/examples/block_meta_sub.rs @@ -1,5 +1,4 @@ -use helius_laserstream::{subscribe, LaserstreamConfig}; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterBlocksMeta}; +use helius_laserstream::{subscribe, LaserstreamConfig, grpc::{SubscribeRequest, SubscribeRequestFilterBlocksMeta}}; use futures::StreamExt; use std::env; @@ -29,14 +28,14 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); count += 1; if count >= 10 { break; } } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/block_sub.rs b/rust/examples/block_sub.rs index 64cbf74..0281814 100644 --- a/rust/examples/block_sub.rs +++ b/rust/examples/block_sub.rs @@ -1,5 +1,4 @@ -use helius_laserstream::{subscribe, LaserstreamConfig}; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterBlocks}; +use helius_laserstream::{subscribe, LaserstreamConfig, grpc::{SubscribeRequest, SubscribeRequestFilterBlocks}}; use futures::StreamExt; use std::env; @@ -33,10 +32,10 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/channel-options-example.rs b/rust/examples/channel-options-example.rs index a57f005..bf093f4 100644 --- a/rust/examples/channel-options-example.rs +++ b/rust/examples/channel-options-example.rs @@ -63,10 +63,10 @@ async fn main() -> Result<(), Box> { while let Some(update) = stream.next().await { match update { Ok(update) => { - println!("Received update: {:?}", update); + println!("Received update: {update:?}"); } Err(e) => { - eprintln!("Stream error: {:?}", e); + eprintln!("Stream error: {e:?}"); break; } } diff --git a/rust/examples/compression-advanced.rs b/rust/examples/compression-advanced.rs index 2a01e60..f1753ee 100644 --- a/rust/examples/compression-advanced.rs +++ b/rust/examples/compression-advanced.rs @@ -14,21 +14,17 @@ async fn main() -> Result<(), Box> { .expect("Endpoint not set"); // Example 1: Manual compression configuration - let mut channel_options = ChannelOptions::default(); - - // Send using zstd compression - channel_options.send_compression = Some(CompressionEncoding::Zstd); - - // Accept both compression types (order matters - first is preferred) - channel_options.accept_compression = Some(vec![ - CompressionEncoding::Zstd, // Prefer zstd - CompressionEncoding::Gzip, // Fallback to gzip - ]); - - // Other performance options - channel_options.max_decoding_message_size = Some(2_000_000_000); // 2GB - channel_options.http2_keep_alive_interval_secs = Some(20); - channel_options.tcp_nodelay = Some(true); + let channel_options = ChannelOptions { + send_compression: Some(CompressionEncoding::Zstd), + accept_compression: Some(vec![ + CompressionEncoding::Zstd, // Prefer zstd + CompressionEncoding::Gzip, // Fallback to gzip + ]), + max_decoding_message_size: Some(2_000_000_000), // 2GB + http2_keep_alive_interval_secs: Some(20), + tcp_nodelay: Some(true), + ..Default::default() + }; let config = LaserstreamConfig::new(endpoint.clone(), api_key.clone()) .with_channel_options(channel_options); @@ -56,7 +52,7 @@ async fn main() -> Result<(), Box> { } } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } @@ -87,7 +83,7 @@ async fn main() -> Result<(), Box> { } } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/compression-example.rs b/rust/examples/compression-example.rs index de894d4..5f78c26 100644 --- a/rust/examples/compression-example.rs +++ b/rust/examples/compression-example.rs @@ -48,12 +48,12 @@ async fn main() -> Result<(), Box> { } } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } } - println!("Received {} compressed slot updates", count); + println!("Received {count} compressed slot updates"); Ok(()) } \ No newline at end of file diff --git a/rust/examples/entry_sub.rs b/rust/examples/entry_sub.rs index 5e21497..f7f7440 100644 --- a/rust/examples/entry_sub.rs +++ b/rust/examples/entry_sub.rs @@ -1,5 +1,4 @@ -use helius_laserstream::{subscribe, LaserstreamConfig}; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterEntry}; +use helius_laserstream::{subscribe, LaserstreamConfig, grpc::{SubscribeRequest, SubscribeRequestFilterEntry}}; use futures::StreamExt; use std::env; @@ -28,10 +27,10 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/preprocessed_transaction_sub.rs b/rust/examples/preprocessed_transaction_sub.rs index 70640e8..60659a3 100644 --- a/rust/examples/preprocessed_transaction_sub.rs +++ b/rust/examples/preprocessed_transaction_sub.rs @@ -44,10 +44,10 @@ async fn main() -> Result<(), Box> { match result { Ok(update) => { // Print the raw debug output - println!("{:#?}", update); + println!("{update:#?}"); } Err(e) => { - eprintln!("Stream error: {:?}", e); + eprintln!("Stream error: {e:?}"); break; } } diff --git a/rust/examples/slot_sub.rs b/rust/examples/slot_sub.rs index 40bd21f..8438403 100644 --- a/rust/examples/slot_sub.rs +++ b/rust/examples/slot_sub.rs @@ -1,5 +1,4 @@ -use helius_laserstream::{subscribe, LaserstreamConfig}; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterSlots}; +use helius_laserstream::{subscribe, LaserstreamConfig, grpc::{SubscribeRequest, SubscribeRequestFilterSlots}}; use futures::StreamExt; use std::env; @@ -35,10 +34,10 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/stream_write_example.rs b/rust/examples/stream_write_example.rs index fa9caa6..e1815a1 100644 --- a/rust/examples/stream_write_example.rs +++ b/rust/examples/stream_write_example.rs @@ -46,7 +46,6 @@ async fn main() -> Result<(), Box> { let mut stream = Box::pin(stream); let message_count = Arc::new(AtomicU32::new(0)); - let handle_clone = handle.clone(); let count_clone = message_count.clone(); // Spawn a task to add subscriptions dynamically @@ -73,8 +72,8 @@ async fn main() -> Result<(), Box> { ..Default::default() }; - if let Err(e) = handle_clone.write(transaction_request).await { - eprintln!("❌ Failed to add transaction subscription: {}", e); + if let Err(e) = handle.write(transaction_request).await { + eprintln!("❌ Failed to add transaction subscription: {e}"); } else { println!("✅ Successfully added transaction subscription"); } @@ -102,8 +101,8 @@ async fn main() -> Result<(), Box> { ..Default::default() }; - if let Err(e) = handle_clone.write(block_request).await { - eprintln!("❌ Failed to add block subscription: {}", e); + if let Err(e) = handle.write(block_request).await { + eprintln!("❌ Failed to add block subscription: {e}"); } else { println!("✅ Successfully added block subscription"); } @@ -138,7 +137,7 @@ async fn main() -> Result<(), Box> { } } Err(e) => { - eprintln!("❌ Stream error: {}", e); + eprintln!("❌ Stream error: {e}"); // The stream will automatically reconnect } } diff --git a/rust/examples/transaction_status_sub.rs b/rust/examples/transaction_status_sub.rs index ff9cb15..1205a28 100644 --- a/rust/examples/transaction_status_sub.rs +++ b/rust/examples/transaction_status_sub.rs @@ -1,5 +1,4 @@ -use helius_laserstream::{subscribe, LaserstreamConfig}; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterTransactions}; +use helius_laserstream::{subscribe, LaserstreamConfig, grpc::{SubscribeRequest, SubscribeRequestFilterTransactions}}; use futures::StreamExt; use std::env; @@ -32,14 +31,14 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); count += 1; if count >= 10 { break; } } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/transaction_sub.rs b/rust/examples/transaction_sub.rs index e56a835..b7a6075 100644 --- a/rust/examples/transaction_sub.rs +++ b/rust/examples/transaction_sub.rs @@ -1,5 +1,4 @@ -use helius_laserstream::{subscribe, LaserstreamConfig}; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterTransactions}; +use helius_laserstream::{subscribe, LaserstreamConfig, grpc::{SubscribeRequest, SubscribeRequestFilterTransactions}}; use futures::StreamExt; use std::env; @@ -33,14 +32,14 @@ async fn main() -> Result<(), Box> { while let Some(result) = stream.next().await { match result { Ok(update) => { - println!("{:?}", update); + println!("{update:?}"); count += 1; if count >= 5 { break; } } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } diff --git a/rust/examples/verify_no_internal_filters.rs b/rust/examples/verify_no_internal_filters.rs index f5893a3..9cf360f 100644 --- a/rust/examples/verify_no_internal_filters.rs +++ b/rust/examples/verify_no_internal_filters.rs @@ -36,7 +36,7 @@ async fn main() -> Result<(), Box> { } } Err(e) => { - eprintln!("Error: {:?}", e); + eprintln!("Error: {e:?}"); break; } } @@ -47,7 +47,7 @@ async fn main() -> Result<(), Box> { match timeout { Ok(_) => { if count > 0 { - println!("WARNING: Received {} updates when none were expected!", count); + println!("WARNING: Received {count} updates when none were expected!"); } } Err(_) => { @@ -88,7 +88,7 @@ async fn main() -> Result<(), Box> { } } Some(Err(e)) => { - eprintln!("Stream error: {:?}", e); + eprintln!("Stream error: {e:?}"); break; } None => { @@ -98,7 +98,7 @@ async fn main() -> Result<(), Box> { } } - println!("\n✓ Successfully verified {} slot updates contain no internal filters", verified_count); + println!("\n✓ Successfully verified {verified_count} slot updates contain no internal filters"); Ok(()) } \ No newline at end of file diff --git a/rust/src/client.rs b/rust/src/client.rs index e101d0a..cff41cf 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -4,7 +4,7 @@ use futures::StreamExt; use futures_channel::mpsc as futures_mpsc; use futures_util::{sink::SinkExt, Stream}; use std::{pin::Pin, time::Duration}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; use tokio::time::sleep; use laserstream_core_proto::tonic::{ Status, Request, metadata::MetadataValue, transport::Endpoint, codec::CompressionEncoding, @@ -34,7 +34,7 @@ impl SdkMetadataInterceptor { fn new(api_key: String) -> Result { let x_token = if !api_key.is_empty() { Some(api_key.parse().map_err(|e| { - Status::invalid_argument(format!("Invalid API key: {}", e)) + Status::invalid_argument(format!("Invalid API key: {e}")) })?) } else { None @@ -59,9 +59,11 @@ impl Interceptor for SdkMetadataInterceptor { } /// Handle for managing a bidirectional streaming subscription. -#[derive(Clone)] +/// +/// Dropping the handle signals the background stream to shut down gracefully. pub struct StreamHandle { write_tx: mpsc::UnboundedSender, + close_tx: Option>, } impl StreamHandle { @@ -73,6 +75,14 @@ impl StreamHandle { } } +impl Drop for StreamHandle { + fn drop(&mut self) { + if let Some(tx) = self.close_tx.take() { + let _ = tx.send(true); + } + } +} + /// Establishes a gRPC connection, handles the subscription lifecycle, /// and provides a stream of updates. Automatically reconnects on failure. #[instrument(skip(config, request))] @@ -84,7 +94,11 @@ pub fn subscribe( StreamHandle, ) { let (write_tx, mut write_rx) = mpsc::unbounded_channel::(); - let handle = StreamHandle { write_tx }; + let (close_tx, mut close_rx) = watch::channel(false); + let handle = StreamHandle { + write_tx, + close_tx: Some(close_tx), + }; let update_stream = stream! { let mut reconnect_attempts = 0; let mut tracked_slot: u64 = 0; @@ -161,6 +175,11 @@ pub fn subscribe( loop { tokio::select! { + // Handle explicit close signal + _ = close_rx.changed() => { + return; + } + // Send periodic ping _ = ping_interval.tick() => { ping_id = ping_id.wrapping_add(1); @@ -255,16 +274,19 @@ pub fn subscribe( error!(attempts = effective_max_attempts, "Max reconnection attempts reached"); // Only report error to consumer after exhausting all retries yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled( - format!("Connection failed after {} attempts", effective_max_attempts) + format!("Connection failed after {effective_max_attempts} attempts") ))); return; } } } - // Wait 5s before retry + // Wait 5s before retry, but abort if close is signalled let delay = Duration::from_millis(FIXED_RECONNECT_INTERVAL_MS); - sleep(delay).await; + tokio::select! { + _ = sleep(delay) => {} + _ = close_rx.changed() => { return; } + } } }; @@ -290,7 +312,7 @@ async fn connect_and_subscribe_once( // Build endpoint with all options let mut endpoint = Endpoint::from_shared(config.endpoint.clone()) - .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))? + .map_err(|e| Status::internal(format!("Failed to parse endpoint: {e}")))? .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10))) .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30))) .http2_keep_alive_interval(Duration::from_secs(options.http2_keep_alive_interval_secs.unwrap_or(30))) @@ -309,13 +331,13 @@ async fn connect_and_subscribe_once( // Configure TLS endpoint = endpoint .tls_config(ClientTlsConfig::new().with_enabled_roots()) - .map_err(|e| Status::internal(format!("TLS config error: {}", e)))?; + .map_err(|e| Status::internal(format!("TLS config error: {e}")))?; // Connect to create channel let channel = endpoint .connect() .await - .map_err(|e| Status::unavailable(format!("Connection failed: {}", e)))?; + .map_err(|e| Status::unavailable(format!("Connection failed: {e}")))?; // Create geyser client with our custom interceptor let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor); @@ -350,12 +372,12 @@ async fn connect_and_subscribe_once( subscribe_tx .send(request) .await - .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to send initial request: {e}")))?; let response = geyser_client .subscribe(subscribe_rx) .await - .map_err(|e| Status::internal(format!("Subscription failed: {}", e)))?; + .map_err(|e| Status::internal(format!("Subscription failed: {e}")))?; Ok((subscribe_tx, response.into_inner())) } @@ -409,7 +431,7 @@ pub fn subscribe_preprocessed( if reconnect_attempts >= effective_max_attempts { error!(attempts = effective_max_attempts, "Max reconnection attempts reached"); yield Err(LaserstreamError::MaxReconnectAttempts(Status::cancelled( - format!("Connection failed after {} attempts", effective_max_attempts) + format!("Connection failed after {effective_max_attempts} attempts") ))); return; } @@ -440,7 +462,7 @@ async fn connect_and_subscribe_preprocessed_once( // Build endpoint with all options let mut endpoint = Endpoint::from_shared(config.endpoint.clone()) - .map_err(|e| Status::internal(format!("Failed to parse endpoint: {}", e)))? + .map_err(|e| Status::internal(format!("Failed to parse endpoint: {e}")))? .connect_timeout(Duration::from_secs(options.connect_timeout_secs.unwrap_or(10))) .timeout(Duration::from_secs(options.timeout_secs.unwrap_or(30))) .tcp_nodelay(options.tcp_nodelay.unwrap_or(true)) @@ -451,12 +473,12 @@ async fn connect_and_subscribe_preprocessed_once( endpoint = endpoint .tls_config(ClientTlsConfig::new().with_enabled_roots()) - .map_err(|e| Status::internal(format!("Failed to configure TLS: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to configure TLS: {e}")))?; let channel = endpoint .connect() .await - .map_err(|e| Status::internal(format!("Failed to connect: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to connect: {e}")))?; let mut geyser_client = GeyserClient::with_interceptor(channel, interceptor) .max_decoding_message_size(options.max_decoding_message_size.unwrap_or(1_000_000_000)) @@ -476,12 +498,12 @@ async fn connect_and_subscribe_preprocessed_once( subscribe_tx .send(request) .await - .map_err(|e| Status::internal(format!("Failed to send initial request: {}", e)))?; + .map_err(|e| Status::internal(format!("Failed to send initial request: {e}")))?; let response = geyser_client .subscribe_preprocessed(subscribe_rx) .await - .map_err(|e| Status::internal(format!("Preprocessed subscription failed: {}", e)))?; + .map_err(|e| Status::internal(format!("Preprocessed subscription failed: {e}")))?; Ok(response.into_inner()) } diff --git a/rust/test/account_integrity.rs b/rust/test/account_integrity.rs index 77a2818..eed5249 100644 --- a/rust/test/account_integrity.rs +++ b/rust/test/account_integrity.rs @@ -11,7 +11,6 @@ use helius_laserstream::{ subscribe, LaserstreamConfig, }; use laserstream_core_client::{ClientTlsConfig, GeyserGrpcClient}; -use bs58; use tracing::{error, warn}; use std::io::{self, Write}; use sha2::{Sha256, Digest}; @@ -20,9 +19,9 @@ use sha2::{Sha256, Digest}; fn fingerprint_account(data: &[u8], lamports: u64) -> String { let mut hasher = Sha256::new(); hasher.update(data); - hasher.update(&lamports.to_le_bytes()); + hasher.update(lamports.to_le_bytes()); let digest = hasher.finalize(); - format!("{:x}", digest) + format!("{digest:x}") } #[tokio::main] @@ -110,7 +109,7 @@ async fn main() -> Result<(), Box> { let fp = fingerprint_account(&account_msg.data, account_msg.lamports); map.insert(key_b58_copy, fp); } - println!("[LS] key={} slot={}", key_b58, slot); + println!("[LS] key={key_b58} slot={slot}"); io::stdout().flush().ok(); } } @@ -165,7 +164,7 @@ async fn main() -> Result<(), Box> { let fp = fingerprint_account(&account_msg.data, account_msg.lamports); map.insert(key_b58_copy, fp); } - println!("[YS] key={} slot={}", key_b58, slot); + println!("[YS] key={key_b58} slot={slot}"); io::stdout().flush().ok(); } } diff --git a/rust/test/block_integrity.rs b/rust/test/block_integrity.rs index 47a1755..1c69d6c 100644 --- a/rust/test/block_integrity.rs +++ b/rust/test/block_integrity.rs @@ -30,7 +30,6 @@ async fn main() -> Result<(), Box> { SubscribeRequestFilterSlots { filter_by_commitment: Some(true), interslot_updates: Some(false), - ..Default::default() }, ); @@ -64,13 +63,13 @@ async fn main() -> Result<(), Box> { if block_exists(&client, rpc_endpoint, &api_key_clone, missing).await? { error!("ERROR: Missed slot {} – block exists but was not received.", missing); } else { - println!("Skipped slot {} (no block produced)", missing); + println!("Skipped slot {missing} (no block produced)"); io::stdout().flush().ok(); } } } } - println!("Received slot: {}", current_slot); + println!("Received slot: {current_slot}"); io::stdout().flush().ok(); last_slot = Some(current_slot); } @@ -100,7 +99,7 @@ async fn block_exists(client: &Client, endpoint: &str, api_key: &str, slot: u64) ] }); - let resp = client.post(format!("{}?api-key={}", endpoint, api_key)) + let resp = client.post(format!("{endpoint}?api-key={api_key}")) .json(&body) .send() .await?; diff --git a/rust/test/subscription_replacement_persistence.rs b/rust/test/subscription_replacement_persistence.rs index 55eca9a..5316509 100644 --- a/rust/test/subscription_replacement_persistence.rs +++ b/rust/test/subscription_replacement_persistence.rs @@ -47,7 +47,7 @@ async fn main() -> Result<(), Box> { .or_else(|_| std::env::var("LASERSTREAM_PRODUCTION_API_KEY")) .unwrap_or_default(); - println!("Connecting through chaos proxy at {}", endpoint); + println!("Connecting through chaos proxy at {endpoint}"); let config = LaserstreamConfig::new(endpoint, api_key); @@ -89,7 +89,6 @@ async fn main() -> Result<(), Box> { let (stream, handle) = subscribe(config, initial_request); let mut stream = Box::pin(stream); - let handle_clone = handle.clone(); let write_time_setter = write_completed_time.clone(); let ready_flag = ready_for_write.clone(); @@ -125,13 +124,13 @@ async fn main() -> Result<(), Box> { ..Default::default() }; - match handle_clone.write(write_request).await { + match handle.write(write_request).await { Ok(()) => { *write_time_setter.lock().await = Some(Instant::now()); println!("[writer] write() sent successfully"); } Err(e) => { - eprintln!("FATAL: Failed to write: {}", e); + eprintln!("FATAL: Failed to write: {e}"); std::process::exit(1); } } @@ -143,7 +142,7 @@ async fn main() -> Result<(), Box> { // Early exit: Phase 3 has enough messages let p3_total = usdc_phase3.load(Ordering::Relaxed) + usdt_phase3.load(Ordering::Relaxed); if p3_total >= MIN_PHASE3_MSGS { - println!("[early exit] Phase 3 collected {} messages", p3_total); + println!("[early exit] Phase 3 collected {p3_total} messages"); break; } @@ -192,9 +191,9 @@ async fn main() -> Result<(), Box> { let n = reconnects_after_write.fetch_add(1, Ordering::SeqCst) + 1; reconnected_after_write.store(true, Ordering::SeqCst); *reconnect_after_write_time.lock().await = Some(Instant::now()); - eprintln!("[reconnect #{}] (after write, #{} post-write) {}", recon_num, n, e); + eprintln!("[reconnect #{recon_num}] (after write, #{n} post-write) {e}"); } else { - eprintln!("[reconnect #{}] (before write) {}", recon_num, e); + eprintln!("[reconnect #{recon_num}] (before write) {e}"); } } } @@ -216,9 +215,9 @@ async fn main() -> Result<(), Box> { let p3_usdt = usdt_phase3.load(Ordering::Relaxed); println!("\n--- Results ---"); - println!("Phase 1 (before write): USDC={:<6} USDT={}", p1_usdc, p1_usdt); - println!("Phase 2 (after write, pre-reconnect): USDC={:<6} USDT={}", p2_usdc, p2_usdt); - println!("Phase 3 (after reconnect post-write): USDC={:<6} USDT={}", p3_usdc, p3_usdt); + println!("Phase 1 (before write): USDC={p1_usdc:<6} USDT={p1_usdt}"); + println!("Phase 2 (after write, pre-reconnect): USDC={p2_usdc:<6} USDT={p2_usdt}"); + println!("Phase 3 (after reconnect post-write): USDC={p3_usdc:<6} USDT={p3_usdt}"); println!("Total reconnections: {} (before write: {}, after write: {})", total_recon, total_recon - post_write_recon, post_write_recon); @@ -226,15 +225,15 @@ async fn main() -> Result<(), Box> { let mut failed = false; if p1_usdc < MIN_PHASE1_MSGS { - eprintln!("FAIL: Phase 1 — USDC got {} txns before write (expected >= {})", p1_usdc, MIN_PHASE1_MSGS); + eprintln!("FAIL: Phase 1 — USDC got {p1_usdc} txns before write (expected >= {MIN_PHASE1_MSGS})"); failed = true; } if p1_usdt > 0 { - eprintln!("FAIL: Phase 1 — USDT got {} txns before write (expected 0)", p1_usdt); + eprintln!("FAIL: Phase 1 — USDT got {p1_usdt} txns before write (expected 0)"); failed = true; } if p2_usdc > 0 { - eprintln!("FAIL: Phase 2 — USDC got {} txns after write (expected 0)", p2_usdc); + eprintln!("FAIL: Phase 2 — USDC got {p2_usdc} txns after write (expected 0)"); failed = true; } if p2_usdt == 0 && !did_reconnect_after_write { @@ -246,11 +245,11 @@ async fn main() -> Result<(), Box> { failed = true; } if p3_usdc > 0 { - eprintln!("FAIL: Phase 3 — USDC got {} txns after reconnect (write NOT persisted!)", p3_usdc); + eprintln!("FAIL: Phase 3 — USDC got {p3_usdc} txns after reconnect (write NOT persisted!)"); failed = true; } if did_reconnect_after_write && p3_usdt < MIN_PHASE3_MSGS { - eprintln!("FAIL: Phase 3 — USDT got {} txns after reconnect (expected >= {})", p3_usdt, MIN_PHASE3_MSGS); + eprintln!("FAIL: Phase 3 — USDT got {p3_usdt} txns after reconnect (expected >= {MIN_PHASE3_MSGS})"); failed = true; } @@ -259,7 +258,7 @@ async fn main() -> Result<(), Box> { std::process::exit(1); } - println!("\nAll assertions passed — write() persisted across {} post-write reconnection(s)", post_write_recon); + println!("\nAll assertions passed — write() persisted across {post_write_recon} post-write reconnection(s)"); Ok(()) } diff --git a/rust/test/transaction_integrity.rs b/rust/test/transaction_integrity.rs index 4ba26aa..172a430 100644 --- a/rust/test/transaction_integrity.rs +++ b/rust/test/transaction_integrity.rs @@ -10,7 +10,6 @@ use helius_laserstream::{ use laserstream_core_client::{ClientTlsConfig, GeyserGrpcClient}; use tokio_stream::StreamExt; use tracing::{error, info, warn}; -use bs58; use std::io::{self, Write}; #[tokio::main] @@ -56,7 +55,8 @@ async fn main() -> Result<(), Box> { let max_slot_ys = Arc::new(Mutex::new(0u64)); // For display and match when both slots known - let status_map: Arc, Option)>>> = Arc::new(Mutex::new(HashMap::new())); + type StatusMap = HashMap, Option)>; + let status_map: Arc> = Arc::new(Mutex::new(HashMap::new())); // Counters for periodic report let new_ls = Arc::new(Mutex::new(0u64)); @@ -102,7 +102,7 @@ async fn main() -> Result<(), Box> { if entry.0.is_some() && entry.1.is_some() { info!("MATCH {} LS_slot={} YS_slot={}", sig_str, slot, entry.1.unwrap()); st.remove(&sig_str); - println!("[LS] sig={} slot={}", sig_str, slot); + println!("[LS] sig={sig_str} slot={slot}"); io::stdout().flush().ok(); } } @@ -165,7 +165,7 @@ async fn main() -> Result<(), Box> { if entry.0.is_some() && entry.1.is_some() { info!("MATCH {} LS_slot={} YS_slot={}", sig_str, entry.0.unwrap(), slot); st.remove(&sig_str); - println!("[YS] sig={} slot={}", sig_str, slot); + println!("[YS] sig={sig_str} slot={slot}"); io::stdout().flush().ok(); } } @@ -236,7 +236,7 @@ async fn main() -> Result<(), Box> { let err_l = *err_ls.lock().await; let err_y = *err_ys.lock().await; - println!("[{}] laserstream+{} yellowstone+{} processedSlots:{} missingLS:{} missingYS:{} LS_errors:{} YS_errors:{}", now, ls_new, ys_new, processed_slots, total_missing_ls, total_missing_ys, err_l, err_y); + println!("[{now}] laserstream+{ls_new} yellowstone+{ys_new} processedSlots:{processed_slots} missingLS:{total_missing_ls} missingYS:{total_missing_ys} LS_errors:{err_l} YS_errors:{err_y}"); *new_ls.lock().await = 0; *new_ys.lock().await = 0;