Skip to content

Kafka Producer as a Datasink#329

Open
iamaccosta wants to merge 5 commits intoeclipse-basyx:mainfrom
iamaccosta:main
Open

Kafka Producer as a Datasink#329
iamaccosta wants to merge 5 commits intoeclipse-basyx:mainfrom
iamaccosta:main

Conversation

@iamaccosta
Copy link
Copy Markdown

@iamaccosta iamaccosta commented Oct 30, 2024

This PR introduces a Kafka producer implementation to the BaSyx Databridge, allowing Kafka to act as a DataSink. This enhancement makes it possible to publish data from the AAS environment to Kafka topics, extending integration options for real-time data streaming and processing.

I added the files:

  • KafkaProducerConfiguration.java
  • KafkaDefaultSinkConfigurationFactory.java

Ran the commands:

  • mvn clean install -DskipTests (removed the tests since they were failing)
  • docker build -t custom-databridge:latest.

Dockerfile:

# Use the official BaSyx Databridge base image
FROM eclipsebasyx/databridge:1.0.0-SNAPSHOT

COPY ./basyx-databridge/*/target/*.jar /app/libs/

Launched my databridge example that is:

  • retrieving data from an OPCUA server and publishing into AAS properties.
  • retrieving with aaspollingconsumer.json from the same property and sending to a Kafka broker.

For data protection reasons, I won't be supplying the code from the OPCUA part. The rest of the files are presented below.

databridge.kafka.zip

Logs:

2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.RoutesConfigurationLoader - Found files: [timerconsumer.json, logback.xml, jsonataValue.json, routes.json, jsonjacksontransformer.json, jsonatatransformer.json, aaspollingconsumer.json, kafkadatasink.json, opcuaconsumer.json, aasserver.json]
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - In class org.eclipse.digitaltwin.basyx.databridge.core.configuration.factory.DataTransformerConfigurationFactory the field could not be  DEFAULT_FILE_PATH found!
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Retrieved method create
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Instantiated class org.eclipse.digitaltwin.basyx.databridge.timer.configuration.factory.TimerDefaultConfigurationFactory
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.RoutesConfigurationLoader - Data source added - timerconsumer.json
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - In class org.eclipse.digitaltwin.basyx.databridge.core.configuration.factory.DataSinkConfigurationFactory the field could not be  DEFAULT_FILE_PATH found!
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Retrieved method create
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Instantiated class org.eclipse.digitaltwin.basyx.databridge.opcua.configuration.factory.OpcuaDefaultConfigurationFactory
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.RoutesConfigurationLoader - Data source added - opcuaconsumer.json
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Retrieved method create
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Instantiated class org.eclipse.digitaltwin.basyx.databridge.aas.configuration.factory.AASPollingConsumerDefaultConfigurationFactory
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.RoutesConfigurationLoader - Data source added - aaspollingconsumer.json
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.RoutesConfigurationLoader - Config file doesn't match to consumer, transformer, or server!
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Retrieved method create
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Instantiated class org.eclipse.digitaltwin.basyx.databridge.jsonjackson.configuration.factory.JsonJacksonDefaultConfigurationFactory
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.RoutesConfigurationLoader - Data Transformer added - jsonjacksontransformer.json
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Retrieved method create
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Instantiated class org.eclipse.digitaltwin.basyx.databridge.jsonata.configuration.factory.JsonataDefaultConfigurationFactory
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.RoutesConfigurationLoader - Data Transformer added - jsonatatransformer.json
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - In class org.eclipse.digitaltwin.basyx.databridge.core.configuration.factory.DataSourceConfigurationFactory the field could not be  DEFAULT_FILE_PATH found!
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Retrieved method create
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeUtils - Instantiated class org.eclipse.digitaltwin.basyx.databridge.aas.configuration.factory.AASProducerDefaultConfigurationFactory
2024-10-29 16:18:07 [main] INFO org.eclipse.digitaltwin.basyx.databridge.component.RoutesConfigurationLoader - Data sink added - aasserver.json
2024-10-29 16:18:08 java.lang.NullPointerException
2024-10-29 16:18:08     at org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteCreatorHelper.getDataSinkEndpoints(RouteCreatorHelper.java:45)
2024-10-29 16:18:08     at org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.AbstractRouteCreator.addRouteToRouteBuilder(AbstractRouteCreator.java:55)
2024-10-29 16:18:08     at org.eclipse.digitaltwin.basyx.databridge.core.routebuilder.DataBridgeRouteBuilder.configure(DataBridgeRouteBuilder.java:58)
2024-10-29 16:18:08     at org.apache.camel.builder.RouteBuilder.checkInitialized(RouteBuilder.java:726)
2024-10-29 16:18:08     at org.apache.camel.builder.RouteBuilder.configureRoutes(RouteBuilder.java:668)
2024-10-29 16:18:08     at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:600)
2024-10-29 16:18:08     at org.apache.camel.impl.engine.AbstractCamelContext.addRoutes(AbstractCamelContext.java:1213)
2024-10-29 16:18:08     at org.eclipse.digitaltwin.basyx.databridge.core.component.DataBridgeComponent.startRoutes(DataBridgeComponent.java:86)
2024-10-29 16:18:08     at org.eclipse.digitaltwin.basyx.databridge.core.component.DataBridgeComponent.startComponent(DataBridgeComponent.java:80)
2024-10-29 16:18:08     at org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeExecutable.main(DataBridgeExecutable.java:48)

Copy link
Copy Markdown
Member

@mdanish98 mdanish98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @iamaccosta,

Thanks a lot for your implementation.

Please address the mentioned review remarks.

There are few additional remarks as follows:

  • Please sign the Eclipse Contributor Agreement. If you don't already have an eclipse account, you will need to create one. You can also link the account to your GitHub account. Please make sure that the email that you used to sign your commits is the same mail that you use for your eclipse account.
  • Please use the formatter to format the code (if using eclipse IDE). [1]
  • An example is missing, please add an example to demonstrate the working of your implementation, refer to [1]. You are free to choose any Consumer and Transformer, but I would suggest (Aas - [any transformer] - Kafka) if possible.
  • The regression tests are missing, and it can be done in two steps. These tests are similar to the example above, but it ensures that the implementation also works with the DataBridge component (i.e, in docker environment). First, add a DataBridgeSuiteKafka (similar to [3]), and then add a test implementation of it (similar to [4]).

Links:
[1] https://wiki.basyx.org/en/latest/_downloads/ef40d3c625f309e00aacfe812a8b9e68/BaSyx_Formatting.zip
[2] https://github.com/eclipse-basyx/basyx-databridge/tree/main/databridge.examples
[3] https://github.com/eclipse-basyx/basyx-databridge/blob/main/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/DataBridgeSuiteMqtt.java
[4] https://github.com/eclipse-basyx/basyx-databridge/blob/main/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeMqtt.java

@iamaccosta
Copy link
Copy Markdown
Author

Hi @mdanish98,

New commits pushed. I addressed the mentioned reviews, signed the ECA, added the formatter to Eclipse, and the example to databridge.examples. Missing the regression tests still.

In the example, I used the .aasx file presented in the aas-jsonata-opcua -> test.resources.aasx to simulate the assertions.
I think the test is failling, but i don't know the reason...

Since my local test from the initial PR commits was failling maybe due to some errors on my initial implementation, I think this test is supposed to fail right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants