Skip to content

Commit

Permalink
Remove stores declaration from operations, which was a duplicate of f…
Browse files Browse the repository at this point in the history
…unctions declaring state store names
  • Loading branch information
jeroenvandisseldorp committed Jun 27, 2024
1 parent 60971a5 commit f141750
Show file tree
Hide file tree
Showing 30 changed files with 97 additions and 119 deletions.
2 changes: 1 addition & 1 deletion docs/ksml-language-spec.json

Large diffs are not rendered by default.

121 changes: 65 additions & 56 deletions docs/ksml-language-spec.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ the predicate returns `true`, then the message will be sent to the output stream
| Stream Type | Returns | Parameter | Value Type | Required | Description |
|:-----------------|:-----------------|:----------|:-----------|:--------------------|:----------------------------------------------------------------------------------------------------|
| [KStream]`<K,V>` | [KStream]`<K,V>` | `if` | Yes | Inline or reference | A [Predicate] function, which returns `True` if the message can pass the filter, `False` otherwise. |
| | | | | | |
| [KTable]`<K,V>` | [KTable]`<K,V>` | `if` | Yes | Inline or reference | A [Predicate] function, which returns `True` if the message can pass the filter, `False` otherwise. |

Example:
Expand Down
3 changes: 2 additions & 1 deletion ksml/NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Lists of 71 third-party dependencies.
Lists of 72 third-party dependencies.
(MIT License) minimal-json (com.eclipsesource.minimal-json:minimal-json:0.9.5 - https://github.com/ralfstx/minimal-json)
(The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.17.1 - https://github.com/FasterXML/jackson)
(The Apache Software License, Version 2.0) Jackson-core (com.fasterxml.jackson.core:jackson-core:2.17.1 - https://github.com/FasterXML/jackson-core)
Expand Down Expand Up @@ -34,6 +34,7 @@ Lists of 71 third-party dependencies.
(Apache License 2.0) Metrics Integration with JMX (io.dropwizard.metrics:metrics-jmx:4.2.25 - https://metrics.dropwizard.io/metrics-jmx)
(Apache License 2.0) swagger-annotations (io.swagger.core.v3:swagger-annotations:2.1.10 - https://github.com/swagger-api/swagger-core/modules/swagger-annotations)
(EDL 1.0) Jakarta Activation API (jakarta.activation:jakarta.activation-api:2.1.3 - https://github.com/jakartaee/jaf-api)
(Apache License 2.0) Jakarta Bean Validation API (jakarta.validation:jakarta.validation-api:3.0.2 - https://beanvalidation.org)
(Eclipse Distribution License - v 1.0) Jakarta SOAP with Attachments API (jakarta.xml.soap:jakarta.xml.soap-api:3.0.2 - https://github.com/jakartaee/saaj-api)
(Apache-2.0) Apache Avro (org.apache.avro:avro:1.11.3 - https://avro.apache.org)
(Apache-2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.26.2 - https://commons.apache.org/proper/commons-compress/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public StructsParser<PipelineDefinition> parser() {
// If no sink operation was specified, then we create an AS operation here with the name provided.
// This means that pipeline results can be referred to by other pipelines using the pipeline's name
// as identifier.
var sinkOperation = shortName != null ? new AsOperation(new OperationConfig(resources().getUniqueOperationName(longName), tags, null), shortName) : null;
var sinkOperation = shortName != null ? new AsOperation(new OperationConfig(resources().getUniqueOperationName(longName), tags), shortName) : null;
return new PipelineDefinition(name, from, via, sinkOperation);
});
}
Expand Down
1 change: 0 additions & 1 deletion ksml/src/main/java/io/axual/ksml/dsl/KSMLDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public static class Operations {
public static final String NAME_ATTRIBUTE = "name";
public static final String TYPE_ATTRIBUTE = "type";
public static final String STORE_ATTRIBUTE = "store";
public static final String STORE_NAMES_ATTRIBUTE = "stores";

public static final String AGGREGATE = "aggregate";
public static final String COGROUP = "cogroup";
Expand Down
10 changes: 0 additions & 10 deletions ksml/src/main/java/io/axual/ksml/operation/BaseOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public static String validateNameAndReturnError(String name) {

protected final String name;
protected final ContextTags tags;
protected final String[] storeNames;

public BaseOperation(OperationConfig config) {
var error = NameValidator.validateNameAndReturnError(config.name());
Expand All @@ -78,7 +77,6 @@ public BaseOperation(OperationConfig config) {
name = config.name();
}
tags = config.tags().append("operation-name", name);
storeNames = config.storeNames() != null ? config.storeNames() : new String[0];
}

@Override
Expand Down Expand Up @@ -287,14 +285,6 @@ protected void checkTuple(String faultDescription, DataType type, DataType... el
}
}

protected String[] combineStoreNames(String[]... storeNameArrays) {
final var storeNames = new TreeSet<String>();
for (String[] storeNameArray : storeNameArrays) {
if (storeNameArray != null) Collections.addAll(storeNames, storeNameArray);
}
return storeNames.toArray(TEMPLATE);
}

protected StreamDataType streamDataTypeOf(DataType dataType, boolean isKey) {
return streamDataTypeOf(new UserType(dataType), isKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
final var v = input.valueType();
final var pred = userFunctionOf(context, PREDICATE_NAME, predicate, new UserType(DataBoolean.DATATYPE), superOf(k), superOf(v));
final var userPred = new UserPredicate(pred, tags);
final var storeNames = combineStoreNames(this.storeNames, predicate.storeNames().toArray(TEMPLATE));
final var storeNames = predicate.storeNames().toArray(String[]::new);
final var supplier = new FixedKeyOperationProcessorSupplier<>(
name,
FilterNotProcessor::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
final var v = input.valueType();
final var pred = userFunctionOf(context, PREDICATE_NAME, predicate, new UserType(DataBoolean.DATATYPE), superOf(k), superOf(v));
final var userPred = new UserPredicate(pred, tags);
final var storeNames = combineStoreNames(this.storeNames, predicate.storeNames().toArray(TEMPLATE));
final var storeNames = predicate.storeNames().toArray(String[]::new);
final var supplier = new FixedKeyOperationProcessorSupplier<>(
name,
FilterProcessor::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,10 @@
public class OperationConfig {
private final String name;
private final ContextTags tags;
private final boolean allowStores;
private final String[] storeNames;

public OperationConfig(String name, ContextTags tags, String[] storeNames) {
public OperationConfig(String name, ContextTags tags) {
this.name = name;
this.tags = tags;
log.debug("Generated operation name: {}", this.name);
this.allowStores = storeNames != null;
this.storeNames = storeNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
final var v = input.valueType();
final var action = userFunctionOf(context, FOREACHACTION_NAME, forEachAction, equalTo(DataNull.DATATYPE), superOf(k), superOf(v));
final var userAction = new UserForeachAction(action, tags);
final var storeNames = combineStoreNames(this.storeNames, forEachAction.storeNames().toArray(TEMPLATE));
final var storeNames = forEachAction.storeNames().toArray(String[]::new);
final var supplier = new FixedKeyOperationProcessorSupplier<>(
name,
PeekProcessor::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
public class StoreOperationConfig extends OperationConfig {
public final StateStoreDefinition store;

public StoreOperationConfig(String name, ContextTags context, StateStoreDefinition store, List<String> storeNames) {
super(name, context, storeNames != null ? storeNames.toArray(new String[]{}) : null);
public StoreOperationConfig(String name, ContextTags context, StateStoreDefinition store) {
super(name, context);
this.store = store;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
final var kr = streamDataTypeOf(firstSpecificType(mapper, k), true);
final var map = userFunctionOf(context, MAPPER_NAME, mapper, kr, superOf(k), superOf(v));
final var userMap = new UserKeyTransformer(map, tags);
final var storeNames = combineStoreNames(this.storeNames, mapper.storeNames().toArray(TEMPLATE));
final var storeNames = mapper.storeNames().toArray(String[]::new);
final var supplier = new OperationProcessorSupplier<>(
name,
TransformKeyProcessor::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public BaseStreamWrapper apply(KStreamWrapper input, TopologyBuildContext contex
final var kr = streamDataTypeOf(userTupleType.getUserType(0), true);
final var vr = streamDataTypeOf(userTupleType.getUserType(1), false);
final var userMap = new UserKeyValueTransformer(map, tags);
final var storeNames = combineStoreNames(this.storeNames, mapper.storeNames().toArray(TEMPLATE));
final var storeNames = mapper.storeNames().toArray(String[]::new);
final var supplier = new OperationProcessorSupplier<>(
name,
TransformKeyValueProcessor::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
final var kr = streamDataTypeOf(mapperResultListTupleValueType.getUserType(0), true);
final var vr = streamDataTypeOf(mapperResultListTupleValueType.getUserType(1), false);
final var userMap = new UserKeyValueToKeyValueListTransformer(map, tags);
final var storeNames = combineStoreNames(this.storeNames, mapper.storeNames().toArray(TEMPLATE));
final var storeNames = mapper.storeNames().toArray(String[]::new);
final var supplier = new OperationProcessorSupplier<>(
name,
TransformKeyValueToKeyValueListProcessor::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
final var vr = streamDataTypeOf(firstSpecificType(mapper, new UserType(new ListType(DataType.UNKNOWN))), false);
final var map = userFunctionOf(context, MAPPER_NAME, mapper, subOf(vr), superOf(k), superOf(v));
final var userMap = new UserKeyValueToValueListTransformer(map, tags);
final var storeNames = combineStoreNames(this.storeNames, mapper.storeNames().toArray(TEMPLATE));
final var storeNames = mapper.storeNames().toArray(String[]::new);
final var supplier = new FixedKeyOperationProcessorSupplier<>(
name,
TransformKeyValueToValueListProcessor::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
final var meta = new UserType(RecordMetadata.DATATYPE);
final var map = userFunctionOf(context, MAPPER_NAME, mapper, subOf(meta), superOf(k), superOf(v), superOf(meta));
final var userMap = new UserMetadataTransformer(map, tags);
final var storeNames = combineStoreNames(this.storeNames, mapper.storeNames().toArray(TEMPLATE));
final var storeNames = mapper.storeNames().toArray(String[]::new);
final var supplier = new FixedKeyOperationProcessorSupplier<>(
name,
TransformMetadataProcessor::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
final var vr = streamDataTypeOf(firstSpecificType(mapper, v.userType()), false);
final var map = userFunctionOf(context, MAPPER_NAME, mapper, vr, superOf(k), superOf(v));
final var userMap = new UserValueTransformer(map, tags);
final var storeNames = combineStoreNames(this.storeNames, mapper.storeNames().toArray(TEMPLATE));
final var storeNames = mapper.storeNames().toArray(String[]::new);
final var supplier = new FixedKeyOperationProcessorSupplier<>(
name,
TransformValueProcessor::new,
Expand Down Expand Up @@ -89,6 +89,7 @@ public StreamWrapper apply(KTableWrapper input, TopologyBuildContext context) {
final ValueTransformerWithKeySupplier<Object, Object, DataObject> supplier = () -> userMap;
final var named = namedOf();
final var mat = materializedOf(context, kvStore);
final var storeNames = mapper.storeNames().toArray(String[]::new);
final KTable<Object, Object> output = named != null
? mat != null
? input.table.transformValues(supplier, mat, named, storeNames)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ public StructsParser<FilterNotOperation> parser() {
operationNameField(),
functionField(KSMLDSL.Operations.Filter.PREDICATE, "A function that returns \"false\" when records are accepted, \"true\" otherwise", new PredicateDefinitionParser(false)),
storeField(false, "Materialized view of the filtered table (only applies to tables, ignored for streams)", StoreType.KEYVALUE_STORE),
storeNamesField(),
(name, pred, store, stores, tags) -> {
(name, pred, store, tags) -> {
if (pred != null)
return new FilterNotOperation(storeOperationConfig(name, tags, store, stores), pred);
return new FilterNotOperation(storeOperationConfig(name, tags, store), pred);
throw new ExecutionException("Predicate not defined for " + type + " operation");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ public StructsParser<FilterOperation> parser() {
operationNameField(),
functionField(KSMLDSL.Operations.Filter.PREDICATE, "A function that returns \"true\" when records are accepted, \"false\" otherwise", new PredicateDefinitionParser(false)),
storeField(false, "Materialized view of the filtered table (only applies to tables, ignored for streams)", StoreType.KEYVALUE_STORE),
storeNamesField(),
(name, pred, store, stores, tags) -> {
(name, pred, store, tags) -> {
if (pred != null)
return new FilterOperation(storeOperationConfig(name, tags, store, stores), pred);
return new FilterOperation(storeOperationConfig(name, tags, store), pred);
throw new ExecutionException("Predicate not defined for " + type + " operation");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public StructsParser<ForEachOperation> parser() {
"Operation to call a function for every record in the stream",
operationNameField(),
functionField(KSMLDSL.Operations.FOR_EACH, "A function that gets called for every message in the stream", new ForEachActionDefinitionParser(false)),
storeNamesField(),
(name, action, stores, tags) -> action != null ? new ForEachOperation(operationConfig(name, tags, stores), action) : null);
(name, action, tags) -> action != null ? new ForEachOperation(operationConfig(name, tags), action) : null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ protected StructsParser<String> operationNameField() {
return optional(stringField(KSMLDSL.Operations.NAME_ATTRIBUTE, false, type, "The name of the operation processor"));
}

protected StructsParser<List<String>> storeNamesField() {
return optional(listField(KSMLDSL.Operations.STORE_NAMES_ATTRIBUTE, "store", "state store name", "The names of all state stores used by the function", new StringValueParser()));
}

protected OperationConfig operationConfig(String name, ContextTags tags) {
return operationConfig(name, tags, null);
}
Expand All @@ -60,8 +56,7 @@ protected OperationConfig operationConfig(String name, ContextTags tags, List<St
name = validateName("Operation", name, defaultLongName != null ? defaultLongName + "_" + type : type);
return new OperationConfig(
name != null ? resources().getUniqueOperationName(name) : resources().getUniqueOperationName(tags),
tags,
storeNames != null ? storeNames.toArray(new String[]{}) : null);
tags);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public StructsParser<PeekOperation> parser() {
"Operation to peek into a stream, without modifying the stream contents",
operationNameField(),
functionField(KSMLDSL.Operations.FOR_EACH, "A function that gets called for every message in the stream", new ForEachActionDefinitionParser(false)),
storeNamesField(),
(name, action, stores, tags) -> new PeekOperation(operationConfig(name, tags, stores), action));
(name, action, tags) -> new PeekOperation(operationConfig(name, tags), action));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,8 @@ public StoreOperationParser(String type, TopologyResources resources) {
}

protected StoreOperationConfig storeOperationConfig(String name, ContextTags tags, StateStoreDefinition store) {
return storeOperationConfig(name, tags, store, null);
}

protected StoreOperationConfig storeOperationConfig(String name, ContextTags tags, StateStoreDefinition store, List<String> storeNames) {
name = validateName("Store", name, defaultShortName(), true);
return new StoreOperationConfig(name != null ? resources().getUniqueOperationName(name) : resources().getUniqueOperationName(tags), tags, store, storeNames);
return new StoreOperationConfig(name != null ? resources().getUniqueOperationName(name) : resources().getUniqueOperationName(tags), tags, store);
}

protected StructsParser<StateStoreDefinition> storeField(boolean required, String doc, StoreType expectedStoreType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ protected StructsParser<TransformKeyOperation> parser() {
"Convert the key of every record in the stream to another key",
operationNameField(),
functionField(KSMLDSL.Operations.Transform.MAPPER, "A function that computes a new key for each record", new KeyTransformerDefinitionParser(false)),
storeNamesField(),
(name, mapper, storeNames, tags) -> new TransformKeyOperation(operationConfig(name, tags, storeNames), mapper));
(name, mapper, tags) -> new TransformKeyOperation(operationConfig(name, tags), mapper));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ protected StructsParser<TransformKeyValueOperation> parser() {
"Convert the key/value of every record in the stream to another key/value",
operationNameField(),
functionField(KSMLDSL.Operations.Transform.MAPPER, "A function that computes a new key/value for each record", new KeyValueTransformerDefinitionParser(false)),
storeNamesField(),
(name, mapper, storeNames, tags) -> new TransformKeyValueOperation(operationConfig(name, tags, storeNames), mapper));
(name, mapper, tags) -> new TransformKeyValueOperation(operationConfig(name, tags), mapper));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ protected StructsParser<TransformKeyValueToKeyValueListOperation> parser() {
"Convert a stream by transforming every record into a list of derived records",
operationNameField(),
functionField(KSMLDSL.Operations.Transform.MAPPER, "A function that converts every record of a stream to a list of output records.", new KeyValueToKeyValueListTransformerDefinitionParser(false)),
storeNamesField(),
(name, mapper, storeNames, tags) -> new TransformKeyValueToKeyValueListOperation(operationConfig(name, tags, storeNames), mapper));
(name, mapper, tags) -> new TransformKeyValueToKeyValueListOperation(operationConfig(name, tags), mapper));
}
}
Loading

0 comments on commit f141750

Please sign in to comment.