Skip to content

Commit 8601151

Browse files
xonx4lalamb
andauthored
fix: flaky cache test (#19140)
## Which issue does this PR close? - Closes #19114. ## Rationale for this change The test test_cache_with_ttl_and_lru was flaky and failing intermittently in CI. It relied on std::thread::sleep and Instant::now(), which caused race conditions when the test environment was slow or under load. This PR makes the test correct by removing reliance on the system clock and thread sleeping. ## What changes are included in this PR? -> Introduced a TimeProvider trait to abstract time retrieval. -> Refactored DefaultListFilesCache to use provider. -> Added DefaultListFilesCache::new_with_provider for testing purposes. -> Updated test_cache_with_ttl_and_lru to use a MockTimeProvider. ## Are these changes tested? Yes. I verified the fix by running the specific test locally without failure. ## Are there any user-facing changes? No. --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent f9c030a commit 8601151

File tree

1 file changed

+96
-32
lines changed

1 file changed

+96
-32
lines changed

datafusion/execution/src/cache/list_files_cache.rs

Lines changed: 96 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::mem::size_of;
1819
use std::{
1920
sync::{Arc, Mutex},
2021
time::Duration,
@@ -25,6 +26,19 @@ use object_store::{ObjectMeta, path::Path};
2526

2627
use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue};
2728

29+
pub trait TimeProvider: Send + Sync + 'static {
30+
fn now(&self) -> Instant;
31+
}
32+
33+
#[derive(Debug, Default)]
34+
pub struct SystemTimeProvider;
35+
36+
impl TimeProvider for SystemTimeProvider {
37+
fn now(&self) -> Instant {
38+
Instant::now()
39+
}
40+
}
41+
2842
/// Default implementation of [`ListFilesCache`]
2943
///
3044
/// Caches file metadata for file listing operations.
@@ -41,9 +55,15 @@ use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQ
4155
/// Users should use the [`Self::get`] and [`Self::put`] methods. The
4256
/// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call
4357
/// `get` and `put`, respectively.
44-
#[derive(Default)]
4558
pub struct DefaultListFilesCache {
4659
state: Mutex<DefaultListFilesCacheState>,
60+
time_provider: Arc<dyn TimeProvider>,
61+
}
62+
63+
impl Default for DefaultListFilesCache {
64+
fn default() -> Self {
65+
Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
66+
}
4767
}
4868

4969
impl DefaultListFilesCache {
@@ -55,9 +75,16 @@ impl DefaultListFilesCache {
5575
pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
5676
Self {
5777
state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
78+
time_provider: Arc::new(SystemTimeProvider),
5879
}
5980
}
6081

82+
#[cfg(test)]
83+
pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
84+
self.time_provider = provider;
85+
self
86+
}
87+
6188
/// Returns the cache's memory limit in bytes.
6289
pub fn cache_limit(&self) -> usize {
6390
self.state.lock().unwrap().memory_limit
@@ -83,14 +110,18 @@ struct ListFilesEntry {
83110
}
84111

85112
impl ListFilesEntry {
86-
fn try_new(metas: Arc<Vec<ObjectMeta>>, ttl: Option<Duration>) -> Option<Self> {
113+
fn try_new(
114+
metas: Arc<Vec<ObjectMeta>>,
115+
ttl: Option<Duration>,
116+
now: Instant,
117+
) -> Option<Self> {
87118
let size_bytes = (metas.capacity() * size_of::<ObjectMeta>())
88119
+ metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?;
89120

90121
Some(Self {
91122
metas,
92123
size_bytes,
93-
expires: ttl.map(|t| Instant::now() + t),
124+
expires: ttl.map(|t| now + t),
94125
})
95126
}
96127
}
@@ -141,47 +172,54 @@ impl DefaultListFilesCacheState {
141172
}
142173
}
143174

144-
/// Returns the respective entry from the cache, if it exists and the entry has not expired.
175+
/// Returns the respective entry from the cache, if it exists and the entry
176+
/// has not expired by `now`.
177+
///
145178
/// If the entry exists it becomes the most recently used. If the entry has expired it is
146179
/// removed from the cache
147-
fn get(&mut self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
180+
fn get(&mut self, key: &Path, now: Instant) -> Option<Arc<Vec<ObjectMeta>>> {
148181
let entry = self.lru_queue.get(key)?;
149182

150183
match entry.expires {
151-
Some(exp) if Instant::now() > exp => {
184+
Some(exp) if now > exp => {
152185
self.remove(key);
153186
None
154187
}
155188
_ => Some(Arc::clone(&entry.metas)),
156189
}
157190
}
158191

159-
/// Checks if the respective entry is currently cached. If the entry has expired it is removed
160-
/// from the cache.
192+
/// Checks if the respective entry is currently cached.
193+
///
194+
/// If the entry has expired by `now` it is removed from the cache.
195+
///
161196
/// The LRU queue is not updated.
162-
fn contains_key(&mut self, k: &Path) -> bool {
197+
fn contains_key(&mut self, k: &Path, now: Instant) -> bool {
163198
let Some(entry) = self.lru_queue.peek(k) else {
164199
return false;
165200
};
166201

167202
match entry.expires {
168-
Some(exp) if Instant::now() > exp => {
203+
Some(exp) if now > exp => {
169204
self.remove(k);
170205
false
171206
}
172207
_ => true,
173208
}
174209
}
175210

176-
/// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required.
211+
/// Adds a new key-value pair to cache expiring at `now` + the TTL.
212+
///
213+
/// This means that LRU entries might be evicted if required.
177214
/// If the key is already in the cache, the previous entry is returned.
178215
/// If the size of the entry is greater than the `memory_limit`, the value is not inserted.
179216
fn put(
180217
&mut self,
181218
key: &Path,
182219
value: Arc<Vec<ObjectMeta>>,
220+
now: Instant,
183221
) -> Option<Arc<Vec<ObjectMeta>>> {
184-
let entry = ListFilesEntry::try_new(value, self.ttl)?;
222+
let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
185223
let entry_size = entry.size_bytes;
186224

187225
// no point in trying to add this value to the cache if it cannot fit entirely
@@ -263,7 +301,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
263301

264302
fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
265303
let mut state = self.state.lock().unwrap();
266-
state.get(k)
304+
let now = self.time_provider.now();
305+
state.get(k, now)
267306
}
268307

269308
fn get_with_extra(&self, k: &Path, _e: &Self::Extra) -> Option<Arc<Vec<ObjectMeta>>> {
@@ -276,7 +315,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
276315
value: Arc<Vec<ObjectMeta>>,
277316
) -> Option<Arc<Vec<ObjectMeta>>> {
278317
let mut state = self.state.lock().unwrap();
279-
state.put(key, value)
318+
let now = self.time_provider.now();
319+
state.put(key, value, now)
280320
}
281321

282322
fn put_with_extra(
@@ -295,7 +335,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
295335

296336
fn contains_key(&self, k: &Path) -> bool {
297337
let mut state = self.state.lock().unwrap();
298-
state.contains_key(k)
338+
let now = self.time_provider.now();
339+
state.contains_key(k, now)
299340
}
300341

301342
fn len(&self) -> usize {
@@ -319,6 +360,31 @@ mod tests {
319360
use chrono::DateTime;
320361
use std::thread;
321362

363+
struct MockTimeProvider {
364+
base: Instant,
365+
offset: Mutex<Duration>,
366+
}
367+
368+
impl MockTimeProvider {
369+
fn new() -> Self {
370+
Self {
371+
base: Instant::now(),
372+
offset: Mutex::new(Duration::ZERO),
373+
}
374+
}
375+
376+
fn inc(&self, duration: Duration) {
377+
let mut offset = self.offset.lock().unwrap();
378+
*offset += duration;
379+
}
380+
}
381+
382+
impl TimeProvider for MockTimeProvider {
383+
fn now(&self) -> Instant {
384+
self.base + *self.offset.lock().unwrap()
385+
}
386+
}
387+
322388
/// Helper function to create a test ObjectMeta with a specific path and location string size
323389
fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
324390
// Create a location string of the desired size by padding with zeros
@@ -565,9 +631,6 @@ mod tests {
565631
}
566632

567633
#[test]
568-
// Ignored due to flakiness in CI. See
569-
// https://github.com/apache/datafusion/issues/19114
570-
#[ignore]
571634
fn test_cache_with_ttl() {
572635
let ttl = Duration::from_millis(100);
573636
let cache = DefaultListFilesCache::new(10000, Some(ttl));
@@ -596,32 +659,32 @@ mod tests {
596659
}
597660

598661
#[test]
599-
// Ignored due to flakiness in CI. See
600-
// https://github.com/apache/datafusion/issues/19114
601-
#[ignore]
602662
fn test_cache_with_ttl_and_lru() {
603663
let ttl = Duration::from_millis(200);
604-
let cache = DefaultListFilesCache::new(1000, Some(ttl));
664+
665+
let mock_time = Arc::new(MockTimeProvider::new());
666+
let cache = DefaultListFilesCache::new(1000, Some(ttl))
667+
.with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
605668

606669
let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
607670
let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
608671
let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
609672

610673
cache.put(&path1, value1);
611-
thread::sleep(Duration::from_millis(50));
674+
mock_time.inc(Duration::from_millis(50));
612675
cache.put(&path2, value2);
613-
thread::sleep(Duration::from_millis(50));
676+
mock_time.inc(Duration::from_millis(50));
614677

615678
// path3 should evict path1 due to size limit
616679
cache.put(&path3, value3);
617680
assert!(!cache.contains_key(&path1)); // Evicted by LRU
618681
assert!(cache.contains_key(&path2));
619682
assert!(cache.contains_key(&path3));
620683

621-
// Wait for path2 to expire
622-
thread::sleep(Duration::from_millis(150));
684+
mock_time.inc(Duration::from_millis(151));
685+
623686
assert!(!cache.contains_key(&path2)); // Expired
624-
assert!(cache.contains_key(&path3)); // Still valid
687+
assert!(cache.contains_key(&path3)); // Still valid
625688
}
626689

627690
#[test]
@@ -671,15 +734,16 @@ mod tests {
671734
fn test_entry_creation() {
672735
// Test with empty vector
673736
let empty_vec: Arc<Vec<ObjectMeta>> = Arc::new(vec![]);
674-
let entry = ListFilesEntry::try_new(empty_vec, None);
737+
let now = Instant::now();
738+
let entry = ListFilesEntry::try_new(empty_vec, None, now);
675739
assert!(entry.is_none());
676740

677741
// Validate entry size
678742
let metas: Vec<ObjectMeta> = (0..5)
679743
.map(|i| create_test_object_meta(&format!("file{i}"), 30))
680744
.collect();
681745
let metas = Arc::new(metas);
682-
let entry = ListFilesEntry::try_new(metas, None).unwrap();
746+
let entry = ListFilesEntry::try_new(metas, None, now).unwrap();
683747
assert_eq!(entry.metas.len(), 5);
684748
// Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes
685749
let expected_size =
@@ -689,9 +753,9 @@ mod tests {
689753
// Test with TTL
690754
let meta = create_test_object_meta("file", 50);
691755
let ttl = Duration::from_secs(10);
692-
let entry = ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl)).unwrap();
693-
let created = Instant::now();
694-
assert!(entry.expires.unwrap() > created);
756+
let entry =
757+
ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl), now).unwrap();
758+
assert!(entry.expires.unwrap() > now);
695759
}
696760

697761
#[test]

0 commit comments

Comments
 (0)