Skip to content

Commit 402bbd6

Browse files
committed
improve: ExternalResourceIDProvider extended to be used in CacheKeyMapper
Signed-off-by: Attila Mészáros <[email protected]>
1 parent 27f050c commit 402bbd6

File tree

26 files changed

+136
-94
lines changed

26 files changed

+136
-94
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
2525

2626
@Ignore
27-
public abstract class AbstractPollingDependentResource<R, P extends HasMetadata>
28-
extends AbstractExternalDependentResource<R, P, ExternalResourceCachingEventSource<R, P>>
29-
implements CacheKeyMapper<R> {
27+
public abstract class AbstractPollingDependentResource<R, P extends HasMetadata, ID>
28+
extends AbstractExternalDependentResource<R, P, ExternalResourceCachingEventSource<R, P, ID>>
29+
implements CacheKeyMapper<R, ID> {
3030

3131
public static final Duration DEFAULT_POLLING_PERIOD = Duration.ofMillis(5000);
3232
private Duration pollingPeriod;
@@ -52,7 +52,7 @@ public Duration getPollingPeriod() {
5252

5353
// for now dependent resources support event sources only with one owned resource.
5454
@Override
55-
public String keyFor(R resource) {
56-
return CacheKeyMapper.singleResourceCacheKeyMapper().keyFor(resource);
55+
public ID keyFor(R resource) {
56+
return CacheKeyMapper.<R, ID>externalIdProviderMapper().keyFor(resource);
5757
}
5858
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
2626

2727
@Ignore
28-
public abstract class PerResourcePollingDependentResource<R, P extends HasMetadata>
29-
extends AbstractPollingDependentResource<R, P>
28+
public abstract class PerResourcePollingDependentResource<R, P extends HasMetadata, ID>
29+
extends AbstractPollingDependentResource<R, P, ID>
3030
implements PerResourcePollingEventSource.ResourceFetcher<R, P> {
3131

3232
public PerResourcePollingDependentResource() {}
@@ -40,13 +40,13 @@ public PerResourcePollingDependentResource(Class<R> resourceType, Duration polli
4040
}
4141

4242
@Override
43-
protected ExternalResourceCachingEventSource<R, P> createEventSource(
43+
protected ExternalResourceCachingEventSource<R, P, ID> createEventSource(
4444
EventSourceContext<P> context) {
4545

4646
return new PerResourcePollingEventSource<>(
4747
resourceType(),
4848
context,
49-
new PerResourcePollingConfigurationBuilder<>(this, getPollingPeriod())
49+
new PerResourcePollingConfigurationBuilder<R, P, ID>(this, getPollingPeriod())
5050
.withCacheKeyMapper(this)
5151
.withName(name())
5252
.build());

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,25 @@
2626
import io.javaoperatorsdk.operator.processing.event.source.polling.PollingEventSource;
2727

2828
@Ignore
29-
public abstract class PollingDependentResource<R, P extends HasMetadata>
30-
extends AbstractPollingDependentResource<R, P>
29+
public abstract class PollingDependentResource<R, P extends HasMetadata, ID>
30+
extends AbstractPollingDependentResource<R, P, ID>
3131
implements PollingEventSource.GenericResourceFetcher<R> {
3232

33-
private final CacheKeyMapper<R> cacheKeyMapper;
33+
private final CacheKeyMapper<R, ID> cacheKeyMapper;
3434

35-
public PollingDependentResource(Class<R> resourceType, CacheKeyMapper<R> cacheKeyMapper) {
35+
public PollingDependentResource(Class<R> resourceType, CacheKeyMapper<R, ID> cacheKeyMapper) {
3636
super(resourceType);
3737
this.cacheKeyMapper = cacheKeyMapper;
3838
}
3939

4040
public PollingDependentResource(
41-
Class<R> resourceType, Duration pollingPeriod, CacheKeyMapper<R> cacheKeyMapper) {
41+
Class<R> resourceType, Duration pollingPeriod, CacheKeyMapper<R, ID> cacheKeyMapper) {
4242
super(resourceType, pollingPeriod);
4343
this.cacheKeyMapper = cacheKeyMapper;
4444
}
4545

4646
@Override
47-
protected ExternalResourceCachingEventSource<R, P> createEventSource(
47+
protected ExternalResourceCachingEventSource<R, P, ID> createEventSource(
4848
EventSourceContext<P> context) {
4949
return new PollingEventSource<>(
5050
resourceType(),

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CacheKeyMapper.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source;
1717

18-
public interface CacheKeyMapper<R> {
18+
import io.javaoperatorsdk.operator.processing.dependent.ExternalDependentIDProvider;
1919

20-
String keyFor(R resource);
20+
public interface CacheKeyMapper<R, ID> {
21+
22+
ID keyFor(R resource);
2123

2224
/**
2325
* Used if a polling event source handles only single secondary resource. See also docs for:
@@ -26,7 +28,19 @@ public interface CacheKeyMapper<R> {
2628
* @return static id mapper, all resources are mapped for same id.
2729
* @param <T> secondary resource type
2830
*/
29-
static <T> CacheKeyMapper<T> singleResourceCacheKeyMapper() {
31+
static <T> CacheKeyMapper<T, String> singleResourceCacheKeyMapper() {
3032
return r -> "id";
3133
}
34+
35+
static <T, ID> CacheKeyMapper<T, ID> externalIdProviderMapper() {
36+
37+
return r -> {
38+
if (r instanceof ExternalDependentIDProvider externalDependentIDProvider) {
39+
return (ID) externalDependentIDProvider.externalResourceId();
40+
} else {
41+
throw new IllegalStateException(
42+
"Resource does not implement ExternalDependentIDProvider: " + r.getClass());
43+
}
44+
};
45+
}
3246
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,23 @@
5454
* @param <R> type of polled external secondary resource
5555
* @param <P> primary resource
5656
*/
57-
public abstract class ExternalResourceCachingEventSource<R, P extends HasMetadata>
57+
public abstract class ExternalResourceCachingEventSource<R, P extends HasMetadata, ID>
5858
extends AbstractEventSource<R, P> implements RecentOperationCacheFiller<R> {
5959

6060
private static final Logger log =
6161
LoggerFactory.getLogger(ExternalResourceCachingEventSource.class);
6262

63-
protected final CacheKeyMapper<R> cacheKeyMapper;
63+
protected final CacheKeyMapper<R, ID> cacheKeyMapper;
6464

65-
protected Map<ResourceID, Map<String, R>> cache = new ConcurrentHashMap<>();
65+
protected Map<ResourceID, Map<ID, R>> cache = new ConcurrentHashMap<>();
6666

6767
protected ExternalResourceCachingEventSource(
68-
Class<R> resourceClass, CacheKeyMapper<R> cacheKeyMapper) {
68+
Class<R> resourceClass, CacheKeyMapper<R, ID> cacheKeyMapper) {
6969
this(null, resourceClass, cacheKeyMapper);
7070
}
7171

7272
protected ExternalResourceCachingEventSource(
73-
String name, Class<R> resourceClass, CacheKeyMapper<R> cacheKeyMapper) {
73+
String name, Class<R> resourceClass, CacheKeyMapper<R, ID> cacheKeyMapper) {
7474
super(resourceClass, name);
7575
this.cacheKeyMapper = cacheKeyMapper;
7676
}
@@ -91,7 +91,7 @@ protected synchronized void handleDelete(ResourceID primaryID, R resource) {
9191
handleDelete(primaryID, Set.of(cacheKeyMapper.keyFor(resource)));
9292
}
9393

94-
protected synchronized void handleDelete(ResourceID primaryID, Set<String> resourceIDs) {
94+
protected synchronized void handleDelete(ResourceID primaryID, Set<ID> resourceIDs) {
9595
if (!isRunning()) {
9696
return;
9797
}
@@ -146,8 +146,7 @@ && acceptedByFiler(cachedResources, newResourcesMap)) {
146146
}
147147
}
148148

149-
private boolean acceptedByFiler(
150-
Map<String, R> cachedResourceMap, Map<String, R> newResourcesMap) {
149+
private boolean acceptedByFiler(Map<ID, R> cachedResourceMap, Map<ID, R> newResourcesMap) {
151150

152151
var addedResources = new HashMap<>(newResourcesMap);
153152
addedResources.keySet().removeAll(cachedResourceMap.keySet());
@@ -175,7 +174,7 @@ private boolean acceptedByFiler(
175174
return true;
176175
}
177176

178-
Map<String, R> possibleUpdatedResources = new HashMap<>(cachedResourceMap);
177+
Map<ID, R> possibleUpdatedResources = new HashMap<>(cachedResourceMap);
179178
possibleUpdatedResources.keySet().retainAll(newResourcesMap.keySet());
180179
possibleUpdatedResources =
181180
possibleUpdatedResources.entrySet().stream()
@@ -248,7 +247,7 @@ public Optional<R> getSecondaryResource(ResourceID primaryID) {
248247
}
249248
}
250249

251-
public Map<ResourceID, Map<String, R>> getCache() {
250+
public Map<ResourceID, Map<ID, R>> getCache() {
252251
return Collections.unmodifiableMap(cache);
253252
}
254253

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@
2626
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
2727
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
2828

29-
public class CachingInboundEventSource<R, P extends HasMetadata>
30-
extends ExternalResourceCachingEventSource<R, P> implements ResourceEventAware<P> {
29+
public class CachingInboundEventSource<R, P extends HasMetadata, ID>
30+
extends ExternalResourceCachingEventSource<R, P, ID> implements ResourceEventAware<P> {
3131

3232
private final ResourceFetcher<R, P> resourceFetcher;
3333
private final Set<ResourceID> fetchedForPrimaries = ConcurrentHashMap.newKeySet();
3434

3535
public CachingInboundEventSource(
3636
ResourceFetcher<R, P> resourceFetcher,
3737
Class<R> resourceClass,
38-
CacheKeyMapper<R> cacheKeyMapper) {
38+
CacheKeyMapper<R, ID> cacheKeyMapper) {
3939
super(resourceClass, cacheKeyMapper);
4040
this.resourceFetcher = resourceFetcher;
4141
}
@@ -48,7 +48,7 @@ public void handleResourceEvent(ResourceID primaryID, R resource) {
4848
super.handleResources(primaryID, resource);
4949
}
5050

51-
public void handleResourceDeleteEvent(ResourceID primaryID, String resourceID) {
51+
public void handleResourceDeleteEvent(ResourceID primaryID, ID resourceID) {
5252
super.handleDelete(primaryID, Set.of(resourceID));
5353
}
5454

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfiguration.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import io.fabric8.kubernetes.api.model.HasMetadata;
2525
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
2626

27-
public record PerResourcePollingConfiguration<R, P extends HasMetadata>(
27+
public record PerResourcePollingConfiguration<R, P extends HasMetadata, ID>(
2828
String name,
2929
ScheduledExecutorService executorService,
30-
CacheKeyMapper<R> cacheKeyMapper,
30+
CacheKeyMapper<R, ID> cacheKeyMapper,
3131
PerResourcePollingEventSource.ResourceFetcher<R, P> resourceFetcher,
3232
Predicate<P> registerPredicate,
3333
Duration defaultPollingPeriod) {
@@ -37,7 +37,7 @@ public record PerResourcePollingConfiguration<R, P extends HasMetadata>(
3737
public PerResourcePollingConfiguration(
3838
String name,
3939
ScheduledExecutorService executorService,
40-
CacheKeyMapper<R> cacheKeyMapper,
40+
CacheKeyMapper<R, ID> cacheKeyMapper,
4141
PerResourcePollingEventSource.ResourceFetcher<R, P> resourceFetcher,
4242
Predicate<P> registerPredicate,
4343
Duration defaultPollingPeriod) {
@@ -47,7 +47,7 @@ public PerResourcePollingConfiguration(
4747
? new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER)
4848
: executorService;
4949
this.cacheKeyMapper =
50-
cacheKeyMapper == null ? CacheKeyMapper.singleResourceCacheKeyMapper() : cacheKeyMapper;
50+
cacheKeyMapper == null ? CacheKeyMapper.externalIdProviderMapper() : cacheKeyMapper;
5151
this.resourceFetcher = Objects.requireNonNull(resourceFetcher);
5252
this.registerPredicate = registerPredicate;
5353
this.defaultPollingPeriod = defaultPollingPeriod;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfigurationBuilder.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@
2222
import io.fabric8.kubernetes.api.model.HasMetadata;
2323
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
2424

25-
public final class PerResourcePollingConfigurationBuilder<R, P extends HasMetadata> {
25+
public final class PerResourcePollingConfigurationBuilder<R, P extends HasMetadata, ID> {
2626

2727
private final Duration defaultPollingPeriod;
2828
private final PerResourcePollingEventSource.ResourceFetcher<R, P> resourceFetcher;
2929

3030
private String name;
3131
private Predicate<P> registerPredicate;
3232
private ScheduledExecutorService executorService;
33-
private CacheKeyMapper<R> cacheKeyMapper;
33+
private CacheKeyMapper<R, ID> cacheKeyMapper;
3434

3535
public PerResourcePollingConfigurationBuilder(
3636
PerResourcePollingEventSource.ResourceFetcher<R, P> resourceFetcher,
@@ -40,30 +40,30 @@ public PerResourcePollingConfigurationBuilder(
4040
}
4141

4242
@SuppressWarnings("unused")
43-
public PerResourcePollingConfigurationBuilder<R, P> withExecutorService(
43+
public PerResourcePollingConfigurationBuilder<R, P, ID> withExecutorService(
4444
ScheduledExecutorService executorService) {
4545
this.executorService = executorService;
4646
return this;
4747
}
4848

49-
public PerResourcePollingConfigurationBuilder<R, P> withRegisterPredicate(
49+
public PerResourcePollingConfigurationBuilder<R, P, ID> withRegisterPredicate(
5050
Predicate<P> registerPredicate) {
5151
this.registerPredicate = registerPredicate;
5252
return this;
5353
}
5454

55-
public PerResourcePollingConfigurationBuilder<R, P> withCacheKeyMapper(
56-
CacheKeyMapper<R> cacheKeyMapper) {
55+
public PerResourcePollingConfigurationBuilder<R, P, ID> withCacheKeyMapper(
56+
CacheKeyMapper<R, ID> cacheKeyMapper) {
5757
this.cacheKeyMapper = cacheKeyMapper;
5858
return this;
5959
}
6060

61-
public PerResourcePollingConfigurationBuilder<R, P> withName(String name) {
61+
public PerResourcePollingConfigurationBuilder<R, P, ID> withName(String name) {
6262
this.name = name;
6363
return this;
6464
}
6565

66-
public PerResourcePollingConfiguration<R, P> build() {
66+
public PerResourcePollingConfiguration<R, P, ID> build() {
6767
return new PerResourcePollingConfiguration<>(
6868
name,
6969
executorService,

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@
4848
* @param <R> the resource polled by the event source
4949
* @param <P> related custom resource
5050
*/
51-
public class PerResourcePollingEventSource<R, P extends HasMetadata>
52-
extends ExternalResourceCachingEventSource<R, P> implements ResourceEventAware<P> {
51+
public class PerResourcePollingEventSource<R, P extends HasMetadata, ID>
52+
extends ExternalResourceCachingEventSource<R, P, ID> implements ResourceEventAware<P> {
5353

5454
private static final Logger log = LoggerFactory.getLogger(PerResourcePollingEventSource.class);
5555

@@ -65,7 +65,7 @@ public class PerResourcePollingEventSource<R, P extends HasMetadata>
6565
public PerResourcePollingEventSource(
6666
Class<R> resourceClass,
6767
EventSourceContext<P> context,
68-
PerResourcePollingConfiguration<R, P> config) {
68+
PerResourcePollingConfiguration<R, P, ID> config) {
6969
super(config.name(), resourceClass, config.cacheKeyMapper());
7070
this.primaryResourceCache = context.getPrimaryCache();
7171
this.resourceFetcher = config.resourceFetcher();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingConfiguration.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,21 @@
2020

2121
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
2222

23-
public record PollingConfiguration<R>(
23+
public record PollingConfiguration<R, ID>(
2424
String name,
2525
PollingEventSource.GenericResourceFetcher<R> genericResourceFetcher,
2626
Duration period,
27-
CacheKeyMapper<R> cacheKeyMapper) {
27+
CacheKeyMapper<R, ID> cacheKeyMapper) {
2828

2929
public PollingConfiguration(
3030
String name,
3131
PollingEventSource.GenericResourceFetcher<R> genericResourceFetcher,
3232
Duration period,
33-
CacheKeyMapper<R> cacheKeyMapper) {
33+
CacheKeyMapper<R, ID> cacheKeyMapper) {
3434
this.name = name;
3535
this.genericResourceFetcher = Objects.requireNonNull(genericResourceFetcher);
3636
this.period = period;
3737
this.cacheKeyMapper =
38-
cacheKeyMapper == null ? CacheKeyMapper.singleResourceCacheKeyMapper() : cacheKeyMapper;
38+
cacheKeyMapper == null ? CacheKeyMapper.externalIdProviderMapper() : cacheKeyMapper;
3939
}
4040
}

0 commit comments

Comments
 (0)