From 336a765c5adec988e4b6cb0e1d9c088dfbcc7bad Mon Sep 17 00:00:00 2001
From: jeroenvandisseldorp
Date: Fri, 28 Jun 2024 00:23:50 +0200
Subject: [PATCH] Update metric documentation and several related subjects

---
 docs/functions.md                             |  42 ++++++
 docs/index.md                                 |  17 +--
 docs/introduction.md                          | 127 ++++++++++++------
 docs/notations.md                             |  56 +++++---
 docs/pipelines.md                             |   6 +-
 docs/quick-start.md                           |  11 +-
 docs/release-notes.md                         |  87 +++++++-----
 docs/runners.md                               |  48 ++++---
 docs/types.md                                 |   1 +
 .../java/io/axual/ksml/runner/KsmlInfo.java   |   4 +-
 .../runner/prometheus/PrometheusExport.java   |   4 +-
 .../metric/{KSMLMetrics.java => Metrics.java} |   4 +-
 .../java/io/axual/ksml/python/Invoker.java    |   8 +-
 .../io/axual/ksml/python/PythonContext.java   |   6 +-
 .../axual/ksml/python/MetricsBridgeTest.java  |   4 +-
 15 files changed, 286 insertions(+), 139 deletions(-)
 rename ksml/src/main/java/io/axual/ksml/metric/{KSMLMetrics.java => Metrics.java} (95%)

diff --git a/docs/functions.md b/docs/functions.md
index 24c9d5d2..40d4e055 100644
--- a/docs/functions.md
+++ b/docs/functions.md
@@ -9,6 +9,7 @@
   * [Function Types](#function-types)
   * [Function parameters](#function-parameters)
   * [Logger](#logger)
+  * [Metrics](#metrics)
   * [State stores](#state-stores)
 
 ## Introduction
@@ -161,6 +162,47 @@ Output of the above statements looks like:
 [LOG TIMESTAMP] DEBUG function.name   I'm printing five variables here: 1, 2, 3, text, {"json":"is cool"}. Lovely isn't it?
 ```
 
+### Metrics
+
+KSML supports metric collection and exposure through JMX and a built-in Prometheus agent. Metrics for Python functions
+are automatically generated and collected, but users can also specify their own metrics. For an example,
+see `17-example-inspect-with-metrics.yaml` in the `examples` directory.
+
+KSML supports the following metric types:
+
+* Counter: an increasing integer, which counts, for example, the number of calls made to a Python function.
+* Meter: used to periodically update a measurement value. Preferred over a Counter when you don't care too much about
+  exact averages, but want to monitor trends instead.
+* Timer: measures the time spent by processes or functions that are called internally.
+
+Every Python function in KSML can use the `metrics` variable, which KSML makes available automatically. This object
+supports the following methods to create your own metrics:
+
+* counter(name: str, tags: dict) -> Counter
+* counter(name: str) -> Counter
+* meter(name: str, tags: dict) -> Meter
+* meter(name: str) -> Meter
+* timer(name: str, tags: dict) -> Timer
+* timer(name: str) -> Timer
+
+In turn, these objects support the following methods:
+
+#### Counter
+
+* increment()
+* increment(delta: int)
+
+#### Meter
+
+* mark()
+* mark(nrOfEvents: int)
+
+#### Timer
+
+* updateSeconds(valueSeconds: int)
+* updateMillis(valueMillis: int)
+* updateNanos(valueNanos: int)
+
 ### State stores
 
 Some functions are allowed to access local state stores. These functions specify the
diff --git a/docs/index.md b/docs/index.md
index 314a14fe..e052cdf9 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -3,22 +3,23 @@
 Welcome to the KSML documentation, Use the menu on the left to navigate through the various sections
 
 ## Quick Start
+
 If you want to get going quickly, go to the KSML Quickstart.
 
 ## Introduction
 
-KSML allows anyone to specify a powerful Kafka Streams application in just a few lines of YAML and Python snippets. 
+KSML allows anyone to specify a powerful Kafka Streams application in just a few lines of YAML and Python snippets.
 
 ## Contents
 
 1. [Introduction](introduction.md)
-1. 
[Stream Types](streams.md) -1. [Functions](functions.md) -1. [Pipelines](pipelines.md) -1. [Operations](operations.md) -1. [Data Types](types.md) -1. [Runners](runners.md) -1. [Language specification](ksml-language-spec) +2. [Stream Types](streams.md) +3. [Functions](functions.md) +4. [Pipelines](pipelines.md) +5. [Operations](operations.md) +6. [Data Types](types.md) +7. [Runners](runners.md) +8. [Language specification](ksml-language-spec.md) [Getting Started](quick-start) diff --git a/docs/introduction.md b/docs/introduction.md index e167c5f0..24d47c7d 100644 --- a/docs/introduction.md +++ b/docs/introduction.md @@ -1,28 +1,34 @@ # KSML: Kafka Streams for Low Code Environments ## Abstract -Kafka Streams has captured the hearts and minds of many developers that want to develop streaming applications on top of Kafka. But as powerful as the framework is, Kafka -Streams has had a hard time getting around the requirement of writing Java code and setting up build pipelines. There were some attempts to rebuild Kafka Streams, but up -until now popular languages like Python did not receive equally powerful (and maintained) stream processing frameworks. In this article we will present a new declarative -approach to unlock Kafka Streams, called KSML. By the time you finish reading this document, you will be able to write streaming applications yourself, using + +Kafka Streams has captured the hearts and minds of many developers that want to develop streaming applications on top of +Kafka. But as powerful as the framework is, Kafka +Streams has had a hard time getting around the requirement of writing Java code and setting up build pipelines. There +were some attempts to rebuild Kafka Streams, but up +until now popular languages like Python did not receive equally powerful (and maintained) stream processing frameworks. +In this article we will present a new declarative +approach to unlock Kafka Streams, called KSML. By the time you finish reading this document, you will be able to write +streaming applications yourself, using only a few simple basic rules and Python snippets. * [Setting up a test environment](#setting-up-a-test-environment) * [KSML in practice](#ksml-in-practice) - * [Example 1. Inspect data on a topic](#example-1-inspect-data-on-a-topic) - * [Example 2. Copying data to another topic](#example-2-copying-data-to-another-topic) - * [Example 3. Filtering data](#example-3-filtering-data) - * [Example 4. Branching messages](#example-4-branching-messages) - * [Example 5. Dynamic routing](#example-5-dynamic-routing) - * [Example 6. Multiple pipelines](#example-6-multiple-pipelines) - + * [Example 1. Inspect data on a topic](#example-1-inspect-data-on-a-topic) + * [Example 2. Copying data to another topic](#example-2-copying-data-to-another-topic) + * [Example 3. Filtering data](#example-3-filtering-data) + * [Example 4. Branching messages](#example-4-branching-messages) + * [Example 5. Dynamic routing](#example-5-dynamic-routing) + * [Example 6. Multiple pipelines](#example-6-multiple-pipelines) ## Setting up a test environment -To demonstrate KSML's capabilities, you will need a working Kafka cluster, or an Axual Platform/Cloud environment. Check out the [Runners](runners.md) page to configure KSML. +To demonstrate KSML's capabilities, you will need a working Kafka cluster, or an Axual Platform/Cloud environment. Check +out the [Runners](runners.md) page to configure KSML.
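+
+If you are running against a plain, unsecured Kafka cluster, the `kafka` section of the runner configuration can stay
+very small. The sketch below is only an illustration and assumes a broker listening on `localhost:9092` and an arbitrary
+application id; see the [Runners](runners.md) page for the full set of options:
+
+```yaml
+kafka:
+  # Standard Kafka Streams configuration options, passed through to the generated topology
+  application.id: io.ksml.example.processor
+  bootstrap.servers: localhost:9092
+```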
-We set up a test topic, called `ksml_sensordata_avro` with key/value types of `String`/`SensorData`. The [SensorData]({{ site.github.repository_url }}/tree/main/examples/SensorData.avsc) schema -was created for demo purposes only and contains several fields to demonstratie KSML capabilities: +We set up a test topic, called `ksml_sensordata_avro` with key/value types of `String`/`SensorData`. The [SensorData] +schema +was created for demo purposes only and contains several fields to demonstrate KSML capabilities: ```json { @@ -98,7 +104,8 @@ was created for demo purposes only and contains several fields to demonstratie K } ``` -For the rest of this document, we assume you have set up the `ksml_sensordata_avro` topic and populated it with some random data. +For the rest of this document, we assume you have set up the `ksml_sensordata_avro` topic and populated it with some +random data. So without any further delays, let's see how KSML allows us to process this data. @@ -133,13 +140,18 @@ pipelines: ``` -Let's disect this definition one element at a time. Before defining processing logic, we first define the streams used by the definition. In this case we define a stream named `sensor_source_avro` which reads from the topic `ksml_sensordata_avro`. The stream defines a `string` key and Avro `SensorData` values. +Let's analyze this definition one element at a time. Before defining processing logic, we first define the streams used +by the definition. In this case we define a stream named `sensor_source_avro` which reads from the +topic `ksml_sensordata_avro`. The stream defines a `string` key and Avro `SensorData` values. -Next is a list of functions that can be used by the processing logic. Here we define just one, `log_message`, which simply uses the provided logger to write the key, value and format of a message to the console. +Next is a list of functions that can be used by the processing logic. Here we define just one, `log_message`, which +simply uses the provided logger to write the key, value and format of a message to the console. -The third element `pipelines` defines the real processing logic. We define a pipeline called `consume_avro`, which takes messages from `ksml_sensordata_avro` and passes them to `print_message`. +The third element `pipelines` defines the real processing logic. We define a pipeline called `consume_avro`, which takes +messages from `ksml_sensordata_avro` and passes them to `print_message`. -The definition file is parsed by KSML and translated into a Kafka Streams topology, which is [described](https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/Topology.html#describe--) as follows: +The definition file is parsed by KSML and translated into a Kafka Streams topology, which +is [described](https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/Topology.html#describe--) as follows: ``` Topologies: @@ -162,11 +174,13 @@ And the output of the generated topology looks like this: 2024-03-06T18:31:59,412Z INFO ksml.functions.log_message Consumed AVRO message - key=sensor5, value={'city': 'Amsterdam', 'color': 'blue', 'name': 'sensor5', 'owner': 'Bob', 'timestamp': 1709749919409, 'type': 'LENGTH', 'unit': 'm', 'value': '658', '@type': 'SensorData', '@schema': { <>}} ``` -As you can see, the output of the application is exactly that what we defined it to be in the `log_message` function, namely a dump of all data found on the topic. 
+As you can see, the output of the application is exactly that what we defined it to be in the `log_message` function, +namely a dump of all data found on the topic. ### Example 2. Copying data to another topic -Now that we can see what data is on a topic, we will start to manipulate its routing. In this example we are copying unmodified data to a secondary topic: +Now that we can see what data is on a topic, we will start to manipulate its routing. In this example we are copying +unmodified data to a secondary topic: ```yaml streams: @@ -197,11 +211,17 @@ pipelines: to: sensor_copy ``` -You can see that we specified a second stream named `sensor_copy` in this example, which is backed by the topic `ksml_sensordata_copy` target topic. The `log_message` function is unchanged, but the pipeline did undergo some changes. Two new elements are introduced here, namely `via` and `to`. +You can see that we specified a second stream named `sensor_copy` in this example, which is backed by the +topic `ksml_sensordata_copy` target topic. The `log_message` function is unchanged, but the pipeline did undergo some +changes. Two new elements are introduced here, namely `via` and `to`. -The `via` tag allows users to define a series of operations executed on the data. In this case there is only one, namely a `peek` operation which does not modify any data, but simply outputs the data on stdout as a side-effect. +The `via` tag allows users to define a series of operations executed on the data. In this case there is only one, namely +a `peek` operation which does not modify any data, but simply outputs the data on stdout as a side effect. -The `to` operation is a so-called "sink operation". Sink operations are always last in a pipeline. Processing of the pipeline does not continue after it was delivered to a sink operation. Note that in the first example above `forEach` is also a sink operation, whereas in this example we achieve the same result by passing the `log_message` function as a parameter to the `peek` operation. +The `to` operation is a so-called "sink operation". Sink operations are always last in a pipeline. Processing of the +pipeline does not continue after it was delivered to a sink operation. Note that in the first example above `forEach` is +also a sink operation, whereas in this example we achieve the same result by passing the `log_message` function as a +parameter to the `peek` operation. When this definition is translated by KSML, the following Kafka Streams topology is created: @@ -222,7 +242,8 @@ The output is similar to that of example 1, but the same data can also be found ### Example 3. Filtering data -Now that we can read and write data, let's see if we can apply some logic to the processing as well. In this example we will be filtering data based on the contents of the value: +Now that we can read and write data, let's see if we can apply some logic to the processing as well. In this example we +will be filtering data based on the contents of the value: ```yaml # This example shows how to read from four simple streams and log all messages @@ -267,9 +288,13 @@ pipelines: to: sensor_filtered ``` -Again, first we define the streams and the functions involved in the processing. You can see we added a new function called `filter_message` which returns `true` or `false` based on the `color` field in the value of the message. This function is used below in the pipeline. +Again, first we define the streams and the functions involved in the processing. 
You can see we added a new function +called `filter_message` which returns `true` or `false` based on the `color` field in the value of the message. This +function is used below in the pipeline. -The pipeline is extended to include a `filter` operation, which takes a `predicate` function as parameter. That function is called for every input message. Only messages for which the function returns `true` are propagated. All other messages are discarded. +The pipeline is extended to include a `filter` operation, which takes a `predicate` function as parameter. That function +is called for every input message. Only messages for which the function returns `true` are propagated. All other +messages are discarded. Using this definition, KSML generates the following Kafka Streams topology: @@ -300,11 +325,14 @@ When it executes, we see the following output: 2024-03-06T18:45:12,008Z INFO ksml.functions.log_message Consumed AVRO message - key=sensor5, value={'city': 'Amsterdam', 'color': 'blue', 'name': 'sensor5', 'owner': 'Bob', 'timestamp': 1709749919409, 'type': 'LENGTH', 'unit': 'm', 'value': '658', '@type': 'SensorData', '@schema': { <>}} ``` -As you can see, the filter operation did its work. Only messages with field `color` set to `blue` are passed on to the `peek` operation, while other messages are discarded. +As you can see, the filter operation did its work. Only messages with field `color` set to `blue` are passed on to +the `peek` operation, while other messages are discarded. ### Example 4. Branching messages -Another way to filter messages is to use a `branch` operation. This is also a sink operation, which closes the processing of a pipeline. It is similar to `forEach` and `to` in that respect, but has a different definition and behaviour. +Another way to filter messages is to use a `branch` operation. This is also a sink operation, which closes the +processing of a pipeline. It is similar to `forEach` and `to` in that respect, but has a different definition and +behaviour. ```yaml streams: @@ -339,9 +367,15 @@ pipelines: code: log.warn("UNKNOWN COLOR - {}", value["color"]) ``` -The `branch` operation takes a list of branches as its parameters, which each specifies a processing pipeline of its own. Branches contain the keyword `if`, which take a predicate function that determines if a message will flow into that particular branch, or if it will be passed to the next branch(es). Every message will only end up in one branch, namely the first one in order where the `if` predcate function returns `true`. +The `branch` operation takes a list of branches as its parameters, which each specifies a processing pipeline of its +own. Branches contain the keyword `if`, which take a predicate function that determines if a message will flow into that +particular branch, or if it will be passed to the next branch(es). Every message will only end up in one branch, namely +the first one in order where the `if` predicate function returns `true`. -In the example we see that the first branch will be populated only with messages with `color` field set to `blue`. Once there, these messages will be written to `ksml_sensordata_blue`. The second branch will only contain messages with `color`=`red` and these messages will be written to `ksml_sensordata_red`. Finally, the last branch outputs a message that the color is unknown and ends any further processing. +In the example we see that the first branch will be populated only with messages with `color` field set to `blue`. 
Once +there, these messages will be written to `ksml_sensordata_blue`. The second branch will only contain messages +with `color`=`red` and these messages will be written to `ksml_sensordata_red`. Finally, the last branch outputs a +message that the color is unknown and ends any further processing. When translated by KSML the following Kafka Streams topology is set up: @@ -388,11 +422,15 @@ It is clear that the branch operation is integrated in this topology. Its output 2024-03-06T18:31:59,412Z INFO k.f.branch_pipelines_main_via_1_forEach SOURCE MESSAGE - key=sensor5, value={'city': 'Amsterdam', 'color': 'blue', 'name': 'sensor5', 'owner': 'Bob', 'timestamp': 1709749919409, 'type': 'LENGTH', 'unit': 'm', 'value': '658', '@type': 'SensorData', '@schema': { <>}} ``` -We see that every message processed by the pipeline is logged through the `k.f.branch_pipelines_main_via_1_forEach` logger. But the branch operation sorts the messages and sends messages with colors `blue` and `red` into their own branches. The only colors that show up as `UNKNOWN COLOR -` messages are non-blue and non-red and send through the `branch_pipelines_main_branch_3_forEach` logger. +We see that every message processed by the pipeline is logged through the `k.f.branch_pipelines_main_via_1_forEach` +logger. But the branch operation sorts the messages and sends messages with colors `blue` and `red` into their own +branches. The only colors that show up as `UNKNOWN COLOR -` messages are non-blue and non-red and send through +the `branch_pipelines_main_branch_3_forEach` logger. ### Example 5. Dynamic routing -Sometimes it is necessary to route a message to one stream or another based on the content of a message. This example shows how to route messages dynamically using a TopicNameExtractor. +Sometimes it is necessary to route a message to one stream or another based on the content of a message. This example +shows how to route messages dynamically using a TopicNameExtractor. ```yaml streams: @@ -419,7 +457,10 @@ pipelines: return 'ksml_sensordata_sensor0' ``` -The `topicNameExtractor` operation takes a function, which determines the routing of every message by returning a topic name string. In this case, when the key of a message is `sensor1` then the message will be sent to `ksml_sensordata_sensor1`. When it contains `sensor2` the message is sent to `ksml_sensordata_sensor2`. All other messages are sent to `ksml_sensordata_sensor0`. +The `topicNameExtractor` operation takes a function, which determines the routing of every message by returning a topic +name string. In this case, when the key of a message is `sensor1` then the message will be sent +to `ksml_sensordata_sensor1`. When it contains `sensor2` the message is sent to `ksml_sensordata_sensor2`. All other +messages are sent to `ksml_sensordata_sensor0`. The equivalent Kafka Streams topology looks like this: @@ -435,13 +476,16 @@ Topologies: <-- route_route_pipelines_main_via_1 ``` -The output does not show anything special compared to previous examples, since all messages are simply written by the logger. +The output does not show anything special compared to previous examples, since all messages are simply written by the +logger. ### Example 6. Multiple pipelines -In the previous examples there was always a single pipeline definition for processing data. KSML allows us to define multiple pipelines in a single file. +In the previous examples there was always a single pipeline definition for processing data. 
KSML allows us to define +multiple pipelines in a single file. -In this example we combine the filtering example with the routing example. We will also define new pipelines with the sole purpose of logging the routed messages. +In this example we combine the filtering example with the routing example. We will also define new pipelines with the +sole purpose of logging the routed messages. ```yaml # This example shows how to route messages to a dynamic topic. The target topic is the result of an executed function. @@ -517,13 +561,16 @@ pipelines: ``` In this definition we defined five pipelines: -1. `filtering` which filters out all sensor messages that don't have the color blue and sends it to the `sensor_filtered` stream. + +1. `filtering` which filters out all sensor messages that don't have the color blue and sends it to + the `sensor_filtered` stream. 2. `routing` which routes the data on the `sensor_filtered` stream to one of three target topics 3. `sensor0_peek` which writes the content of the `sensor_0` stream to the console 4. `sensor1_peek` which writes the content of the `sensor_1` stream to the console 5. `sensor2_peek` which writes the content of the `sensor_2` stream to the console The equivalent Kafka Streams topology looks like this: + ``` Topologies: Sub-topology: 0 @@ -566,7 +613,9 @@ Topologies: <-- ksml_sensordata_sensor2 ``` -And this is what the output would look something like this. The sensor peeks messages will not always be shown immediately after the Routing messages. This is because the pipelines are running in separate sub processes. +And this is what the output would look something like this. The sensor peeks messages will not always be shown +immediately after the Routing messages. This is because the pipelines are running in separate sub processes. + ``` 2024-03-06T20:11:39,520Z INFO k.f.route2_pipelines_routing_via_1_forEach Routing Blue sensor - key=sensor6, value={'city': 'Utrecht', 'color': 'blue', 'name': 'sensor6', 'owner': 'Charlie', 'timestamp': 1709755877401, 'type': 'LENGTH', 'unit': 'ft', 'value': '507', '@type': 'SensorData', '@schema': { <>}} 2024-03-06T20:11:39,523Z INFO k.f.route2_pipelines_sensor0_peek_forEach SENSOR0 - key=sensor6, value={'city': 'Utrecht', 'color': 'blue', 'name': 'sensor6', 'owner': 'Charlie', 'timestamp': 1709755877401, 'type': 'LENGTH', 'unit': 'ft', 'value': '507', '@type': 'SensorData', '@schema': { <>}} diff --git a/docs/notations.md b/docs/notations.md index 392f3537..7f4f65be 100644 --- a/docs/notations.md +++ b/docs/notations.md @@ -1,53 +1,62 @@ # Notations ### Table of Contents + 1. [Introduction](#introduction) -1. [Avro](#avro) -1. [CSV](#csv) -1. [JSON](#json) -1. [SOAP](#soap) -1. [XML](#xml) +2. [Avro](#avro) +3. [CSV](#csv) +4. [JSON](#json) +5. [SOAP](#soap) +6. [XML](#xml) ## Introduction -KSML is able to express its internal data types in a number of external representations. Internally these are called _notations_. -The different notations are described below. +KSML is able to express its internal data types in a number of external representations. Internally these are called +_notations_. The different notations are described below. ## AVRO -Avro types are supported through the "avro" prefix in types. The notation is ```avro:schema```, where schema is the schema fqdn, or just the schema name itself. +Avro types are supported through the "avro" prefix in types. The notation is ```avro:schema```, where schema is the +schema fqdn, or just the schema name itself. 
On Kafka topics, Avro types are serialized in binary format. Internally they are represented as structs. -Examples +Examples: + ``` avro:SensorData avro:io.axual.ksml.example.SensorData ``` -Note: when referencing an AVRO schema, you have to ensure that the respective schema file can be found in the KSML working directory and has the .avsc file extension. +Note: when referencing an AVRO schema, you have to ensure that the respective schema file can be found in the KSML +working directory and has the .avsc file extension. ## CSV -Comma-separated values are supported through the "csv" prefix in types. The notation is ```csv:schema```, where schema is the schema fqdn, or just the schema name itself. +Comma-separated values are supported through the "csv" prefix in types. The notation is ```csv:schema```, where schema +is the schema fqdn, or just the schema name itself. On Kafka topics, CSV types are serialized as `string`. Internally they are represented as structs. -Examples +Examples: + ``` csv:SensorData csv:io.axual.ksml.example.SensorData ``` -Note: when referencing an CSV schema, you have to ensure that the respective schema file can be found in the KSML working directory and has the .csv file extension. +Note: when referencing an CSV schema, you have to ensure that the respective schema file can be found in the KSML +working directory and has the .csv file extension. ## JSON -JSON types are supported through the "json" prefix in types. The notation is ```json:schema```, where `schema` is the schema fqdn, or just the schema name itself. +JSON types are supported through the "json" prefix in types. The notation is ```json:schema```, where `schema` is the +schema fqdn, or just the schema name itself. On Kafka topics, JSON types are serialized as `string`. Internally they are represented as structs or lists. -Examples +Examples: + ``` json:SensorData json:io.axual.ksml.example.SensorData @@ -58,20 +67,27 @@ If you want to use JSON without a schema, you can leave out the colon and schema ``` json ``` -Note: when referencing an JSON schema, you have to ensure that the respective schema file can be found in the KSML working directory and has the .json file extension. + +Note: when referencing an JSON schema, you have to ensure that the respective schema file can be found in the KSML +working directory and has the .json file extension. ## SOAP -SOAP is supported through built-in serializers and deserializers. The representation on Kafka will always be ```string```. Internally SOAP objects are structs with their own schema. Field names are derived from the SOAP standards. +SOAP is supported through built-in serializers and deserializers. The representation on Kafka will always +be ```string```. Internally SOAP objects are structs with their own schema. Field names are derived from the SOAP +standards. ## XML -XML is supported through built-in serializers and deserializers. The representation on Kafka will always be ```string```. Internally XML objects are structs. +XML is supported through built-in serializers and deserializers. The representation on Kafka will always +be ```string```. Internally XML objects are structs. + +Examples: -Examples ``` xml:SensorData xml:io.axual.ksml.example.SensorData ``` -Note: when referencing an XML schema, you have to ensure that the respective schema file can be found in the KSML working directory and has the .xsd file extension. 
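+
+To illustrate how these notations are used in practice, the sketch below declares two streams in the `streams` section
+of a KSML definition. The Avro stream matches the example used in the introduction; the JSON topic name is an
+assumption made up for this illustration:
+
+```yaml
+streams:
+  sensor_source_avro:
+    topic: ksml_sensordata_avro
+    keyType: string
+    valueType: avro:SensorData    # Avro values backed by the SensorData schema (SensorData.avsc)
+  sensor_source_json:
+    topic: ksml_sensordata_json
+    keyType: string
+    valueType: json               # schemaless JSON; use json:SensorData to bind the values to a schema
+```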
+Note: when referencing an XML schema, you have to ensure that the respective schema file can be found in the KSML +working directory and has the .xsd file extension. diff --git a/docs/pipelines.md b/docs/pipelines.md index d5639a23..48cd70ea 100644 --- a/docs/pipelines.md +++ b/docs/pipelines.md @@ -67,14 +67,14 @@ four sink types in KSML: | Sink type | Description | |-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `as` | Allows the pipeline result to be saved under an internal referenceable name. Pipelines defined after this point may refer to this name in their `from` statement. | +| `as` | Allows the pipeline result to be saved under an internal name, which can later be referenced. Pipelines defined after this point may refer to this name in their `from` statement. | | `branch` | This statement allows the pipeline to be split up in several branches. Each branch filters messages with an `if` statement. Messages will be processed only by the first branch of which the `if` statement is true. | | `forEach` | Sends every message to a function, without expecting any return type. Because there is no return type, the pipeline always stops after this statement. | | `print` | Prints out every message according to a given output specification. | | `to` | Sends all output messages to a specific target. This target can be a pre-defined `stream`, `table` or `globalTable`, an inline-defined topic, or a special function called a `topicNameExtractor`. | -For more information, see the respective documentation -on [pipeline definitions](specifications.md#definitions/PipelineDefinition). +For more information, see the respective documentation on pipeline definitions in +the [definitions section of the KSML language spec](ksml-language-spec.md#definitions). ## Duration diff --git a/docs/quick-start.md b/docs/quick-start.md index 8dcdbf34..a6776b9a 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -1,23 +1,27 @@ # Quick start ### Table of Contents + 1. [Introduction](#introduction) 2. [Starting a demo setup](#starting-a-demo-setup) 3. [Starting a KSML runner](#starting-a-ksml-runner) 4. [Next steps](#next-steps) ## Introduction + KSML comes with example definitions, which contain a producer that outputs SensorData messages to Kafka, and several pipelines, which each independently consume and process the produced messages. ## Starting a demo setup + After checking out the repository, go to the KSML directory and execute the following: ``` docker compose up -d ``` -This will start Zookeeper, Kafka and a Schema Registry in the background. It will also start the demo producer, which outputs two random messages per second on a `ksml_sensordata_avro` topic. +This will start Zookeeper, Kafka and a Schema Registry in the background. It will also start the demo producer, which +outputs two random messages per second on a `ksml_sensordata_avro` topic. You can check the valid starting of these containers using the following command: @@ -26,6 +30,7 @@ docker compose logs -f ``` Press CTRL-C when you verified data is produced. 
This typically looks like this: + ``` example-producer-1 | 2024-03-06T20:24:49,480Z INFO i.a.k.r.backend.KafkaProducerRunner Calling generate_sensordata_message example-producer-1 | 2024-03-06T20:24:49,480Z INFO i.a.k.r.backend.ExecutableProducer Message: key=sensor2, value=SensorData: {"city":"Utrecht", "color":"white", "name":"sensor2", "owner":"Alice", "timestamp":1709756689480, "type":"HUMIDITY", "unit":"%", "value":"66"} @@ -42,7 +47,6 @@ example-producer-1 | 2024-03-06T20:24:50,035Z INFO i.a.k.r.backend.ExecutableP ``` - ## Starting a KSML runner To start a container which executes the example KSML definitions, type @@ -70,6 +74,7 @@ This will start the KSML docker container. You should see the following typical ## Next steps -Check out the examples in the [Examples]({{ site.github.repository_url }}/tree/main/examples/) directory. By modifying the file `examples/ksml-runner.yaml` you can select the example(s) to run. +Check out the examples in the `examples` directory of the project. By modifying the file `examples/ksml-runner.yaml` you +can select the example(s) to run. For a more elaborate introduction, you can start [here](introduction.md) or refer to the [documentation](index.md). diff --git a/docs/release-notes.md b/docs/release-notes.md index 4d499ac8..206064f1 100644 --- a/docs/release-notes.md +++ b/docs/release-notes.md @@ -2,45 +2,70 @@ ## Releases - * [Release Notes](#release-notes) - * [Releases](#releases) - * [0.8.0 (2024-03-08)](#080-2024-03-08) - * [0.2.2 (2024-01-30)](#022-2024-01-30) - * [0.2.1 (2023-12-20)](#021-2023-12-20) - * [0.2.0 (2023-12-07)](#020-2023-12-07) - * [0.1.0 (2023-03-15)](#010-2023-03-15) - * [0.0.4 (2022-12-02)](#004-2022-12-02) - * [0.0.3 (2021-07-30)](#003-2021-07-30) - * [0.0.2 (2021-06-28)](#002-2021-06-28) - * [0.0.1 (2021-04-30)](#001-2021-04-30) - + * [Releases](#releases) + * [1.0.0 (2024-06-28)](#100-2024-06-28) + * [0.8.0 (2024-03-08)](#080-2024-03-08) + * [0.9.1 (2024-06-21)](#091-2024-06-21) + * [0.9.0 (2024-06-05)](#090-2024-06-05) + * [0.2.2 (2024-01-30)](#022-2024-01-30) + * [0.2.1 (2023-12-20)](#021-2023-12-20) + * [0.2.0 (2023-12-07)](#020-2023-12-07) + * [0.1.0 (2023-03-15)](#010-2023-03-15) + * [0.0.4 (2022-12-02)](#004-2022-12-02) + * [0.0.3 (2021-07-30)](#003-2021-07-30) + * [0.0.2 (2021-06-28)](#002-2021-06-28) + * [0.0.1 (2021-04-30)](#001-2021-04-30) + +### 1.0.0 (2024-06-28) + +* Reworked parsing logic, allowing alternatives for operations and other definitions to co-exist in the KSML language + specification. This allows for better syntax checking in IDEs. +* Lots of small fixes and completion modifications. + +### 0.9.1 (2024-06-21) + +* Fix failing test in GitHub Actions during release +* Unified build workflows + +### 0.9.0 (2024-06-05) + +* Collectable metrics +* New topology test suite +* Python context hardening +* Improved handling of Kafka tombstones +* Added flexibility to producers (single shot, n-shot, or user condition-based) +* JSON Logging support +* Bumped GraalVM to 23.1.2 +* Bumped several dependency versions +* Several fixes and security updates ### 0.8.0 (2024-03-08) + * Reworked all parsing logic, to allow for exporting the JSON schema of the KSML specification: - * docs/specification.md is now derived from internal parser logic, guaranteeing consistency and completeness. - * examples/ksml.json contains the JSON schema, which can be loaded into IDEs for syntax validation and completion. + * docs/specification.md is now derived from internal parser logic, guaranteeing consistency and completeness. 
+ * examples/ksml.json contains the JSON schema, which can be loaded into IDEs for syntax validation and completion. * Improved schema handling: - * Better compatibility checking between schema fields. + * Better compatibility checking between schema fields. * Improved support for state stores: - * Update to state store typing and handling. - * Manual state stores can be defined and referenced in pipelines. - * Manual state stores are also available in Python functions. - * State stores can be used 'side-effect-free' (eg. no AVRO schema registration) + * Update to state store typing and handling. + * Manual state stores can be defined and referenced in pipelines. + * Manual state stores are also available in Python functions. + * State stores can be used 'side-effect-free' (e.g. no AVRO schema registration) * Python function improvements: - * Automatic variable assignment for state stores. - * Every Python function can use a Java Logger, integrating Python output with KSML log output. - * Type inference in situations where parameters or result types can be derived from the context. + * Automatic variable assignment for state stores. + * Every Python function can use a Java Logger, integrating Python output with KSML log output. + * Type inference in situations where parameters or result types can be derived from the context. * Lots of small language updates: - * Improve readability for store types, filter operations and windowing operations - * Introduction of the "as" operation, which allows for pipeline referencing and chaining. + * Improve readability for store types, filter operations and windowing operations + * Introduction of the "as" operation, which allows for pipeline referencing and chaining. * Better data type handling: - * Separation of data types and KSML core, allowing for easier addition of new data types in the future. - * Automatic conversion of data types, removing common pipeline failure scenarios. - * New implementation for CSV handling. + * Separation of data types and KSML core, allowing for easier addition of new data types in the future. + * Automatic conversion of data types, removing common pipeline failure scenarios. + * New implementation for CSV handling. * Merged the different runners into a single runner. - * KSML definitions can now include both producers (data generators) and pipelines (Kafka Streams topologies). - * Removal of Kafka and Axual backend distinctions. + * KSML definitions can now include both producers (data generators) and pipelines (Kafka Streams topologies). + * Removal of Kafka and Axual backend distinctions. * Configuration file updates, allowing for running multiple definitions in a single runner (each in its own namespace). * Examples updated to reflect the latest definition format. * Documentation updated. @@ -82,7 +107,7 @@ **Changes:** * Added XML/SOAP support -* Added datagenerator +* Added data generator * Added Automatic Type Conversion * Added Schema Support for XML, Avro, JSON, Schema * Added Basic Error Handling @@ -102,7 +127,7 @@ * Bug fix for windowed objects * Store improvements * Support Liberica NIK -* Switch from Travis CI to Github workflow +* Switch from Travis CI to GitHub workflow * Build snapshot Docker image on pull request merged ### 0.0.3 (2021-07-30) diff --git a/docs/runners.md b/docs/runners.md index 4d344ff8..07c38e83 100644 --- a/docs/runners.md +++ b/docs/runners.md @@ -1,15 +1,19 @@ # Runners ### Table of Contents + 1. [Introduction](#introduction) 2. 
[Configuration](#configuration) - - [Namespace support](#using-with-axual-platform-or-other-namespaced-kafka-clusters) + - [Namespace support](#using-with-axual-platform-or-other-namespaced-kafka-clusters) 3. [Starting a container](#starting-a-container) ## Introduction -The core of KSML is a library that allows KSML definition files to be parsed and translated into Kafka Streams topologies. Because we wanted to keep KSML low-overhead, KSML does not run these topologies itself. A runner application is provided separately to execute the generated topologies. -The runner supports plain Kafka connections, which can be configured using normal Kafka properties, and contains an advanced configurations that helps running against Kafka clusters using namespacing. +The core of KSML is a library that allows KSML definition files to be parsed and translated into Kafka Streams +topologies. Because we wanted to keep KSML low-overhead, KSML does not run these topologies itself. A runner application +is provided separately to execute the generated topologies. +The runner supports plain Kafka connections, which can be configured using normal Kafka properties, and contains an +advanced configurations that helps running against Kafka clusters using namespacing. Examples of runner configurations are shown below. @@ -19,14 +23,14 @@ The configuration file passed to the KSML runner is in YAML format and should co ```yaml ksml: - applicationServer: # The application server is currently only offering REST querying of state stores + applicationServer: # The application server is currently only offering REST querying of state stores enabled: true # true if you want to enable REST querying of state stores host: 0.0.0.0 # by default listen on all interfaces port: 8080 # port to listen on configDirectory: /ksml/config # Location of the KSML definitions. Default is the current working directory schemaDirectory: /ksml/schemas # Location of the schema definitions. Default is the config directory storageDirectory: /ksml/data # Where the stateful data is written. Defaults is the default JVM temp directory - errorHandling: # how to handle errors + errorHandling: # how to handle errors consume: log: true # log errors logPayload: true # log message payloads upon error @@ -42,7 +46,9 @@ ksml: logPayload: true # log message payloads upon error loggerName: ProduceError # logger name handler: continueOnFail # continue or stop on error - definitions: # KSML definition files from the working directory + enableProducers: true # False to disable producers in the KSML definition + enablePipelines: true # False to disable pipelines in the KSML definition + definitions: # KSML definition files from the working directory namedDefinition1: definition1.yaml namedDefinition2: definition2.yaml namedDefinition3: @@ -56,7 +62,7 @@ kafka: # Kafka streams configuration options ssl.endpoint.identification.algorithm: "" ssl.truststore.location: /ksml/config/truststore.jks ssl.truststore.password: password-for-truststore - + # Schema Registry client configuration, needed when schema registry is used schema.registry.url: http://schema-registry:8083 schema.registry.ssl.truststore.location: /ksml/config/truststore.jks @@ -73,22 +79,24 @@ The following config will resolve the backing topic of a stream or table ```yaml kafka: - # The patterns for topics, groups and transactional ids. 
- # Each field between the curly braces must be specified in the configuration, except the topic, - # group.id and transactional.id fields, which is used to identify the place where the resource name - # is used - topic.pattern: "{tenant}-{instance}-{environment}-{topic}" - group.id.pattern: "{tenant}-{instance}-{environment}-{group.id}" - transactional.id.pattern: "{tenant}-{instance}-{environment}-{transactional.id}" - - # Additional configuration options used for resolving the pattern to values - tenant: "ksmldemo" - instance: "dta" - environment: "dev" + # The patterns for topics, groups and transactional ids. + # Each field between the curly braces must be specified in the configuration, except the topic, + # group.id and transactional.id fields, which is used to identify the place where the resource name + # is used + topic.pattern: "{tenant}-{instance}-{environment}-{topic}" + group.id.pattern: "{tenant}-{instance}-{environment}-{group.id}" + transactional.id.pattern: "{tenant}-{instance}-{environment}-{transactional.id}" + + # Additional configuration options used for resolving the pattern to values + tenant: "ksmldemo" + instance: "dta" + environment: "dev" ``` ## Starting a container -To start a container the KSML definitions and Runner configuration files need to be available in a directory mounted inside the docker container. + +To start a container the KSML definitions and Runner configuration files need to be available in a directory mounted +inside the docker container. The default Runner configuration filename is **_ksml-runner.yaml_**. If no arguments are given, the runner will look for this file in the home directory diff --git a/docs/types.md b/docs/types.md index 5d65788d..d920ffb4 100644 --- a/docs/types.md +++ b/docs/types.md @@ -46,6 +46,7 @@ any specific underlying type. Some fields in the KSML spec are of type `duration`. 
These fields have a fixed format `123x`, where `123` is an integer and `x` is any of the following: + * __: milliseconds * `s`: seconds * `m`: minutes diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/KsmlInfo.java b/ksml-runner/src/main/java/io/axual/ksml/runner/KsmlInfo.java index 451abd08..cd573607 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/KsmlInfo.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/KsmlInfo.java @@ -20,7 +20,7 @@ * =========================LICENSE_END================================== */ -import io.axual.ksml.metric.KSMLMetrics; +import io.axual.ksml.metric.Metrics; import lombok.extern.slf4j.Slf4j; import javax.management.*; @@ -68,7 +68,7 @@ public class KsmlInfo { } public static void registerKsmlAppInfo(String appId) { - var beanName = "%s:type=app-info,app-id=%s,app-name=%s,app-version=%s,build-time=%s".formatted(KSMLMetrics.DOMAIN, appId, ObjectName.quote(APP_NAME), ObjectName.quote(APP_VERSION), ObjectName.quote(BUILD_TIME)); + var beanName = "%s:type=app-info,app-id=%s,app-name=%s,app-version=%s,build-time=%s".formatted(Metrics.DOMAIN, appId, ObjectName.quote(APP_NAME), ObjectName.quote(APP_VERSION), ObjectName.quote(BUILD_TIME)); try { var objectName = ObjectName.getInstance(beanName); var beanServer = ManagementFactory.getPlatformMBeanServer(); diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/prometheus/PrometheusExport.java b/ksml-runner/src/main/java/io/axual/ksml/runner/prometheus/PrometheusExport.java index c1fc895e..8963d569 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/prometheus/PrometheusExport.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/prometheus/PrometheusExport.java @@ -20,7 +20,7 @@ * =========================LICENSE_END================================== */ -import io.axual.ksml.metric.KSMLMetrics; +import io.axual.ksml.metric.Metrics; import io.axual.ksml.runner.config.PrometheusConfig; import io.prometheus.jmx.BuildInfoMetrics; import io.prometheus.jmx.JmxCollector; @@ -53,7 +53,7 @@ public PrometheusExport(PrometheusConfig config) { @Synchronized public void start() throws Exception { - KSMLMetrics.init(); + Metrics.init(); if (!config.isEnabled()) { log.info("Prometheus export is disabled"); return; diff --git a/ksml/src/main/java/io/axual/ksml/metric/KSMLMetrics.java b/ksml/src/main/java/io/axual/ksml/metric/Metrics.java similarity index 95% rename from ksml/src/main/java/io/axual/ksml/metric/KSMLMetrics.java rename to ksml/src/main/java/io/axual/ksml/metric/Metrics.java index a901e73a..5f4f0678 100644 --- a/ksml/src/main/java/io/axual/ksml/metric/KSMLMetrics.java +++ b/ksml/src/main/java/io/axual/ksml/metric/Metrics.java @@ -22,10 +22,10 @@ import java.util.Collections; -public class KSMLMetrics { +public class Metrics { public static final String DOMAIN = "ksml"; - private KSMLMetrics() { + private Metrics() { } private static final MetricsRegistry REGISTRY = new MetricsRegistry(); diff --git a/ksml/src/main/java/io/axual/ksml/python/Invoker.java b/ksml/src/main/java/io/axual/ksml/python/Invoker.java index 82c73374..bf0de03c 100644 --- a/ksml/src/main/java/io/axual/ksml/python/Invoker.java +++ b/ksml/src/main/java/io/axual/ksml/python/Invoker.java @@ -27,7 +27,7 @@ import io.axual.ksml.data.tag.ContextTags; import io.axual.ksml.data.type.DataType; import io.axual.ksml.exception.TopologyException; -import io.axual.ksml.metric.KSMLMetrics; +import io.axual.ksml.metric.Metrics; import io.axual.ksml.metric.MetricName; import io.axual.ksml.user.UserFunction; import 
org.slf4j.Logger; @@ -46,10 +46,10 @@ protected Invoker(UserFunction function, ContextTags metricTags, String function } final var metricName = new MetricName("execution-time", metricTags.append("function-type", functionType).append("function-name", function.name)); - if (KSMLMetrics.registry().getTimer(metricName) == null) { - timer = KSMLMetrics.registry().registerTimer(metricName); + if (Metrics.registry().getTimer(metricName) == null) { + timer = Metrics.registry().registerTimer(metricName); } else { - timer = KSMLMetrics.registry().getTimer(metricName); + timer = Metrics.registry().getTimer(metricName); } this.function = function; } diff --git a/ksml/src/main/java/io/axual/ksml/python/PythonContext.java b/ksml/src/main/java/io/axual/ksml/python/PythonContext.java index 8b217221..b0c9befe 100644 --- a/ksml/src/main/java/io/axual/ksml/python/PythonContext.java +++ b/ksml/src/main/java/io/axual/ksml/python/PythonContext.java @@ -22,7 +22,7 @@ import io.axual.ksml.data.exception.ExecutionException; import io.axual.ksml.data.mapper.DataObjectConverter; -import io.axual.ksml.metric.KSMLMetrics; +import io.axual.ksml.metric.Metrics; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.graalvm.polyglot.*; @@ -33,7 +33,7 @@ @Slf4j public class PythonContext { private static final LoggerBridge LOGGER_BRIDGE = new LoggerBridge(); - private static final MetricsBridge METRICS_BRIDGE = new MetricsBridge(KSMLMetrics.registry()); + private static final MetricsBridge METRICS_BRIDGE = new MetricsBridge(Metrics.registry()); private static final String PYTHON = "python"; private static final List ALLOWED_JAVA_CLASSES = List.of( "java.util.ArrayList", @@ -114,7 +114,7 @@ def register_ksml_bridges(lb, mb): if (register == null) { throw new ExecutionException("Could not register global code for loggerBridge:\n" + pyCode); } - // Load the global LOGGER_BRIDGE variable into the context + // Pass the global LOGGER_BRIDGE and METRICS_BRIDGE variables into global variables of the Python context register.execute(LOGGER_BRIDGE, METRICS_BRIDGE); } } diff --git a/ksml/src/test/java/io/axual/ksml/python/MetricsBridgeTest.java b/ksml/src/test/java/io/axual/ksml/python/MetricsBridgeTest.java index 0d280497..37121a9a 100644 --- a/ksml/src/test/java/io/axual/ksml/python/MetricsBridgeTest.java +++ b/ksml/src/test/java/io/axual/ksml/python/MetricsBridgeTest.java @@ -20,7 +20,7 @@ * =========================LICENSE_END================================== */ -import io.axual.ksml.metric.KSMLMetrics; +import io.axual.ksml.metric.Metrics; import io.axual.ksml.metric.MetricsRegistry; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,7 +39,7 @@ class MetricsBridgeTest { @BeforeEach void setUp() { - metricsBridge = new MetricsBridge(KSMLMetrics.registry()); + metricsBridge = new MetricsBridge(Metrics.registry()); } @Test