Skip to content

Commit e2c96c0

Browse files
authored
refactor(anvil): flip filter expiration timestamp (#2694)
* refactor(anvil): flip filter expiration timestamp * fix: use interval_at
1 parent 9449e10 commit e2c96c0

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

anvil/src/filter.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,9 @@ use anvil_core::eth::subscription::SubscriptionId;
88
use anvil_rpc::response::ResponseResult;
99
use ethers::{
1010
prelude::{Log as EthersLog, H256 as TxHash},
11-
types::FilteredParams,
11+
types::{Filter, FilteredParams},
1212
};
1313
use futures::{channel::mpsc::Receiver, Stream, StreamExt};
14-
15-
use ethers::types::Filter;
1614
use std::{
1715
collections::HashMap,
1816
pin::Pin,
@@ -23,6 +21,7 @@ use std::{
2321
use tokio::sync::Mutex;
2422
use tracing::{trace, warn};
2523

24+
/// Type alias for filters identified by their id and their expiration timestamp
2625
type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
2726

2827
/// timeout after which to remove an active filter if it wasn't polled since then
@@ -45,19 +44,19 @@ impl Filters {
4544
let id = new_id();
4645
trace!(target: "node::filter", "Adding new filter id {}", id);
4746
let mut filters = self.active_filters.lock().await;
48-
filters.insert(id.clone(), (filter, Instant::now()));
47+
filters.insert(id.clone(), (filter, self.next_deadline()));
4948
id
5049
}
5150

5251
pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
5352
{
5453
let mut filters = self.active_filters.lock().await;
55-
if let Some((filter, timestamp)) = filters.get_mut(id) {
54+
if let Some((filter, deadline)) = filters.get_mut(id) {
5655
let resp = filter
5756
.next()
5857
.await
5958
.unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
60-
*timestamp = Instant::now();
59+
*deadline = self.next_deadline();
6160
return resp
6261
}
6362
}
@@ -85,11 +84,17 @@ impl Filters {
8584
self.keepalive
8685
}
8786

87+
/// Returns the timestamp after which a filter should expire
88+
fn next_deadline(&self) -> Instant {
89+
Instant::now() + self.keep_alive()
90+
}
91+
92+
/// Evict all filters that weren't updated and reached there deadline
8893
pub async fn evict(&self) {
8994
trace!(target: "node::filter", "Evicting stale filters");
90-
let deadline = Instant::now() - self.keepalive;
95+
let now = Instant::now();
9196
let mut active_filters = self.active_filters.lock().await;
92-
active_filters.retain(|_, (_, timestamp)| *timestamp > deadline);
97+
active_filters.retain(|_, (_, deadline)| *deadline > now);
9398
}
9499
}
95100

anvil/src/service.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@ impl NodeService {
4949
fee_history: FeeHistoryService,
5050
filters: Filters,
5151
) -> Self {
52+
let start = tokio::time::Instant::now() + filters.keep_alive();
53+
let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
5254
Self {
5355
pool,
5456
block_producer: BlockProducer::new(backend),
5557
miner,
5658
fee_history,
57-
filter_eviction_interval: tokio::time::interval(filters.keep_alive()),
59+
filter_eviction_interval,
5860
filters,
5961
}
6062
}

0 commit comments

Comments
 (0)