Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(p2p): cache responses to serve without roundtrip to db #2352

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ba6bdab
chore: changelog
rymnc Oct 14, 2024
5a51e0c
test: arc mutex on cachedview, reset every 10 sec
rymnc Oct 14, 2024
8079739
fix: err
rymnc Oct 14, 2024
19a32f3
fix(p2p): use dashmap, remove mutex, remove instantiation with view
rymnc Oct 14, 2024
805a866
fix: fmt
rymnc Oct 14, 2024
fd0aea0
Merge branch 'master' into fix/p2p-round-robin
rymnc Oct 15, 2024
c5db7a6
chore: add metrics to cache hits/misses
rymnc Oct 15, 2024
b4763be
fix: clippy
rymnc Oct 15, 2024
b6bbf61
Merge branch 'master' into fix/p2p-round-robin
rymnc Oct 15, 2024
5b03fa0
chore: refactor cached_view into own module
rymnc Oct 15, 2024
6422210
chore: retain time based clear, but cache is now on a per block basis…
rymnc Oct 16, 2024
2b2a8fb
fix: metrics logging and clearing
rymnc Oct 16, 2024
0a3724d
Merge branch 'master' into fix/p2p-round-robin
rymnc Oct 28, 2024
f597ac9
fix: fmt and clippy
rymnc Oct 28, 2024
25515ee
Update CHANGELOG.md
rymnc Oct 29, 2024
e481e6a
Update crates/services/p2p/src/cached_view.rs
rymnc Oct 29, 2024
5ad5093
fix: make fetch generic
rymnc Oct 29, 2024
a884429
Merge branch 'master' into fix/p2p-round-robin
rymnc Oct 29, 2024
f16cb2f
fix: use let-else pattern
rymnc Oct 30, 2024
c6f00af
Merge branch 'master' into fix/p2p-round-robin
rymnc Oct 30, 2024
1630e2b
fix: add test
rymnc Oct 30, 2024
22d09cf
Merge branch 'master' into fix/p2p-round-robin
rymnc Oct 30, 2024
09d7bd2
fix: call run multiple times until heartbeat report is available
rymnc Oct 30, 2024
3972a47
fix: clippy
rymnc Oct 30, 2024
69de071
fix: dont allow cache reset interval to swallow runtime of other tasks
rymnc Oct 30, 2024
c634764
Merge branch 'master' into fix/p2p-round-robin
rymnc Oct 30, 2024
7a0a776
chore: visibility of CachedView
rymnc Nov 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2310](https://github.com/FuelLabs/fuel-core/pull/2310): The `metrics` command-line parameter has been replaced with `disable-metrics`. Metrics are now enabled by default, with the option to disable them entirely or on a per-module basis.
- [2341](https://github.com/FuelLabs/fuel-core/pull/2341): The maximum number of processed coins from the `coins_to_spend` query is limited to `max_inputs`.

### Fixed

- [2352](https://github.com/FuelLabs/fuel-core/pull/2352): Fetches transactions during sync phase from any node that can provide it instead of just 1.
rymnc marked this conversation as resolved.
Show resolved Hide resolved

## [Version 0.39.0]

### Added
Expand Down
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions crates/metrics/src/p2p_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@ use std::sync::OnceLock;
pub struct P2PMetrics {
pub unique_peers: Counter,
pub blocks_requested: Gauge,
pub p2p_req_res_cache_hits: Counter,
pub p2p_req_res_cache_misses: Counter,
}

impl P2PMetrics {
fn new() -> Self {
let unique_peers = Counter::default();
let blocks_requested = Gauge::default();
let p2p_req_res_cache_hits = Counter::default();
let p2p_req_res_cache_misses = Counter::default();

let metrics = P2PMetrics {
unique_peers,
blocks_requested,
p2p_req_res_cache_hits,
p2p_req_res_cache_misses,
};

let mut registry = global_registry().registry.lock();
Expand All @@ -33,6 +39,18 @@ impl P2PMetrics {
metrics.blocks_requested.clone()
);

registry.register(
"P2p_Req_Res_Cache_Hits",
"A Counter which keeps track of the number of cache hits for the p2p req/res protocol",
metrics.p2p_req_res_cache_hits.clone()
);

registry.register(
"P2p_Req_Res_Cache_Misses",
"A Counter which keeps track of the number of cache misses for the p2p req/res protocol",
metrics.p2p_req_res_cache_misses.clone()
);

metrics
}
}
Expand All @@ -50,3 +68,11 @@ pub fn increment_unique_peers() {
pub fn set_blocks_requested(count: usize) {
p2p_metrics().blocks_requested.set(count as i64);
}

pub fn increment_p2p_req_res_cache_hits() {
p2p_metrics().p2p_req_res_cache_hits.inc();
}

pub fn increment_p2p_req_res_cache_misses() {
p2p_metrics().p2p_req_res_cache_misses.inc();
}
1 change: 1 addition & 0 deletions crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ description = "Fuel client networking"
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
dashmap = "6.1.0"
netrome marked this conversation as resolved.
Show resolved Hide resolved
fuel-core-chain-config = { workspace = true }
fuel-core-metrics = { workspace = true } # TODO make this a feature
fuel-core-services = { workspace = true, features = ["sync-processor"] }
Expand Down
278 changes: 278 additions & 0 deletions crates/services/p2p/src/cached_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
use crate::ports::P2pDb;
use dashmap::DashMap;
use fuel_core_metrics::p2p_metrics::{
increment_p2p_req_res_cache_hits,
increment_p2p_req_res_cache_misses,
};
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::SealedBlockHeader,
services::p2p::Transactions,
};
use std::ops::Range;

pub struct CachedView {
rymnc marked this conversation as resolved.
Show resolved Hide resolved
sealed_block_headers: DashMap<Range<u32>, Vec<SealedBlockHeader>>,
transactions_on_blocks: DashMap<Range<u32>, Vec<Transactions>>,
metrics: bool,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit hesitant to the current approach of storing everything and clearing on a regular interval. Right now, there is no memory limit of the cache, and we use ranges as keys. So if someone queries the ranges (1..=4, 1..=2, 3..=4), we'd store all blocks in the 1..=4 range twice - and this could theoretically grow quadratically for larger ranges.

I would assume that the most popular queries at a given time are quite similar. Why not use a normal LRU cache with fixed memory size? Alternatively just maintain a cache over the last $N$ block headers and their transactions, evicting old ones as new ones gets populated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, its still wip.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, I see this PR is still a draft :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now use block height as the key in 6422210

we will retain the time-based eviction strategy for now


impl CachedView {
pub fn new(metrics: bool) -> Self {
Self {
sealed_block_headers: DashMap::new(),
transactions_on_blocks: DashMap::new(),
metrics,
}
}

pub fn clear(&self) {
rymnc marked this conversation as resolved.
Show resolved Hide resolved
self.sealed_block_headers.clear();
self.transactions_on_blocks.clear();
}

fn update_metrics<U>(&self, update_fn: U)
where
U: FnOnce(),
{
if self.metrics {
update_fn()
}
}

pub(crate) fn get_sealed_headers<V>(
&self,
view: &V,
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<SealedBlockHeader>>>
where
V: P2pDb,
{
if let Some(headers) = self.sealed_block_headers.get(&block_height_range) {
self.update_metrics(increment_p2p_req_res_cache_hits);
Ok(Some(headers.clone()))
} else {
self.update_metrics(increment_p2p_req_res_cache_misses);
let headers = view.get_sealed_headers(block_height_range.clone())?;
if let Some(headers) = &headers {
self.sealed_block_headers
.insert(block_height_range, headers.clone());
}
Ok(headers)
}
}

pub(crate) fn get_transactions<V>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_sealed_headers and get_transactions look very similar. Is there any way to have a single function that takes the dashmap and the fetch from the db functions in input?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seeing that even @netrome requested this, addressed in 5ad5093

&self,
view: &V,
block_height_range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>>
where
V: P2pDb,
{
if let Some(transactions) = self.transactions_on_blocks.get(&block_height_range) {
self.update_metrics(increment_p2p_req_res_cache_hits);
Ok(Some(transactions.clone()))
} else {
self.update_metrics(increment_p2p_req_res_cache_misses);
let transactions = view.get_transactions(block_height_range.clone())?;
if let Some(transactions) = &transactions {
self.transactions_on_blocks
.insert(block_height_range, transactions.clone());
}
Ok(transactions)
}
}
}

#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
use super::*;
use fuel_core_types::blockchain::consensus::Genesis;
use std::sync::Arc;
use tokio::sync::Notify;

struct FakeDb {
sender: Arc<Notify>,
values: bool,
}

#[inline]
fn default_sealed_headers(range: Range<u32>) -> Vec<SealedBlockHeader> {
vec![SealedBlockHeader::default(); range.len()]
}

#[inline]
fn default_transactions(range: Range<u32>) -> Vec<Transactions> {
vec![Transactions::default(); range.len()]
}

impl P2pDb for FakeDb {
fn get_sealed_headers(
&self,
range: Range<u32>,
) -> StorageResult<Option<Vec<SealedBlockHeader>>> {
self.sender.notify_waiters();
if !self.values {
return Ok(None);
}
let headers = default_sealed_headers(range);
Ok(Some(headers))
}

fn get_transactions(
&self,
range: Range<u32>,
) -> StorageResult<Option<Vec<Transactions>>> {
self.sender.notify_waiters();
if !self.values {
return Ok(None);
}
let transactions = default_transactions(range);
Ok(Some(transactions))
}

fn get_genesis(&self) -> StorageResult<Genesis> {
self.sender.notify_waiters();
Ok(Genesis::default())
}
}

#[tokio::test]
async fn cached_view__get_sealed_headers__cache_hit() {
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(false);

let block_height_range = 0..10;
let sealed_headers = vec![SealedBlockHeader::default()];
cached_view
.sealed_block_headers
.insert(block_height_range.clone(), sealed_headers.clone());

let result = cached_view
.get_sealed_headers(&db, block_height_range.clone())
.unwrap();
assert_eq!(result, Some(sealed_headers));
}

#[tokio::test]
async fn cached_view__get_sealed_headers__cache_miss() {
// given
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(false);

// when
let notified = sender.notified();
let block_height_range = 0..10;
let sealed_headers = default_sealed_headers(block_height_range.clone());
let result = cached_view
.get_sealed_headers(&db, block_height_range.clone())
.unwrap();

// then
notified.await;
assert_eq!(result, Some(sealed_headers));
}

#[tokio::test]
async fn cached_view__when_response_is_none__get_sealed_headers__cache_miss() {
// given
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: false,
};
let cached_view = CachedView::new(false);

// when
let notified = sender.notified();
let block_height_range = 0..10;
let result = cached_view
.get_sealed_headers(&db, block_height_range.clone())
.unwrap();

// then
notified.await;
assert!(result.is_none());
}

#[tokio::test]
async fn cached_view__get_transactions__cache_hit() {
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(false);

let block_height_range = 0..10;
let transactions = default_transactions(block_height_range.clone());
cached_view
.transactions_on_blocks
.insert(block_height_range.clone(), transactions.clone());

let result = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();

for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) {
assert_eq!(expected.0, actual.0);
}
}

#[tokio::test]
async fn cached_view__get_transactions__cache_miss() {
// given
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: true,
};
let cached_view = CachedView::new(false);

// when
let notified = sender.notified();
let block_height_range = 0..10;
let transactions = default_transactions(block_height_range.clone());
let result = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();

// then
notified.await;
for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) {
assert_eq!(expected.0, actual.0);
}
}

#[tokio::test]
async fn cached_view__when_response_is_none__get_transactions__cache_miss() {
// given
let sender = Arc::new(Notify::new());
let db = FakeDb {
sender: sender.clone(),
values: false,
};
let cached_view = CachedView::new(false);

// when
let notified = sender.notified();
let block_height_range = 0..10;
let result = cached_view
.get_transactions(&db, block_height_range.clone())
.unwrap();

// then
notified.await;
assert!(result.is_none());
}
}
2 changes: 2 additions & 0 deletions crates/services/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub mod request_response;
pub mod service;
mod utils;

mod cached_view;

pub use gossipsub::config as gossipsub_config;
pub use heartbeat::Config;

Expand Down
Loading
Loading