diff --git a/CHANGELOG.md b/CHANGELOG.md
index a6ed7c7bb5..851392d3de 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 - [2310](https://github.com/FuelLabs/fuel-core/pull/2310): The `metrics` command-line parameter has been replaced with `disable-metrics`. Metrics are now enabled by default, with the option to disable them entirely or on a per-module basis.
 - [2341](https://github.com/FuelLabs/fuel-core/pull/2341): The maximum number of processed coins from the `coins_to_spend` query is limited to `max_inputs`.
 
+### Fixed
+
+- [2352](https://github.com/FuelLabs/fuel-core/pull/2352): Cache p2p responses so they can be served without a round trip to the database.
+
 ## [Version 0.39.0]
 
 ### Added
diff --git a/Cargo.lock b/Cargo.lock
index 67a97f72c3..36f2d1f96e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2214,6 +2214,20 @@ dependencies = [
  "syn 2.0.85",
 ]
 
+[[package]]
+name = "dashmap"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+ "hashbrown 0.14.5",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+]
+
 [[package]]
 name = "data-encoding"
 version = "2.6.0"
@@ -3556,6 +3570,7 @@ dependencies = [
  "anyhow",
  "async-trait",
  "ctor",
+ "dashmap",
  "fuel-core-chain-config",
  "fuel-core-metrics",
  "fuel-core-p2p",
diff --git a/crates/metrics/src/p2p_metrics.rs b/crates/metrics/src/p2p_metrics.rs
index a139cfb6ee..b535bcccb4 100644
--- a/crates/metrics/src/p2p_metrics.rs
+++ b/crates/metrics/src/p2p_metrics.rs
@@ -8,16 +8,22 @@ use std::sync::OnceLock;
 pub struct P2PMetrics {
     pub unique_peers: Counter,
     pub blocks_requested: Gauge,
+    pub p2p_req_res_cache_hits: Counter,
+    pub p2p_req_res_cache_misses: Counter,
 }
 
 impl P2PMetrics {
     fn new() -> Self {
         let unique_peers = Counter::default();
         let blocks_requested = Gauge::default();
+        let p2p_req_res_cache_hits = Counter::default();
+        let p2p_req_res_cache_misses = Counter::default();
 
         let metrics = P2PMetrics {
             unique_peers,
             blocks_requested,
+            p2p_req_res_cache_hits,
+            p2p_req_res_cache_misses,
         };
 
         let mut registry = global_registry().registry.lock();
@@ -33,6 +39,18 @@ impl P2PMetrics {
             metrics.blocks_requested.clone()
         );
 
+        registry.register(
+            "P2p_Req_Res_Cache_Hits",
+            "A Counter which keeps track of the number of cache hits for the p2p req/res protocol",
+            metrics.p2p_req_res_cache_hits.clone()
+        );
+
+        registry.register(
+            "P2p_Req_Res_Cache_Misses",
+            "A Counter which keeps track of the number of cache misses for the p2p req/res protocol",
+            metrics.p2p_req_res_cache_misses.clone()
+        );
+
         metrics
     }
 }
@@ -50,3 +68,11 @@ pub fn increment_unique_peers() {
 pub fn set_blocks_requested(count: usize) {
     p2p_metrics().blocks_requested.set(count as i64);
 }
+
+pub fn increment_p2p_req_res_cache_hits() {
+    p2p_metrics().p2p_req_res_cache_hits.inc();
+}
+
+pub fn increment_p2p_req_res_cache_misses() {
+    p2p_metrics().p2p_req_res_cache_misses.inc();
+}
diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml
index 09662d2631..aeab90bc95 100644
--- a/crates/services/p2p/Cargo.toml
+++ b/crates/services/p2p/Cargo.toml
@@ -13,6 +13,7 @@ description = "Fuel client networking"
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
+dashmap = "6.1.0"
 fuel-core-chain-config = { workspace = true }
 fuel-core-metrics = { workspace = true } # TODO make this a feature
 fuel-core-services = { workspace = true, features = ["sync-processor"] }
diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs
new file mode 100644
index 0000000000..809b9f3314
--- /dev/null
+++ b/crates/services/p2p/src/cached_view.rs
@@ -0,0 +1,322 @@
+use crate::ports::P2pDb;
+use dashmap::DashMap;
+use fuel_core_metrics::p2p_metrics::{
+    increment_p2p_req_res_cache_hits,
+    increment_p2p_req_res_cache_misses,
+};
+use fuel_core_storage::Result as StorageResult;
+use fuel_core_types::{
+    blockchain::SealedBlockHeader,
+    services::p2p::Transactions,
+};
+use std::ops::Range;
+
+type BlockHeight = u32;
+
+pub(super) struct CachedView {
+    sealed_block_headers: DashMap<BlockHeight, SealedBlockHeader>,
+    transactions_on_blocks: DashMap<BlockHeight, Transactions>,
+    metrics: bool,
+}
+
+impl CachedView {
+    pub fn new(metrics: bool) -> Self {
+        Self {
+            sealed_block_headers: DashMap::new(),
+            transactions_on_blocks: DashMap::new(),
+            metrics,
+        }
+    }
+
+    pub(super) fn clear(&self) {
+        self.sealed_block_headers.clear();
+        self.transactions_on_blocks.clear();
+    }
+
+    fn update_metrics<U>(&self, update_fn: U)
+    where
+        U: FnOnce(),
+    {
+        if self.metrics {
+            update_fn()
+        }
+    }
+
+    fn get_from_cache_or_db<V, T, F>(
+        &self,
+        cache: &DashMap<BlockHeight, T>,
+        view: &V,
+        range: Range<u32>,
+        fetch_fn: F,
+    ) -> StorageResult<Option<Vec<T>>>
+    where
+        V: P2pDb,
+        T: Clone,
+        F: Fn(&V, Range<u32>) -> StorageResult<Option<Vec<T>>>,
+    {
+        let mut items = Vec::new();
+        let mut missing_start = None;
+
+        for height in range.clone() {
+            if let Some(item) = cache.get(&height) {
+                // TODO(2436): replace with cheap Arc clone
+                items.push(item.clone());
+            } else {
+                missing_start = Some(height);
+                break;
+            }
+        }
+
+        let Some(missing_start) = missing_start else {
+            self.update_metrics(increment_p2p_req_res_cache_hits);
+            return Ok(Some(items));
+        };
+
+        let missing_range = missing_start..range.end;
+
+        self.update_metrics(increment_p2p_req_res_cache_misses);
+        if let Some(fetched_items) = fetch_fn(view, missing_range.clone())? {
+            for (height, item) in missing_range.zip(fetched_items.iter()) {
+                cache.insert(height, item.clone());
+                // TODO(2436): replace with cheap Arc clone
+                items.push(item.clone());
+            }
+
+            return Ok(Some(items));
+        }
+
+        Ok(None)
+    }
+
+    pub(crate) fn get_sealed_headers<V>(
+        &self,
+        view: &V,
+        block_height_range: Range<u32>,
+    ) -> StorageResult<Option<Vec<SealedBlockHeader>>>
+    where
+        V: P2pDb,
+    {
+        self.get_from_cache_or_db(
+            &self.sealed_block_headers,
+            view,
+            block_height_range,
+            V::get_sealed_headers,
+        )
+    }
+
+    pub(crate) fn get_transactions<V>(
+        &self,
+        view: &V,
+        block_height_range: Range<u32>,
+    ) -> StorageResult<Option<Vec<Transactions>>>
+    where
+        V: P2pDb,
+    {
+        self.get_from_cache_or_db(
+            &self.transactions_on_blocks,
+            view,
+            block_height_range,
+            V::get_transactions,
+        )
+    }
+}
+
+#[allow(non_snake_case)]
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use fuel_core_types::blockchain::consensus::Genesis;
+    use std::sync::Arc;
+    use tokio::sync::Notify;
+
+    struct FakeDb {
+        sender: Arc<Notify>,
+        values: bool,
+    }
+
+    #[inline]
+    fn default_sealed_headers(range: Range<u32>) -> Vec<SealedBlockHeader> {
+        vec![SealedBlockHeader::default(); range.len()]
+    }
+
+    #[inline]
+    fn default_transactions(range: Range<u32>) -> Vec<Transactions> {
+        vec![Transactions::default(); range.len()]
+    }
+
+    impl P2pDb for FakeDb {
+        fn get_sealed_headers(
+            &self,
+            range: Range<u32>,
+        ) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
+            self.sender.notify_waiters();
+            if !self.values {
+                return Ok(None);
+            }
+            let headers = default_sealed_headers(range);
+            Ok(Some(headers))
+        }
+
+        fn get_transactions(
+            &self,
+            range: Range<u32>,
+        ) -> StorageResult<Option<Vec<Transactions>>> {
+            self.sender.notify_waiters();
+            if !self.values {
+                return Ok(None);
+            }
+            let transactions = default_transactions(range);
+            Ok(Some(transactions))
+        }
+
+        fn get_genesis(&self) -> StorageResult<Genesis> {
+            self.sender.notify_waiters();
+            Ok(Genesis::default())
+        }
+    }
+
+    #[tokio::test]
+    async fn cached_view__get_sealed_headers__cache_hit() {
+        let sender = Arc::new(Notify::new());
+        let db = FakeDb {
+            sender: sender.clone(),
+            values: true,
+        };
+        let cached_view = CachedView::new(false);
+
+        let block_height_range = 0..100;
+        let sealed_headers = default_sealed_headers(block_height_range.clone());
+        for (block_height, header) in
+            block_height_range.clone().zip(sealed_headers.iter())
+        {
+            cached_view
+                .sealed_block_headers
+                .insert(block_height, header.clone());
+        }
+
+        let result = cached_view
+            .get_sealed_headers(&db, block_height_range.clone())
+            .unwrap();
+        assert_eq!(result, Some(sealed_headers));
+    }
+
+    #[tokio::test]
+    async fn cached_view__get_sealed_headers__cache_miss() {
+        // given
+        let sender = Arc::new(Notify::new());
+        let db = FakeDb {
+            sender: sender.clone(),
+            values: true,
+        };
+        let cached_view = CachedView::new(false);
+
+        // when
+        let notified = sender.notified();
+        let block_height_range = 0..100;
+        let sealed_headers = default_sealed_headers(block_height_range.clone());
+        let result = cached_view
+            .get_sealed_headers(&db, block_height_range.clone())
+            .unwrap();
+
+        // then
+        notified.await;
+        assert_eq!(result, Some(sealed_headers));
+    }
+
+    #[tokio::test]
+    async fn cached_view__when_response_is_none__get_sealed_headers__cache_miss() {
+        // given
+        let sender = Arc::new(Notify::new());
+        let db = FakeDb {
+            sender: sender.clone(),
+            values: false,
+        };
+        let cached_view = CachedView::new(false);
+
+        // when
+        let notified = sender.notified();
+        let block_height_range = 0..100;
+        let result = cached_view
+            .get_sealed_headers(&db, block_height_range.clone())
+            .unwrap();
+
+        // then
+        notified.await;
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn cached_view__get_transactions__cache_hit() {
+        let sender = Arc::new(Notify::new());
+        let db = FakeDb {
+            sender: sender.clone(),
+            values: true,
+        };
+        let cached_view = CachedView::new(false);
+
+        let block_height_range = 0..100;
+        let transactions = default_transactions(block_height_range.clone());
+
+        for (block_height, transactions) in
+            block_height_range.clone().zip(transactions.iter())
+        {
+            cached_view
+                .transactions_on_blocks
+                .insert(block_height, transactions.clone());
+        }
+
+        let result = cached_view
+            .get_transactions(&db, block_height_range.clone())
+            .unwrap();
+
+        for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) {
+            assert_eq!(expected.0, actual.0);
+        }
+    }
+
+    #[tokio::test]
+    async fn cached_view__get_transactions__cache_miss() {
+        // given
+        let sender = Arc::new(Notify::new());
+        let db = FakeDb {
+            sender: sender.clone(),
+            values: true,
+        };
+        let cached_view = CachedView::new(false);
+
+        // when
+        let notified = sender.notified();
+        let block_height_range = 0..100;
+        let transactions = default_transactions(block_height_range.clone());
+        let result = cached_view
+            .get_transactions(&db, block_height_range.clone())
+            .unwrap();
+
+        // then
+        notified.await;
+        for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) {
+            assert_eq!(expected.0, actual.0);
+        }
+    }
+
+    #[tokio::test]
+    async fn cached_view__when_response_is_none__get_transactions__cache_miss() {
+        // given
+        let sender = Arc::new(Notify::new());
+        let db = FakeDb {
+            sender: sender.clone(),
+            values: false,
+        };
+        let cached_view = CachedView::new(false);
+
+        // when
+        let notified = sender.notified();
+        let block_height_range = 0..100;
+        let result = cached_view
+            .get_transactions(&db, block_height_range.clone())
+            .unwrap();
+
+        // then
+        notified.await;
+        assert!(result.is_none());
+    }
+}
diff --git a/crates/services/p2p/src/lib.rs b/crates/services/p2p/src/lib.rs
index fbd82c2545..375eeb7351 100644
--- a/crates/services/p2p/src/lib.rs
+++ b/crates/services/p2p/src/lib.rs
@@ -16,6 +16,8 @@ pub mod request_response;
 pub mod service;
 mod utils;
 
+mod cached_view;
+
 pub use gossipsub::config as gossipsub_config;
 pub use heartbeat::Config;
 
diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs
index c85e1e3a6c..c84fbc9923 100644
--- a/crates/services/p2p/src/service.rs
+++ b/crates/services/p2p/src/service.rs
@@ -1,4 +1,5 @@
 use crate::{
+    cached_view::CachedView,
     codecs::postcard::PostcardCodec,
     config::{
         Config,
@@ -412,6 +413,11 @@ pub struct Task {
     heartbeat_max_time_since_last: Duration,
     next_check_time: Instant,
     heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig,
+    // cached view of recently served p2p responses
+    cached_view: Arc<CachedView>,
+    // wait time between cache resets
+    cache_reset_interval: Duration,
+    next_cache_reset_time: Instant,
 }
 
 #[derive(Default, Clone)]
@@ -444,7 +450,7 @@ impl UninitializedTask {
     }
 }
 
-impl Task {
+impl Task {
     fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> {
         for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() {
             if peer_info.heartbeat_data.duration_since_last_heartbeat()
@@ -493,6 +499,7 @@ where
     V: AtomicView + 'static,
     V::LatestView: P2pDb,
     T: TxPool + 'static,
+    B: Send,
 {
     fn update_metrics<U>(&self, update_fn: U)
     where
@@ -532,8 +539,9 @@
         max_len: usize,
     ) -> anyhow::Result<()>
     where
-        DbLookUpFn:
-            Fn(&V::LatestView, Range<u32>) -> anyhow::Result<Option<R>> + Send + 'static,
+        DbLookUpFn: Fn(&V::LatestView, &Arc<CachedView>, Range<u32>) -> anyhow::Result<Option<R>>
+            + Send
+            + 'static,
         ResponseSenderFn:
            Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
        TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
@@ -564,22 +572,25 @@ where
         }
 
         let view = self.view_provider.latest_view()?;
-        let result = self.db_heavy_task_processor.try_spawn(move || {
-            if instant.elapsed() > timeout {
-                tracing::warn!("Request timed out");
-                return;
-            }
+        let result = self.db_heavy_task_processor.try_spawn({
+            let cached_view = self.cached_view.clone();
+            move || {
+                if instant.elapsed() > timeout {
+                    tracing::warn!("Request timed out");
+                    return;
+                }
 
-            // TODO: https://github.com/FuelLabs/fuel-core/issues/1311
-            // Add new error code
-            let response = db_lookup(&view, range.clone())
-                .ok()
-                .flatten()
-                .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse);
+                // TODO: https://github.com/FuelLabs/fuel-core/issues/1311
+                // Add new error code
+                let response = db_lookup(&view, &cached_view, range.clone())
+                    .ok()
+                    .flatten()
+                    .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse);
 
-            let _ = response_channel
-                .try_send(task_request(response, request_id))
-                .trace_err("Failed to send response to the request channel");
+                let _ = response_channel
+                    .try_send(task_request(response, request_id))
+                    .trace_err("Failed to send response to the request channel");
+            }
         });
 
         // TODO: https://github.com/FuelLabs/fuel-core/issues/1311
@@ -603,7 +614,11 @@
             range,
             request_id,
             V2ResponseMessage::Transactions,
-            |view, range| view.get_transactions(range).map_err(anyhow::Error::from),
+            |view, cached_view, range| {
+                cached_view
+                    .get_transactions(view, range)
+                    .map_err(anyhow::Error::from)
+            },
             |response, request_id| TaskRequest::DatabaseTransactionsLookUp {
                 response,
                 request_id,
@@ -621,7 +636,11 @@
             range,
             request_id,
             V2ResponseMessage::SealedHeaders,
-            |view, range| view.get_sealed_headers(range).map_err(anyhow::Error::from),
+            |view, cached_view, range| {
+                cached_view
+                    .get_sealed_headers(view, range)
+                    .map_err(anyhow::Error::from)
+            },
             |response, request_id| TaskRequest::DatabaseHeaderLookUp {
                 response,
                 request_id,
@@ -773,6 +792,7 @@ where
             heartbeat_max_time_since_last,
             database_read_threads,
             tx_pool_threads,
+            metrics,
             ..
         } = config;
 
@@ -805,6 +825,11 @@ where
             AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?;
         let request_sender = broadcast.request_sender.clone();
 
+        let cache_reset_interval = Duration::from_millis(10_000);
+        let next_cache_reset_time = Instant::now()
+            .checked_add(cache_reset_interval)
+            .expect("The cache reset interval should be small enough to do frequently");
+
         let task = Task {
             chain_id,
             response_timeout,
@@ -824,6 +849,9 @@ where
             heartbeat_max_time_since_last,
             next_check_time,
             heartbeat_peer_reputation_config,
+            cached_view: Arc::new(CachedView::new(metrics)),
+            cache_reset_interval,
+            next_cache_reset_time,
         };
         Ok(task)
     }
@@ -849,6 +877,12 @@ where
            _ = watcher.while_started() => {
                should_continue = false;
            },
+           _ = tokio::time::sleep_until(self.next_cache_reset_time) => {
+               should_continue = true;
+               tracing::debug!("Resetting req/res protocol cache");
+               self.cached_view.clear();
+               self.next_cache_reset_time += self.cache_reset_interval;
+           },
            latest_block_height = self.next_block_height.next() => {
                if let Some(latest_block_height) = latest_block_height {
                    let _ = self.p2p_service.update_block_height(latest_block_height);
@@ -1601,17 +1635,22 @@ pub mod tests {
             heartbeat_max_time_since_last,
             next_check_time: Instant::now(),
             heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(),
+            cached_view: Arc::new(CachedView::new(false)),
+            cache_reset_interval: Duration::from_secs(10),
+            next_cache_reset_time: Instant::now(),
         };
         let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started);
         let mut watcher = StateWatcher::from(watch_receiver);
 
         // when
-        task.run(&mut watcher).await.unwrap();
+        let (report_peer_id, report, reporting_service) = tokio::time::timeout(
+            Duration::from_secs(1),
+            wait_until_report_received(&mut report_receiver, &mut task, &mut watcher),
+        )
+        .await
+        .unwrap();
 
         // then
-        let (report_peer_id, report, reporting_service) =
-            report_receiver.recv().await.unwrap();
-
         watch_sender.send(State::Stopped).unwrap();
 
         assert_eq!(
@@ -1691,17 +1730,23 @@ pub mod tests {
             heartbeat_max_time_since_last,
             next_check_time: Instant::now(),
             heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(),
+            cached_view: Arc::new(CachedView::new(false)),
+            cache_reset_interval: Duration::from_secs(10),
+            next_cache_reset_time: Instant::now(),
         };
         let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started);
         let mut watcher = StateWatcher::from(watch_receiver);
 
         // when
-        task.run(&mut watcher).await.unwrap();
+        // run the task in a loop until it reports a peer
+        let (report_peer_id, report, reporting_service) = tokio::time::timeout(
+            Duration::from_secs(1),
+            wait_until_report_received(&mut report_receiver, &mut task, &mut watcher),
+        )
+        .await
+        .unwrap();
 
         // then
-        let (report_peer_id, report, reporting_service) =
-            report_receiver.recv().await.unwrap();
-
         watch_sender.send(State::Stopped).unwrap();
 
         assert_eq!(
@@ -1715,6 +1760,19 @@ pub mod tests {
         assert_eq!(reporting_service, "p2p");
     }
 
+    async fn wait_until_report_received(
+        report_receiver: &mut Receiver<(FuelPeerId, AppScore, String)>,
+        task: &mut Task,
+        watcher: &mut StateWatcher,
+    ) -> (FuelPeerId, AppScore, String) {
+        loop {
+            task.run(watcher).await.unwrap();
+            if let Ok((peer_id, recv_report, service)) = report_receiver.try_recv() {
+                return (peer_id, recv_report, service);
+            }
+        }
+    }
+
     #[tokio::test]
     async fn should_process_all_imported_block_under_infinite_events_from_p2p() {
         // Given
@@ -1753,6 +1811,9 @@ pub mod tests {
             heartbeat_max_time_since_last: Default::default(),
             next_check_time: Instant::now(),
             heartbeat_peer_reputation_config: Default::default(),
+            cached_view: Arc::new(CachedView::new(false)),
+            cache_reset_interval: Duration::from_secs(10),
+            next_cache_reset_time: Instant::now(),
         };
         let mut watcher = StateWatcher::started();
         // End of initialization
@@ -1767,4 +1828,58 @@ pub mod tests {
             .expect("Should process the block height even under p2p pressure");
         }
     }
+
+    #[tokio::test]
+    async fn cached_view__is_reset_after_interval_passed() {
+        // given
+        let p2p_service = FakeP2PService {
+            peer_info: vec![],
+            next_event_stream: Box::pin(futures::stream::pending()),
+        };
+        let (request_sender, request_receiver) = mpsc::channel(100);
+
+        let (report_sender, _) = mpsc::channel(100);
+        let broadcast = FakeBroadcast {
+            peer_reports: report_sender,
+        };
+
+        let cache_reset_interval = Duration::from_millis(100);
+        let next_cache_reset_time = Instant::now();
+        let mut task = Task {
+            chain_id: Default::default(),
+            response_timeout: Default::default(),
+            p2p_service,
+            view_provider: FakeDB,
+            tx_pool: FakeTxPool,
+            next_block_height: FakeBlockImporter.next_block_height(),
+            request_receiver,
+            request_sender,
+            db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(),
+            tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(),
+            broadcast,
+            max_headers_per_request: 0,
+            max_txs_per_request: 100,
+            heartbeat_check_interval: Duration::from_secs(10),
+            heartbeat_max_avg_interval: Default::default(),
+            heartbeat_max_time_since_last: Default::default(),
+            next_check_time: Instant::now(),
+            heartbeat_peer_reputation_config: Default::default(),
+            cached_view: Arc::new(CachedView::new(false)),
+            cache_reset_interval,
+            next_cache_reset_time,
+        };
+        let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started);
+        let mut watcher = StateWatcher::from(watch_receiver);
+
+        // when
+        task.run(&mut watcher).await.unwrap();
+
+        // then
+        // we sleep for twice the reset interval to ensure that the cache is reset
+        tokio::time::sleep(cache_reset_interval * 2).await;
+        watch_sender.send(State::Stopped).unwrap();
+
+        // if this has changed, we can be sure that the cache was reset
+        assert_ne!(task.next_cache_reset_time, next_cache_reset_time);
+    }
 }