Skip to content

Commit

Permalink
Small cleanups and parser fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandisseldorp committed Jun 22, 2024
1 parent 4a63508 commit 1eab536
Show file tree
Hide file tree
Showing 10 changed files with 15 additions and 16 deletions.
2 changes: 1 addition & 1 deletion examples/ksml.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* =========================LICENSE_END==================================
*/

public record ToTopicDefinition(TopicDefinition topic, StreamPartitionerDefinition partitioner) implements Definition {
public record ToTopicDefinition(TopicDefinition topic, FunctionDefinition partitioner) implements Definition {
@Override
public String toString() {
return definitionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/

public record ToTopicNameExtractorDefinition(TopicNameExtractorDefinition topicNameExtractor,
StreamPartitionerDefinition partitioner) implements Definition {
FunctionDefinition partitioner) implements Definition {
@Override
public String toString() {
return definitionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import static io.axual.ksml.dsl.KSMLDSL.Functions;

public abstract class FunctionDefinitionParser<T extends FunctionDefinition> extends DefinitionParser<T> {
private String defaultName;

protected StructParser<T> parserWithStores(Class<T> resultClass, String functionType, String description, Constructor1<T, FunctionDefinition> constructor) {
return parser(resultClass, functionType, description, (name, type, params, globalCode, code, expression, resultType, stores, tags) -> FunctionDefinition.as(name, params, globalCode, code, expression, resultType, stores), constructor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public StructParser<GlobalTableDefinition> parser() {
final var valueField = userTypeField(KSMLDSL.Streams.VALUE_TYPE, "The value type of the global table");
if (isSource) return structParser(
GlobalTableDefinition.class,
"",
"Source",
DOC,
stringField(KSMLDSL.Streams.TOPIC, TOPIC_DOC),
keyField,
Expand All @@ -75,7 +75,7 @@ public StructParser<GlobalTableDefinition> parser() {
});
return structParser(
GlobalTableDefinition.class,
"Intermediate",
"",
DOC,
stringField(KSMLDSL.Streams.TOPIC, TOPIC_DOC),
optional(keyField),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public StructParser<StreamDefinition> parser() {
final var valueField = userTypeField(Streams.VALUE_TYPE, "The value type of the stream");
if (isSource) return structParser(
StreamDefinition.class,
"",
"Source",
DOC,
stringField(Streams.TOPIC, TOPIC_DOC),
keyField,
Expand All @@ -57,7 +57,7 @@ public StructParser<StreamDefinition> parser() {
});
return structParser(
StreamDefinition.class,
"Intermediate",
"",
DOC,
stringField(Streams.TOPIC, TOPIC_DOC),
optional(keyField),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public StructParser<TableDefinition> parser() {
final var valueField = userTypeField(Streams.VALUE_TYPE, "The value type of the table");
if (isSource) return structParser(
TableDefinition.class,
"",
"Source",
DOC,
stringField(Streams.TOPIC, TOPIC_DOC),
keyField,
Expand All @@ -75,7 +75,7 @@ public StructParser<TableDefinition> parser() {
});
return structParser(
TableDefinition.class,
"Intermediate",
"",
DOC,
stringField(Streams.TOPIC, TOPIC_DOC),
optional(keyField),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

import io.axual.ksml.definition.ToTopicDefinition;
import io.axual.ksml.dsl.KSMLDSL;
import io.axual.ksml.generator.TopologyResources;
import io.axual.ksml.parser.TopologyResourceAwareParser;
import io.axual.ksml.parser.StructParser;
Expand All @@ -36,8 +37,8 @@ protected StructParser<ToTopicDefinition> parser() {
ToTopicDefinition.class,
"",
"Writes out pipeline messages to a topic",
new TopicDefinitionParser(resources(), false),
new StreamPartitionerDefinitionParser(),
optional(topicField(KSMLDSL.Operations.To.TOPIC, "A reference to a stream, table or globalTable, or an inline definition of the output topic", new TopicDefinitionParser(resources(), false))),
optional(functionField(KSMLDSL.Operations.To.PARTITIONER, "A function that partitions the records in the output topic", new StreamPartitionerDefinitionParser())),
(topic, partitioner, tags) -> topic != null ? new ToTopicDefinition(topic, partitioner) : null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected StructParser<ToTopicNameExtractorDefinition> parser() {
"Reference to a pre-defined topic name extractor, or an inline definition of a topic name extractor and an optional stream partitioner",
(name, tags) -> resources().function(name),
new TopicNameExtractorDefinitionParser())),
new StreamPartitionerDefinitionParser(),
optional(functionField(KSMLDSL.Operations.To.PARTITIONER, "A function that partitions the records in the output topic", new StreamPartitionerDefinitionParser())),
(tne, partitioner, tags) -> tne != null ? new ToTopicNameExtractorDefinition(new TopicNameExtractorDefinition(tne), partitioner) : null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public StructParser<TopicDefinition> parser() {
final var valueField = userTypeField(Streams.VALUE_TYPE, "The value type of the topic");
if (isSource) return structParser(
TopicDefinition.class,
"",
"Source",
DOC,
stringField(Streams.TOPIC, TOPIC_DOC),
keyField,
Expand All @@ -54,7 +54,7 @@ public StructParser<TopicDefinition> parser() {
(topic, keyType, valueType, tsExtractor, resetPolicy, tags) -> topic != null ? new TopicDefinition(topic, keyType, valueType, tsExtractor, OffsetResetPolicyParser.parseResetPolicy(resetPolicy)) : null);
return structParser(
TopicDefinition.class,
"Intermediate",
"",
DOC,
stringField(Streams.TOPIC, TOPIC_DOC),
optional(keyField),
Expand Down

0 comments on commit 1eab536

Please sign in to comment.