Skip to content

Commit d4bd975

Browse files
committed
wip
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 3f31322 commit d4bd975

2 files changed

Lines changed: 17 additions & 9 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
243243
*/
244244
@Override
245245
public Stream<R> list(String namespace, Predicate<R> predicate) {
246-
return mergeWithTempCacheForList(manager().list(namespace, predicate), namespace, predicate);
246+
return mergeWithTempCacheForList(manager().list(namespace), namespace, predicate);
247247
}
248248

249249
/**
@@ -254,7 +254,7 @@ public Stream<R> list(String namespace, Predicate<R> predicate) {
254254
*/
255255
@Override
256256
public Stream<R> list(Predicate<R> predicate) {
257-
return mergeWithTempCacheForList(cache.list(predicate), null, predicate);
257+
return mergeWithTempCacheForList(cache.list(), null, predicate);
258258
}
259259

260260
/**
@@ -282,14 +282,16 @@ public List<R> byIndex(String indexName, String indexKey) {
282282
.collect(Collectors.toList());
283283
}
284284

285+
// namespace is filtered on informer manager level
285286
private Stream<R> mergeWithTempCacheForList(
286287
Stream<R> stream, String namespace, Predicate<R> predicate) {
287288
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
288-
return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate));
289+
290+
return stream.filter(filterResourceByPredicate(predicate));
289291
}
290292
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
291293
if (tempResources.isEmpty()) {
292-
return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate));
294+
return stream.filter(filterResourceByPredicate(predicate));
293295
}
294296

295297
var upToDateList =
@@ -304,7 +306,8 @@ private Stream<R> mergeWithTempCacheForList(
304306
}
305307
return r;
306308
})
307-
.filter(filterResourceByNamespaceAndPredicate(namespace, predicate))
309+
// we filter on predicate only since namespace changes would not be detected any ways.
310+
.filter(filterResourceByPredicate(predicate))
308311
.toList();
309312

310313
return Stream.concat(
@@ -352,6 +355,11 @@ private Stream<R> mergeWithTempCacheForIndex(
352355
upToDateList.stream());
353356
}
354357

358+
private static <R extends HasMetadata> Predicate<R> filterResourceByPredicate(
359+
Predicate<R> predicate) {
360+
return filterResourceByNamespaceAndPredicate(null, predicate);
361+
}
362+
355363
private static <R extends HasMetadata> Predicate<R> filterResourceByNamespaceAndPredicate(
356364
String namespace, Predicate<R> predicate) {
357365
return r -> {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ void listKeepsResourceWhenNotInTempCache() {
588588
when(temporaryResourceCache.getResources()).thenReturn(new HashMap<>());
589589

590590
var mim = mock(InformerManager.class);
591-
when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original));
591+
when(mim.list(nullable(String.class))).thenReturn(Stream.of(original));
592592
doReturn(mim).when(informerEventSource).manager();
593593

594594
var result = informerEventSource.list(null, r -> true).toList();
@@ -608,7 +608,7 @@ void listReplacesOnlyMatchingResources() {
608608
.thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(dep1), newerDep1)));
609609

610610
var informerManager = mock(InformerManager.class);
611-
when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2));
611+
when(informerManager.list(nullable(String.class))).thenReturn(Stream.of(dep1, dep2));
612612
doReturn(informerManager).when(informerEventSource).manager();
613613

614614
var result = informerEventSource.list(null, r -> true).toList();
@@ -668,7 +668,7 @@ void listKeepsResourceWhenTempCacheHasOlderVersion() {
668668
.thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp)));
669669

670670
var mim = mock(InformerManager.class);
671-
when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original));
671+
when(mim.list(nullable(String.class))).thenReturn(Stream.of(original));
672672
doReturn(mim).when(informerEventSource).manager();
673673

674674
var result = informerEventSource.list(null, r -> true).toList();
@@ -706,7 +706,7 @@ void listAddsGhostResources() {
706706
.thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(ghostResource), ghostResource)));
707707

708708
var mim = mock(InformerManager.class);
709-
when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource));
709+
when(mim.list(nullable(String.class))).thenReturn(Stream.of(resource));
710710
doReturn(mim).when(informerEventSource).manager();
711711

712712
var result = informerEventSource.list(null, r -> true).toList();

0 commit comments

Comments
 (0)