Skip to content

Commit 729ce0c

Browse files
committed
Read-cache-after-write aware list and by index
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 43807cf commit 729ce0c

4 files changed

Lines changed: 251 additions & 105 deletions

File tree

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,22 @@
2424
@SuppressWarnings("unchecked")
2525
public interface ResourceCache<T extends HasMetadata> extends Cache<T> {
2626

27+
/**
28+
* Lists all resources in the given namespace.
29+
*
30+
* @param namespace the namespace to list resources from
31+
* @return a stream of all cached resources in the namespace
32+
*/
2733
default Stream<T> list(String namespace) {
2834
return list(namespace, TRUE);
2935
}
3036

37+
/**
38+
* Lists resources in the given namespace that match the provided predicate.
39+
*
40+
* @param namespace the namespace to list resources from
41+
* @param predicate filter to apply on the resources
42+
* @return a stream of cached resources matching the predicate
43+
*/
3144
Stream<T> list(String namespace, Predicate<T> predicate);
3245
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,45 @@
2525
public interface Cache<T> {
2626
Predicate TRUE = (a) -> true;
2727

28+
/**
29+
* Retrieves a resource from the cache by its {@link ResourceID}.
30+
*
31+
* @param resourceID the identifier of the resource
32+
* @return an Optional containing the resource if present in the cache
33+
*/
2834
Optional<T> get(ResourceID resourceID);
2935

36+
/**
37+
* Checks whether a resource with the given {@link ResourceID} exists in the cache.
38+
*
39+
* @param resourceID the identifier of the resource
40+
* @return {@code true} if the resource is present in the cache
41+
*/
3042
default boolean contains(ResourceID resourceID) {
3143
return get(resourceID).isPresent();
3244
}
3345

46+
/**
47+
* Returns a stream of all {@link ResourceID}s currently in the cache.
48+
*
49+
* @return a stream of resource identifiers
50+
*/
3451
Stream<ResourceID> keys();
3552

53+
/**
54+
* Lists all resources in the cache.
55+
*
56+
* @return a stream of all cached resources
57+
*/
3658
default Stream<T> list() {
3759
return list(TRUE);
3860
}
3961

62+
/**
63+
* Lists resources in the cache that match the provided predicate.
64+
*
65+
* @param predicate filter to apply on the resources
66+
* @return a stream of cached resources matching the predicate
67+
*/
4068
Stream<T> list(Predicate<T> predicate);
4169
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 101 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.HashMap;
1919
import java.util.List;
2020
import java.util.Map;
21+
import java.util.Objects;
2122
import java.util.Optional;
2223
import java.util.Set;
2324
import java.util.function.Function;
@@ -234,134 +235,150 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
234235
this.indexers.putAll(indexers);
235236
}
236237

238+
/**
239+
* {@inheritDoc}
240+
*
241+
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
242+
* temporary resource cache to ensure recently written resources are reflected in the output.
243+
*/
237244
@Override
238245
public Stream<R> list(String namespace, Predicate<R> predicate) {
239-
return manager().list(namespace, predicate);
246+
return mergeWithTempCacheForList(manager().list(namespace, predicate), namespace, predicate);
240247
}
241248

249+
/**
250+
* {@inheritDoc}
251+
*
252+
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
253+
* temporary resource cache to ensure recently written resources are reflected in the output.
254+
*/
242255
@Override
243256
public Stream<R> list(Predicate<R> predicate) {
244-
return cache.list(predicate);
245-
}
246-
247-
@Override
248-
public List<R> byIndex(String indexName, String indexKey) {
249-
return manager().byIndex(indexName, indexKey);
250-
}
251-
252-
public Stream<R> byIndexStream(String indexName, String indexKey) {
253-
return manager().byIndexStream(indexName, indexKey);
257+
return mergeWithTempCacheForList(cache.list(predicate), null, predicate);
254258
}
255259

256260
/**
257-
* Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is
258-
* useful when resources are updated using {@link
259-
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
261+
* {@inheritDoc}
262+
*
263+
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
264+
* temporary resource cache to ensure recently written resources are reflected in the output.
260265
*/
261-
public Stream<R> listWithStrongConsistency(String namespace, Predicate<R> predicate) {
262-
return mergeWithWithTempCacheResources(
263-
manager().list(namespace, predicate), namespace, predicate);
266+
@Override
267+
public Stream<R> byIndexStream(String indexName, String indexKey) {
268+
return mergeWithTempCacheForIndex(
269+
manager().byIndexStream(indexName, indexKey), indexName, indexKey);
264270
}
265271

266272
/**
267-
* Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when
268-
* resources are updated using {@link
269-
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
273+
* {@inheritDoc}
274+
*
275+
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
276+
* temporary resource cache to ensure recently written resources are reflected in the output.
270277
*/
271-
public Stream<R> listWithStrongConsistency(Predicate<R> predicate) {
272-
return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate);
278+
@Override
279+
public List<R> byIndex(String indexName, String indexKey) {
280+
return mergeWithTempCacheForIndex(
281+
manager().byIndexStream(indexName, indexKey), indexName, indexKey)
282+
.collect(Collectors.toList());
273283
}
274284

275-
/**
276-
* Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is
277-
* useful when resources are updated using {@link
278-
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
279-
*/
280-
public Stream<R> byIndexStreamWithStrongConsistency(String indexName, String indexKey) {
281-
return mergeWithWithTempCacheResources(
282-
manager().byIndexStream(indexName, indexKey), indexName, indexKey);
283-
}
285+
private Stream<R> mergeWithTempCacheForList(
286+
Stream<R> stream, String namespace, Predicate<R> predicate) {
287+
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
288+
return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate));
289+
}
290+
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
291+
if (tempResources.isEmpty()) {
292+
return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate));
293+
}
284294

285-
private Stream<R> mergeWithWithTempCacheResources(
286-
Stream<R> stream, String indexName, String indexKey) {
287-
return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey);
288-
}
295+
var upToDateList =
296+
stream
297+
.map(
298+
r -> {
299+
var resourceID = ResourceID.fromResource(r);
300+
var tempResource = tempResources.remove(resourceID);
301+
if (tempResource != null
302+
&& ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
303+
return tempResource;
304+
}
305+
return r;
306+
})
307+
.filter(filterResourceByNamespaceAndPredicate(namespace, predicate))
308+
.toList();
289309

290-
private Stream<R> mergeWithWithTempCacheResources(
291-
Stream<R> stream, String namespace, Predicate<R> predicate) {
292-
return mergeWithWithTempCacheResources(stream, namespace, predicate, null, null);
310+
return Stream.concat(
311+
tempResources.values().stream()
312+
.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)),
313+
upToDateList.stream());
293314
}
294315

295-
private Stream<R> mergeWithWithTempCacheResources(
296-
Stream<R> stream,
297-
String namespace,
298-
Predicate<R> predicate,
299-
String indexName,
300-
String indexKey) {
316+
private Stream<R> mergeWithTempCacheForIndex(
317+
Stream<R> stream, String indexName, String indexKey) {
301318
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
302319
return stream;
303320
}
304-
var allTempResources = temporaryResourceCache.getResources();
305-
Map<ResourceID, R> tempResources;
306-
if (namespace == null && predicate == null) {
307-
tempResources = new HashMap<>(allTempResources);
308-
} else {
309-
// filtering the temp cache according the user input (predicate, namespace)
310-
tempResources =
311-
allTempResources.entrySet().stream()
312-
.filter(
313-
e -> {
314-
if (namespace != null) {
315-
var res =
316-
e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false);
317-
if (!res) return false;
318-
}
319-
if (predicate != null) {
320-
return predicate.test(e.getValue());
321-
}
322-
return true;
323-
})
324-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
325-
}
321+
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
326322
if (tempResources.isEmpty()) {
327323
return stream;
328324
}
325+
326+
var indexer = indexers.get(indexName);
327+
if (indexer == null) {
328+
throw new IllegalArgumentException("Indexer not found for: " + indexName);
329+
}
330+
329331
var upToDateList =
330332
stream
331333
.map(
332334
r -> {
333335
var resourceID = ResourceID.fromResource(r);
334-
// removing the id from the related temp resources
335-
// this is important so we can detect ghost resources:
336-
// all that remains is ghost resource
337336
var tempResource = tempResources.remove(resourceID);
338-
// using the latest version
339337
if (tempResource != null
340338
&& ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
339+
if (!indexer.apply(tempResource).contains(indexKey)) {
340+
return null;
341+
}
341342
return tempResource;
342343
}
343344
return r;
344345
})
346+
.filter(Objects::nonNull)
345347
.toList();
346-
Stream<R> tempResourceStream;
347-
// ghost resource handling
348-
if (indexName != null && indexKey != null) {
349-
var indexer = indexers.get(indexName);
350-
if (indexer == null) {
351-
throw new IllegalArgumentException("Indexer not found for: " + indexName);
348+
349+
// remaining temp resources are ghost resources — include only those matching the index
350+
return Stream.concat(
351+
tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)),
352+
upToDateList.stream());
353+
}
354+
355+
private static <R extends HasMetadata> Predicate<R> filterResourceByNamespaceAndPredicate(
356+
String namespace, Predicate<R> predicate) {
357+
return r -> {
358+
if (namespace != null) {
359+
var res = Optional.of(r).map(ns -> ns.equals(namespace)).orElse(false);
360+
if (!res) return false;
352361
}
353-
// we check if the ghost resource is part of the index
354-
tempResourceStream =
355-
tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey));
356-
} else {
357-
tempResourceStream = tempResources.values().stream();
358-
}
359-
return Stream.concat(tempResourceStream, upToDateList.stream());
362+
if (predicate != null) {
363+
return predicate.test(r);
364+
}
365+
return true;
366+
};
360367
}
361368

369+
/**
370+
* {@inheritDoc}
371+
*
372+
* <p>This implementation is read-cache-after-write consistent. Keys from the temporary resource
373+
* cache (ghost resources) are included in the result.
374+
*/
362375
@Override
363376
public Stream<ResourceID> keys() {
364-
return cache.keys();
377+
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
378+
return manager().keys();
379+
}
380+
var tempKeys = temporaryResourceCache.getResources().keySet();
381+
return Stream.concat(manager().keys(), tempKeys.stream().filter(k -> !manager().contains(k)));
365382
}
366383

367384
@Override

0 commit comments

Comments
 (0)