Skip to content

Commit e14740b

Browse files
committed
pom.xml refactor
1 parent bcf28d8 commit e14740b

File tree

4 files changed

+10
-55
lines changed

4 files changed

+10
-55
lines changed

README.md

Lines changed: 3 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ To build the connector, you must have the following installed:
2020

2121
Clone the repository with the following command:
2222
```
23-
git clone https://github.com/uts58/kafka-connect-mqtt.git
23+
git clone https://github.com/uts58/kafka-connect-mqtt-file.git
2424
```
25-
Change directory into the `kafka-connect-mqtt` directory
25+
Change directory into the `kafka-connect-mqtt-file` directory
2626
```
27-
cd kafka-connect-mqtt
27+
cd kafka-connect-mqtt-file
2828
```
2929
Build the connector fat jar using Maven
3030
```
@@ -49,11 +49,6 @@ http://<kafkaconnect>:8083/connector-plugins
4949
If you see these entries, the connector has been installed succesfully
5050

5151
```
52-
{
53-
"class": "org.ndsu.agda.connect.connectors.mqtt.MQTTSinkConnector",
54-
"type": "sink",
55-
"version": "<version>"
56-
},
5752
{
5853
"class": "org.ndsu.agda.connect.connectors.mqtt.MQTTSourceConnector",
5954
"type": "source",
@@ -99,42 +94,6 @@ curl -X POST \
9994
* `mqtt.userName` (optional): Username to connect to MQTT broker
10095
* `mqtt.password` (optional): Password to connect to MQTT broker
10196

102-
## Configuring the MQTT Sink connector
103-
104-
The MQTT Sink Connector reads messages from a Kafka topic and publishes them to a MQTT topic.
105-
106-
Here is a basic configuration example:
107-
```
108-
curl -X POST \
109-
http://<kafkaconnect>>:8083/connectors \
110-
-H 'Content-Type: application/json' \
111-
-d '{ "name": "mqtt-sink-connector",
112-
"config":
113-
{
114-
"connector.class":"org.ndsu.agda.connect.connectors.mqtt.MQTTSinkConnector",
115-
"mqtt.topic":"my_mqtt_topic",
116-
"topics":"my_kafka_topic",
117-
"mqtt.qos": 2,
118-
"mqtt.clientID":"my_client_id",
119-
"mqtt.broker":"tcp://127.0.0.1:1883",
120-
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
121-
"key.converter.schemas.enable":false,
122-
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
123-
"value.converter.schemas.enable":false
124-
}
125-
}'
126-
```
127-
128-
### Optional Configuration options
129-
* `mqtt.qos` (optional): 0 – At most Once, 1 – At Least Once, 2 – Exactly Once
130-
* `mqtt.automaticReconnect` (optional)(default: true): Should the client automatically reconnect in case of connection failures
131-
* `mqtt.keepAliveInterval` (optional)(default: 60 seconds)
132-
* `mqtt.cleanSession` (optional)(default: true): Controls the state after disconnecting the client from the broker.
133-
* `mqtt.connectionTimeout` (optional)(default: 30 seconds)
134-
* `mqtt.userName` (optional): Username to connect to MQTT broker
135-
* `mqtt.password` (optional): Password to connect to MQTT broker
136-
137-
13897
## Configuring the File Sink connector
13998

14099
The File Sink Connector reads messages from a Kafka topic(s) and dumps them into files.

pom.xml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
<groupId>org.ndsu.adga</groupId>
99
<artifactId>kafka-connect</artifactId>
10-
<version>1.2.2</version>
10+
<version>1.2.3</version>
1111
<packaging>jar</packaging>
1212
<name>kafka.connect</name>
1313

@@ -46,17 +46,11 @@
4646
<version>33.0.0-jre</version>
4747
</dependency>
4848

49-
<dependency>
50-
<groupId>org.mongodb</groupId>
51-
<artifactId>bson</artifactId>
52-
<version>5.3.0</version>
53-
</dependency>
5449
<dependency>
5550
<groupId>com.fasterxml.jackson.core</groupId>
5651
<artifactId>jackson-databind</artifactId>
5752
<version>2.18.2</version>
5853
</dependency>
59-
6054
</dependencies>
6155

6256
<build>
@@ -115,4 +109,4 @@
115109
</plugin>
116110
</plugins>
117111
</build>
118-
</project>
112+
</project>

src/main/java/org/ndsu/agda/connect/connectors/file/FileSinkJsonWriter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.ndsu.agda.connect.connectors.file;
22

3-
import org.apache.kafka.connect.errors.ConnectException;
43
import org.slf4j.Logger;
54
import org.slf4j.LoggerFactory;
65

src/main/java/org/ndsu/agda/connect/connectors/mqtt/MQTTSourceConverter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.slf4j.LoggerFactory;
1010

1111
import java.io.IOException;
12+
import java.time.Instant;
1213
import java.util.Collections;
1314
import java.util.HashMap;
1415
import java.util.Map;
@@ -35,14 +36,16 @@ protected SourceRecord convert(String topic, MqttMessage mqttMessage) {
3536
try {
3637
jsonMap = OBJECT_MAPPER.readValue(payload, new TypeReference<Map<String, Object>>() {
3738
});
39+
jsonMap.put("mqttReceivedAt", Instant.now().toString());
40+
3841
if (jsonMap.containsKey("iotnode")) {
3942
@SuppressWarnings("unchecked")
4043
Map<String, Object> iotNode = (Map<String, Object>) jsonMap.get("iotnode");
4144
if (iotNode.containsKey("reportedAt")) {
42-
String reportedAtStr = (String) iotNode.get("reportedAt");
43-
jsonMap.put("reportedAt", reportedAtStr);
45+
jsonMap.put("baseReportedAt", iotNode.get("reportedAt").toString());
4446
}
4547
}
48+
4649
} catch (IOException e) {
4750
log.error("Failed to parse JSON payload, passing raw payload in a wrapper", e);
4851
jsonMap = Collections.singletonMap("raw", payload);

0 commit comments

Comments
 (0)