This project aims to create a data pipeline for processing and analyzing IoT data from different devices. The pipeline consists of various components including data ingestion, storage, processing, and visualization. The pipeline uses Kafka, JuiceFS, and Spark to achieve scalability, fault-tolerance, and high-performance processing.
The architecture of the project consists of the following components:
The IoT Device Simulator is used to simulate IoT devices that send JSON messages to the Kafka broker. The IoT Simulator is accessible in this repository: https://github.com/massimocallisto/iot-simulator
The Kafka broker receives the JSON messages from the IoT devices and stores them in topics. It is assumed that Kafka is already running and listening on port 9092. Also set the following environment variable in your shell:
KAFKA_HOME=/opt/kafka
Download the MQTT Source connector, then unzip it and copy its contents into:
$KAFKA_HOME/plugins/mqtt-connector
In the configuration file $KAFKA_HOME/config/connect-distributed.properties, add the following line:
plugin.path=/opt/kafka/plugins
Start the connector worker with the following command:
$KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties
To run the connector, define its configuration as a JSON file to submit to the Connect worker. Save it as ~/mqtt_connect.json:
{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://localhost:1883",
    "mqtt.topics": "#",
    "kafka.topic": "mqtt.echo",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": ""
  }
}
Then submit to the worker:
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @"$HOME/mqtt_connect.json"
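If you prefer to build and submit the configuration programmatically instead of via curl, the sketch below does the same POST to the Kafka Connect REST API using only the Python standard library. The endpoint URL and port match the worker started above; everything else mirrors ~/mqtt_connect.json.

```python
import json
import urllib.request

# Connector configuration mirroring ~/mqtt_connect.json.
connector = {
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": "1",
        "mqtt.server.uri": "tcp://localhost:1883",
        "mqtt.topics": "#",
        "kafka.topic": "mqtt.echo",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "confluent.license": "",
    },
}

def submit_connector(config: dict,
                     url: str = "http://localhost:8083/connectors") -> bytes:
    """POST the connector config to the Kafka Connect REST API."""
    req = urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()
```

Calling `submit_connector(connector)` is equivalent to the curl command above and requires the Connect worker to be listening on port 8083.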
The Persistor Prototype is composed of a producer and a consumer connected to Kafka. The consumer stores the IoT JSON messages received from Kafka in JuiceFS, while the producer sends an ack containing the UUID of each JSON file received. A second method of the prototype consumes the messages from the ack topic and appends the UUID of each stored file to a log file.
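The core logic of the Persistor Prototype can be sketched without a live Kafka connection. In this stdlib-only sketch, a local directory stands in for the JuiceFS mountpoint, and the function names (`persist_message`, `log_ack`) are illustrative, not the prototype's actual API:

```python
import json
import uuid
from pathlib import Path

def persist_message(raw: bytes, storage_dir: Path) -> str:
    """Store one IoT JSON message under a fresh UUID and return that UUID.

    storage_dir stands in for the JuiceFS mountpoint (e.g. /mnt/jfs);
    the returned UUID is what the producer would send on the ack topic.
    """
    message = json.loads(raw)          # validate that the payload is JSON
    file_id = str(uuid.uuid4())
    path = storage_dir / f"{file_id}.json"
    path.write_text(json.dumps(message))
    return file_id

def log_ack(file_id: str, log_file: Path) -> None:
    """Append the acked UUID to the log file, as the ack consumer would."""
    with log_file.open("a") as f:
        f.write(file_id + "\n")
```

In the real prototype, `persist_message` would be driven by a Kafka consumer loop and `log_ack` by a consumer subscribed to the ack topic.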
JuiceFS is a distributed file system that provides consistent and scalable storage for the IoT data; the consumer stores the IoT JSON messages it receives from Kafka there.
To configure JuiceFS, install the client software on each node in your cluster. The client can be downloaded from the JuiceFS website or from GitHub. Once installed, configure the client with the following commands:
- juicefs format --storage minio --bucket http://127.0.0.1:9000/myjfs --access-key minioadmin --secret-key minioadmin "redis://127.0.0.1:6379/1" myjfs
  This command formats a new file system named myjfs. It is assumed that Redis and MinIO are already installed and running, with Redis on port 6379 and MinIO on port 9000.
- juicefs config "redis://127.0.0.1:6379/1" <options>
  This command updates configuration options of an existing file system, such as the storage backend credentials, quota, and encryption settings.
- sudo juicefs mount "redis://127.0.0.1:6379/1" /mnt/jfs
  This command mounts the JuiceFS file system, whose metadata is stored in Redis, at a mountpoint in your local filesystem. You can specify any directory as the mountpoint path; here /mnt/jfs is used.
- juicefs umount <mountpoint>
  This command unmounts a previously mounted JuiceFS mountpoint.
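Once mounted, JuiceFS behaves like an ordinary POSIX directory, so applications need no special client API. The sketch below illustrates this with plain file I/O; the mountpoint argument would be /mnt/jfs in this setup, and a temporary directory works just as well for local experiments:

```python
from pathlib import Path

def write_to_jfs(mountpoint: str, name: str, payload: str) -> Path:
    """Write a payload under the JuiceFS mountpoint using ordinary file I/O.

    Once JuiceFS is mounted (e.g. at /mnt/jfs), standard filesystem
    operations are all that is needed; no JuiceFS-specific API calls.
    """
    target = Path(mountpoint) / name
    target.parent.mkdir(parents=True, exist_ok=True)  # e.g. per-topic subdirs
    target.write_text(payload)
    return target
```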
Spark is used for processing and analyzing the IoT data stored in JuiceFS. Spark's processing capability is enhanced by the use of Scala queries to analyze the data.
To allow Spark to access the data saved in JuiceFS, add the following configuration to $SPARK_HOME/conf/spark-defaults.conf:
spark.hadoop.fs.jfs.impl=io.juicefs.JuiceFileSystem
spark.hadoop.fs.AbstractFileSystem.jfs.impl=io.juicefs.JuiceFS
spark.hadoop.juicefs.meta=redis://127.0.0.1:6379/1
To run the Spark application, first run 'mvn package' inside the sparkjob directory, then run:
$SPARK_HOME/bin/spark-submit --class "com.dstorage.App" target/sparkjob-1.0-SNAPSHOT.jar
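The kind of analysis the Spark job performs can be illustrated without a cluster. The stdlib-only sketch below computes a per-device average over stored JSON messages; the actual job does the equivalent with Spark queries over jfs:// paths, and the 'device'/'value' keys are assumed field names, not the real message schema:

```python
import json
from collections import defaultdict
from pathlib import Path

def average_per_device(data_dir: Path, field: str = "value") -> dict:
    """Average a numeric field per device over all JSON files in data_dir.

    data_dir stands in for the JuiceFS mountpoint that the Spark job
    reads; each file is one persisted IoT message.
    """
    sums, counts = defaultdict(float), defaultdict(int)
    for path in data_dir.glob("*.json"):
        record = json.loads(path.read_text())
        device = record["device"]
        sums[device] += record[field]
        counts[device] += 1
    return {d: sums[d] / counts[d] for d in sums}
```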
To set up the project, follow the steps below:
- Clone the repository to your local machine.
- Install and configure Kafka on your local machine.
- Install JuiceFS and configure it to work with Kafka.
- Install and configure Spark on your local machine.
- Run the Persistor Prototype, the Spark job, and the visualization tool of your choice.
To use the project, follow the steps below:
- Connect the IoT devices to the Kafka broker and start sending JSON messages.
- The Kafka broker will store the messages in topics.
- The Persistor Prototype will consume the messages from the topic and store them in JuiceFS; it will also consume the acks from the ack topic and write a log entry with the UUID of each stored file.
- The Spark job will read the data from JuiceFS and perform the required processing and analysis.
- The processed data can be visualized using various visualization tools.
If you want to contribute to this project, follow the steps below:
- Fork the repository to your GitHub account.
- Clone the forked repository to your local machine.
- Create a new branch and make the changes.
- Commit the changes and push them to your forked repository.
- Submit a pull request to the original repository.
The project was developed as part of the course "Technologies for Big Data Management" at the University of Camerino.
