Skip to content

Commit f45940d

Browse files
committed
improve: ResourceIDMapper and ResourceIDProvider for external resources
Will check if this could be elegantly extended to bulk resources Signed-off-by: Attila Mészáros <[email protected]>
1 parent 234ac29 commit f45940d

File tree

18 files changed

+83
-90
lines changed

18 files changed

+83
-90
lines changed
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.javaoperatorsdk.operator.processing.event.source;
16+
package io.javaoperatorsdk.operator.processing;
1717

18-
import io.javaoperatorsdk.operator.processing.ResourceIDProvider;
18+
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
1919

20-
public interface CacheKeyMapper<R, ID> {
20+
public interface ResourceIDMapper<R, ID> {
2121

22-
ID keyFor(R resource);
22+
ID idFor(R resource);
2323

2424
/**
2525
* Used if a polling event source handles only single secondary resource and the id is String. See
@@ -28,11 +28,11 @@ public interface CacheKeyMapper<R, ID> {
2828
* @return static id mapper, all resources are mapped for same id.
2929
* @param <T> secondary resource type
3030
*/
31-
static <T> CacheKeyMapper<T, String> singleResourceCacheKeyMapper() {
31+
static <T> ResourceIDMapper<T, String> singleResourceCacheKeyMapper() {
3232
return r -> "id";
3333
}
3434

35-
static <T, ID> CacheKeyMapper<T, ID> resourceIdProviderMapper() {
35+
static <T, ID> ResourceIDMapper<T, ID> resourceIdProviderMapper() {
3636
return r -> {
3737
if (r instanceof ResourceIDProvider resourceIDProvider) {
3838
return (ID) resourceIDProvider.resourceId();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ResourceIDProvider.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
* Provides the identifier for an object that represents resource. This ID is used to select target
2020
* external resource for a dependent resource from the resources returned by `{@link
2121
* io.javaoperatorsdk.operator.api.reconciler.Context#getSecondaryResources(Class)}`. But also for
22-
* {@link io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper} for event sources in
23-
* external resources
22+
* {@link ResourceIDMapper} for event sources in external resources
2423
*
2524
* @param <T>
2625
*/

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractExternalDependentResource.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,22 @@
2222
import io.fabric8.kubernetes.api.model.HasMetadata;
2323
import io.javaoperatorsdk.operator.api.reconciler.Context;
2424
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
25-
import io.javaoperatorsdk.operator.processing.ResourceIDProvider;
25+
import io.javaoperatorsdk.operator.processing.ResourceIDMapper;
2626
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
2727
import io.javaoperatorsdk.operator.processing.event.ResourceID;
2828
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
2929
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
3030

3131
public abstract class AbstractExternalDependentResource<
32-
R, P extends HasMetadata, T extends EventSource<R, P>>
32+
R, P extends HasMetadata, T extends EventSource<R, P>, ID>
3333
extends AbstractEventSourceHolderDependentResource<R, P, T> {
3434

3535
private final boolean isDependentResourceWithExplicitState =
3636
this instanceof DependentResourceWithExplicitState;
3737
private final boolean isBulkDependentResource = this instanceof BulkDependentResource;
3838

39+
protected ResourceIDMapper<R, ID> resourceIDMapper = ResourceIDMapper.resourceIdProviderMapper();
40+
3941
@SuppressWarnings("rawtypes")
4042
private DependentResourceWithExplicitState dependentResourceWithExplicitState;
4143

@@ -132,21 +134,23 @@ protected Optional<R> selectTargetSecondaryResource(
132134
Set<R> secondaryResources, P primary, Context<P> context) {
133135
R desired = desired(primary, context);
134136
List<R> targetResources;
135-
if (desired instanceof ResourceIDProvider<?> desiredWithId) {
136-
targetResources =
137-
secondaryResources.stream()
138-
.filter(
139-
r -> ((ResourceIDProvider<?>) r).resourceId().equals(desiredWithId.resourceId()))
140-
.toList();
141-
} else {
142-
throw new IllegalStateException(
143-
"Either implement ExternalDependentIDProvider or override this "
144-
+ " (selectTargetSecondaryResource) method.");
145-
}
137+
var desiredID = resourceIDMapper.idFor(desired);
138+
targetResources =
139+
secondaryResources.stream()
140+
.filter(r -> resourceIDMapper.idFor(r).equals(desiredID))
141+
.toList();
146142
if (targetResources.size() > 1) {
147143
throw new IllegalStateException(
148144
"More than one secondary resource related to primary: " + targetResources);
149145
}
150146
return targetResources.isEmpty() ? Optional.empty() : Optional.of(targetResources.get(0));
151147
}
148+
149+
public ResourceIDMapper<R, ID> getResourceIDMapper() {
150+
return resourceIDMapper;
151+
}
152+
153+
public void setResourceIDMapper(ResourceIDMapper<R, ID> resourceIDMapper) {
154+
this.resourceIDMapper = resourceIDMapper;
155+
}
152156
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
import io.fabric8.kubernetes.api.model.HasMetadata;
2121
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
2222
import io.javaoperatorsdk.operator.processing.dependent.AbstractExternalDependentResource;
23-
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
2423
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
2524

2625
@Ignore
2726
public abstract class AbstractPollingDependentResource<R, P extends HasMetadata, ID>
28-
extends AbstractExternalDependentResource<R, P, ExternalResourceCachingEventSource<R, P, ID>>
29-
implements CacheKeyMapper<R, ID> {
27+
extends AbstractExternalDependentResource<
28+
R, P, ExternalResourceCachingEventSource<R, P, ID>, ID> {
3029

3130
public static final Duration DEFAULT_POLLING_PERIOD = Duration.ofMillis(5000);
3231
private Duration pollingPeriod;
@@ -49,10 +48,4 @@ public void setPollingPeriod(Duration pollingPeriod) {
4948
public Duration getPollingPeriod() {
5049
return pollingPeriod;
5150
}
52-
53-
// for now dependent resources support event sources only with one owned resource.
54-
@Override
55-
public ID keyFor(R resource) {
56-
return CacheKeyMapper.<R, ID>resourceIdProviderMapper().keyFor(resource);
57-
}
5851
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ protected ExternalResourceCachingEventSource<R, P, ID> createEventSource(
4747
resourceType(),
4848
context,
4949
new PerResourcePollingConfigurationBuilder<R, P, ID>(this, getPollingPeriod())
50-
.withCacheKeyMapper(this)
50+
.withResourceIDMapper(getResourceIDMapper())
5151
.withName(name())
5252
.build());
5353
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.fabric8.kubernetes.api.model.HasMetadata;
2121
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
2222
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
23-
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
23+
import io.javaoperatorsdk.operator.processing.ResourceIDMapper;
2424
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
2525
import io.javaoperatorsdk.operator.processing.event.source.polling.PollingConfiguration;
2626
import io.javaoperatorsdk.operator.processing.event.source.polling.PollingEventSource;
@@ -30,24 +30,24 @@ public abstract class PollingDependentResource<R, P extends HasMetadata, ID>
3030
extends AbstractPollingDependentResource<R, P, ID>
3131
implements PollingEventSource.GenericResourceFetcher<R> {
3232

33-
private final CacheKeyMapper<R, ID> cacheKeyMapper;
33+
private final ResourceIDMapper<R, ID> resourceIDMapper;
3434

35-
public PollingDependentResource(Class<R> resourceType, CacheKeyMapper<R, ID> cacheKeyMapper) {
35+
public PollingDependentResource(Class<R> resourceType, ResourceIDMapper<R, ID> resourceIDMapper) {
3636
super(resourceType);
37-
this.cacheKeyMapper = cacheKeyMapper;
37+
this.resourceIDMapper = resourceIDMapper;
3838
}
3939

4040
public PollingDependentResource(
41-
Class<R> resourceType, Duration pollingPeriod, CacheKeyMapper<R, ID> cacheKeyMapper) {
41+
Class<R> resourceType, Duration pollingPeriod, ResourceIDMapper<R, ID> resourceIDMapper) {
4242
super(resourceType, pollingPeriod);
43-
this.cacheKeyMapper = cacheKeyMapper;
43+
this.resourceIDMapper = resourceIDMapper;
4444
}
4545

4646
@Override
4747
protected ExternalResourceCachingEventSource<R, P, ID> createEventSource(
4848
EventSourceContext<P> context) {
4949
return new PollingEventSource<>(
5050
resourceType(),
51-
new PollingConfiguration<>(name(), this, getPollingPeriod(), cacheKeyMapper));
51+
new PollingConfiguration<>(name(), this, getPollingPeriod(), resourceIDMapper));
5252
}
5353
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import io.fabric8.kubernetes.api.model.HasMetadata;
3434
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
35+
import io.javaoperatorsdk.operator.processing.ResourceIDMapper;
3536
import io.javaoperatorsdk.operator.processing.ResourceIDProvider;
3637
import io.javaoperatorsdk.operator.processing.event.Event;
3738
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -61,24 +62,24 @@ public abstract class ExternalResourceCachingEventSource<R, P extends HasMetadat
6162
private static final Logger log =
6263
LoggerFactory.getLogger(ExternalResourceCachingEventSource.class);
6364

64-
protected final CacheKeyMapper<R, ID> cacheKeyMapper;
65+
protected final ResourceIDMapper<R, ID> resourceIDMapper;
6566

6667
protected Map<ResourceID, Map<ID, R>> cache = new ConcurrentHashMap<>();
6768

6869
protected ExternalResourceCachingEventSource(
69-
Class<R> resourceClass, CacheKeyMapper<R, ID> cacheKeyMapper) {
70-
this(null, resourceClass, cacheKeyMapper);
70+
Class<R> resourceClass, ResourceIDMapper<R, ID> resourceIDMapper) {
71+
this(null, resourceClass, resourceIDMapper);
7172
}
7273

7374
protected ExternalResourceCachingEventSource(
74-
String name, Class<R> resourceClass, CacheKeyMapper<R, ID> cacheKeyMapper) {
75+
String name, Class<R> resourceClass, ResourceIDMapper<R, ID> resourceIDMapper) {
7576
super(resourceClass, name);
76-
if (cacheKeyMapper == CacheKeyMapper.resourceIdProviderMapper()
77+
if (resourceIDMapper == ResourceIDMapper.resourceIdProviderMapper()
7778
&& !ResourceIDProvider.class.isAssignableFrom(resourceClass)) {
7879
throw new IllegalArgumentException(
7980
"resource class is not a " + ResourceIDProvider.class.getSimpleName());
8081
}
81-
this.cacheKeyMapper = cacheKeyMapper;
82+
this.resourceIDMapper = resourceIDMapper;
8283
}
8384

8485
protected synchronized void handleDelete(ResourceID primaryID) {
@@ -90,11 +91,11 @@ protected synchronized void handleDelete(ResourceID primaryID) {
9091

9192
protected synchronized void handleDeletes(ResourceID primaryID, Set<R> resource) {
9293
handleDelete(
93-
primaryID, resource.stream().map(cacheKeyMapper::keyFor).collect(Collectors.toSet()));
94+
primaryID, resource.stream().map(resourceIDMapper::idFor).collect(Collectors.toSet()));
9495
}
9596

9697
protected synchronized void handleDelete(ResourceID primaryID, R resource) {
97-
handleDelete(primaryID, Set.of(cacheKeyMapper.keyFor(resource)));
98+
handleDelete(primaryID, Set.of(resourceIDMapper.idFor(resource)));
9899
}
99100

100101
protected synchronized void handleDelete(ResourceID primaryID, Set<ID> resourceIDs) {
@@ -143,7 +144,7 @@ protected synchronized void handleResources(
143144
cachedResources = Collections.emptyMap();
144145
}
145146
var newResourcesMap =
146-
newResources.stream().collect(Collectors.toMap(cacheKeyMapper::keyFor, r -> r));
147+
newResources.stream().collect(Collectors.toMap(resourceIDMapper::idFor, r -> r));
147148
cache.put(primaryID, newResourcesMap);
148149
if (propagateEvent
149150
&& !newResourcesMap.equals(cachedResources)
@@ -205,7 +206,7 @@ private boolean acceptedByGenericFiler(R resource) {
205206
@Override
206207
public synchronized void handleRecentResourceCreate(ResourceID primaryID, R resource) {
207208
var actualValues = cache.get(primaryID);
208-
var resourceId = cacheKeyMapper.keyFor(resource);
209+
var resourceId = resourceIDMapper.idFor(resource);
209210
if (actualValues == null) {
210211
actualValues = new HashMap<>();
211212
cache.put(primaryID, actualValues);
@@ -220,7 +221,7 @@ public synchronized void handleRecentResourceUpdate(
220221
ResourceID primaryID, R resource, R previousVersionOfResource) {
221222
var actualValues = cache.get(primaryID);
222223
if (actualValues != null) {
223-
var resourceId = cacheKeyMapper.keyFor(resource);
224+
var resourceId = resourceIDMapper.idFor(resource);
224225
R actualResource = actualValues.get(resourceId);
225226
if (actualResource.equals(previousVersionOfResource)) {
226227
actualValues.put(resourceId, resource);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import java.util.concurrent.ConcurrentHashMap;
2222

2323
import io.fabric8.kubernetes.api.model.HasMetadata;
24+
import io.javaoperatorsdk.operator.processing.ResourceIDMapper;
2425
import io.javaoperatorsdk.operator.processing.event.ResourceID;
25-
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
2626
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
2727
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
2828

@@ -35,8 +35,8 @@ public class CachingInboundEventSource<R, P extends HasMetadata, ID>
3535
public CachingInboundEventSource(
3636
ResourceFetcher<R, P> resourceFetcher,
3737
Class<R> resourceClass,
38-
CacheKeyMapper<R, ID> cacheKeyMapper) {
39-
super(resourceClass, cacheKeyMapper);
38+
ResourceIDMapper<R, ID> resourceIDMapper) {
39+
super(resourceClass, resourceIDMapper);
4040
this.resourceFetcher = resourceFetcher;
4141
}
4242

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfiguration.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import java.util.function.Predicate;
2323

2424
import io.fabric8.kubernetes.api.model.HasMetadata;
25-
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
25+
import io.javaoperatorsdk.operator.processing.ResourceIDMapper;
2626

2727
public record PerResourcePollingConfiguration<R, P extends HasMetadata, ID>(
2828
String name,
2929
ScheduledExecutorService executorService,
30-
CacheKeyMapper<R, ID> cacheKeyMapper,
30+
ResourceIDMapper<R, ID> resourceIDMapper,
3131
PerResourcePollingEventSource.ResourceFetcher<R, P> resourceFetcher,
3232
Predicate<P> registerPredicate,
3333
Duration defaultPollingPeriod) {
@@ -37,7 +37,7 @@ public record PerResourcePollingConfiguration<R, P extends HasMetadata, ID>(
3737
public PerResourcePollingConfiguration(
3838
String name,
3939
ScheduledExecutorService executorService,
40-
CacheKeyMapper<R, ID> cacheKeyMapper,
40+
ResourceIDMapper<R, ID> resourceIDMapper,
4141
PerResourcePollingEventSource.ResourceFetcher<R, P> resourceFetcher,
4242
Predicate<P> registerPredicate,
4343
Duration defaultPollingPeriod) {
@@ -46,8 +46,8 @@ public PerResourcePollingConfiguration(
4646
executorService == null
4747
? new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER)
4848
: executorService;
49-
this.cacheKeyMapper =
50-
cacheKeyMapper == null ? CacheKeyMapper.resourceIdProviderMapper() : cacheKeyMapper;
49+
this.resourceIDMapper =
50+
resourceIDMapper == null ? ResourceIDMapper.resourceIdProviderMapper() : resourceIDMapper;
5151
this.resourceFetcher = Objects.requireNonNull(resourceFetcher);
5252
this.registerPredicate = registerPredicate;
5353
this.defaultPollingPeriod = defaultPollingPeriod;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfigurationBuilder.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.function.Predicate;
2121

2222
import io.fabric8.kubernetes.api.model.HasMetadata;
23-
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
23+
import io.javaoperatorsdk.operator.processing.ResourceIDMapper;
2424

2525
public final class PerResourcePollingConfigurationBuilder<R, P extends HasMetadata, ID> {
2626

@@ -30,7 +30,7 @@ public final class PerResourcePollingConfigurationBuilder<R, P extends HasMetada
3030
private String name;
3131
private Predicate<P> registerPredicate;
3232
private ScheduledExecutorService executorService;
33-
private CacheKeyMapper<R, ID> cacheKeyMapper;
33+
private ResourceIDMapper<R, ID> resourceIDMapper;
3434

3535
public PerResourcePollingConfigurationBuilder(
3636
PerResourcePollingEventSource.ResourceFetcher<R, P> resourceFetcher,
@@ -52,9 +52,9 @@ public PerResourcePollingConfigurationBuilder<R, P, ID> withRegisterPredicate(
5252
return this;
5353
}
5454

55-
public PerResourcePollingConfigurationBuilder<R, P, ID> withCacheKeyMapper(
56-
CacheKeyMapper<R, ID> cacheKeyMapper) {
57-
this.cacheKeyMapper = cacheKeyMapper;
55+
public PerResourcePollingConfigurationBuilder<R, P, ID> withResourceIDMapper(
56+
ResourceIDMapper<R, ID> resourceIDMapper) {
57+
this.resourceIDMapper = resourceIDMapper;
5858
return this;
5959
}
6060

@@ -67,7 +67,7 @@ public PerResourcePollingConfiguration<R, P, ID> build() {
6767
return new PerResourcePollingConfiguration<>(
6868
name,
6969
executorService,
70-
cacheKeyMapper,
70+
resourceIDMapper,
7171
resourceFetcher,
7272
registerPredicate,
7373
defaultPollingPeriod);

0 commit comments

Comments
 (0)