Skip to content

Commit 13eed7b

Browse files
committed
feat(plugin): add support for auto-creating the internal topic used by ConnectFilePulse (#139)
Resolves: #139
1 parent 8f6c8b3 commit 13eed7b

8 files changed

+124
-17
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/FileObjectStateBackingStore.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2222
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
2323
import org.apache.kafka.common.Configurable;
24-
import org.apache.kafka.common.config.ConfigDef;
2524

2625
import java.util.Map;
2726

@@ -32,8 +31,4 @@ public interface FileObjectStateBackingStore extends StateBackingStore<FileObjec
3231
*/
3332
@Override
3433
default void configure(final Map<String, ?> configs) { }
35-
36-
default ConfigDef configDef() {
37-
return new ConfigDef();
38-
}
3934
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/InMemoryFileObjectStateBackingStore.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2222
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
2323
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2426

2527
import java.util.Collections;
2628
import java.util.concurrent.ConcurrentHashMap;
@@ -30,11 +32,13 @@
3032
/**
3133
* In-memory {@link StateBackingStore} implementation.
3234
*/
33-
public class InMemoryFileObjectStateBackingStore implements StateBackingStore<FileObject> {
35+
public class InMemoryFileObjectStateBackingStore implements FileObjectStateBackingStore {
36+
37+
private static final Logger LOG = LoggerFactory.getLogger(InMemoryFileObjectStateBackingStore.class);
3438

3539
private final ConcurrentHashMap<String, FileObject> objects = new ConcurrentHashMap<>();
3640

37-
private UpdateListener<FileObject> listener;
41+
private StateBackingStore.UpdateListener<FileObject> listener;
3842

3943
private final AtomicBoolean started = new AtomicBoolean(false);
4044

@@ -82,18 +86,19 @@ public boolean contains(final String name) {
8286
* {@inheritDoc}
8387
*/
8488
@Override
85-
public void putAsync(final String name, final FileObject state) {
86-
put(name, state);
89+
public void putAsync(final String name, final FileObject object) {
90+
put(name, object);
8791
}
8892

8993
/**
9094
* {@inheritDoc}
9195
*/
9296
@Override
93-
public void put(final String name, final FileObject state) {
94-
objects.put(name, state);
97+
public void put(final String name, final FileObject object) {
98+
LOG.debug("Put object in store with key={}, object={}", name, object);
99+
objects.put(name, object);
95100
if (listener != null) {
96-
listener.onStateUpdate(name, state);
101+
listener.onStateUpdate(name, object);
97102
}
98103
}
99104

@@ -102,6 +107,7 @@ public void put(final String name, final FileObject state) {
102107
*/
103108
@Override
104109
public void remove(final String name) {
110+
LOG.debug("Remove object in store with key={}", name);
105111
objects.remove(name);
106112
if (listener != null) {
107113
listener.onStateRemove(name);

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStore.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,28 @@
2121
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2222
import io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore;
2323
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
24-
24+
import org.apache.kafka.clients.admin.AdminClient;
25+
import org.apache.kafka.clients.admin.CreateTopicsResult;
26+
import org.apache.kafka.clients.admin.NewTopic;
27+
import org.apache.kafka.common.KafkaFuture;
28+
import org.apache.kafka.common.config.TopicConfig;
29+
import org.apache.kafka.common.errors.TopicExistsException;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.util.HashMap;
34+
import java.util.List;
2535
import java.util.Map;
36+
import java.util.concurrent.ExecutionException;
2637
import java.util.concurrent.TimeUnit;
2738
import java.util.concurrent.TimeoutException;
2839

2940
/**
3041
*/
3142
public class KafkaFileObjectStateBackingStore implements FileObjectStateBackingStore {
3243

44+
private static final Logger LOG = LoggerFactory.getLogger(KafkaFileObjectStateBackingStore.class);
45+
3346
private static final String KEY_PREFIX = "connect-file-pulse";
3447

3548
private KafkaStateBackingStore<FileObject> store;
@@ -48,6 +61,35 @@ public void configure(final Map<String, ?> props) {
4861
new FileObjectSerde(),
4962
config.getTaskStorageConsumerEnabled()
5063
);
64+
65+
try (AdminClient client = AdminClient.create(config.getTaskStorageConfigs())) {
66+
Map<String, String> topicConfig = new HashMap<>();
67+
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
68+
final NewTopic newTopic = new NewTopic(
69+
config.getTaskStorageTopic(),
70+
config.getTopicPartitions(),
71+
config.getReplicationFactor()
72+
).configs(topicConfig);
73+
createTopic(client, newTopic);
74+
}
75+
}
76+
77+
private void createTopic(final AdminClient adminClient, final NewTopic topic) {
78+
try {
79+
LOG.info("Attempt to create new topic '{}'", topic);
80+
CreateTopicsResult result = adminClient.createTopics(List.of(topic));
81+
KafkaFuture<Void> future = result.all();
82+
future.get();
83+
} catch (ExecutionException e) {
84+
Throwable cause = e.getCause();
85+
if (cause instanceof TopicExistsException) {
86+
LOG.debug("Failed to created topic '{}'. Topic already exists.", topic);
87+
} else {
88+
LOG.warn("Failed to create topic '{}'", topic, e);
89+
}
90+
} catch (InterruptedException e) {
91+
LOG.warn("Failed to create topic '{}'", topic, e);
92+
}
5193
}
5294

5395
/**

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfig.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@
2525

2626
import java.util.HashMap;
2727
import java.util.Map;
28+
import java.util.Optional;
2829

2930
public class KafkaFileObjectStateBackingStoreConfig extends AbstractConfig {
3031

32+
private static final String GROUP = "KafkaFileObjectStateBackingStore";
33+
3134
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_CONFIG = "tasks.file.status.storage.topic";
3235
private static final String TASKS_FILE_STATUS_STORAGE_TOPIC_DOC = "The topic name which is used to report file states.";
3336
private static final String TASKS_FILE_STATUS_STORAGE_TOPIC_DEFAULT = "connect-file-pulse-status";
@@ -40,6 +43,12 @@ public class KafkaFileObjectStateBackingStoreConfig extends AbstractConfig {
4043
public static final String TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG = "tasks.file.status.storage.consumer.enabled";
4144
public static final String TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_DOC = "Boolean to indicate if the storage should consume the status topic.";
4245

46+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_CONFIG = "tasks.file.status.storage.topic.partitions";
47+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_DOC = "The number of partitions to be used for the status storage topic.";
48+
49+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_CONFIG = "tasks.file.status.storage.topic.replication.factor";
50+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor to be used for the status storage topic.";
51+
4352
/**
4453
* Creates a new {@link KafkaFileObjectStateBackingStoreConfig} instance.
4554
*
@@ -82,33 +91,80 @@ private Map<String, Object> getInternalKafkaProducerConfigs() {
8291
return this.originalsWithPrefix("tasks.file.status.storage.producer.");
8392
}
8493

94+
Optional<Integer> getTopicPartitions() {
95+
return Optional.ofNullable(getInt(TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_CONFIG));
96+
}
97+
98+
Optional<Short> getReplicationFactor() {
99+
return Optional.ofNullable(getShort(TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_CONFIG));
100+
}
101+
85102
static ConfigDef configDef() {
103+
int groupCounter = 0;
86104
return new ConfigDef()
87105
.define(
88106
TASKS_FILE_STATUS_STORAGE_TOPIC_CONFIG,
89107
ConfigDef.Type.STRING,
90108
TASKS_FILE_STATUS_STORAGE_TOPIC_DEFAULT,
91109
ConfigDef.Importance.HIGH,
92-
TASKS_FILE_STATUS_STORAGE_TOPIC_DOC
110+
TASKS_FILE_STATUS_STORAGE_TOPIC_DOC,
111+
GROUP,
112+
groupCounter++,
113+
ConfigDef.Width.NONE,
114+
TASKS_FILE_STATUS_STORAGE_TOPIC_CONFIG
93115
)
94116
.define(
95117
TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG,
96118
ConfigDef.Type.STRING,
97119
ConfigDef.Importance.HIGH,
98-
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC
120+
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC,
121+
GROUP,
122+
groupCounter++,
123+
ConfigDef.Width.NONE,
124+
TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG
125+
)
126+
.define(
127+
TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_CONFIG,
128+
ConfigDef.Type.SHORT,
129+
null,
130+
ConfigDef.Importance.MEDIUM,
131+
TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_DOC,
132+
GROUP,
133+
groupCounter++,
134+
ConfigDef.Width.NONE,
135+
TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_CONFIG
136+
)
137+
.define(
138+
TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_CONFIG,
139+
ConfigDef.Type.INT,
140+
null,
141+
ConfigDef.Importance.MEDIUM,
142+
TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_DOC,
143+
GROUP,
144+
groupCounter++,
145+
ConfigDef.Width.NONE,
146+
TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_CONFIG
99147
)
100148
.define(
101149
TASKS_FILE_STATUS_STORAGE_NAME_CONFIG,
102150
ConfigDef.Type.STRING,
103151
ConfigDef.Importance.HIGH,
104-
TASKS_FILE_STATUS_STORAGE_NAME_DOC
152+
TASKS_FILE_STATUS_STORAGE_NAME_DOC,
153+
GROUP,
154+
groupCounter++,
155+
ConfigDef.Width.NONE,
156+
TASKS_FILE_STATUS_STORAGE_NAME_CONFIG
105157
)
106158
.define(
107159
TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG,
108160
ConfigDef.Type.BOOLEAN,
109161
true,
110162
ConfigDef.Importance.HIGH,
111-
TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_DOC
163+
TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_DOC,
164+
GROUP,
165+
groupCounter++,
166+
ConfigDef.Width.NONE,
167+
TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG
112168
);
113169

114170
}

examples/connect-file-pulse-example-override-topic-and-key.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,7 @@
2121
"tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
2222
"tasks.file.status.storage.bootstrap.servers": "broker:29092",
2323
"tasks.file.status.storage.topic": "connect-file-pulse-status",
24+
"tasks.file.status.storage.topic.partitions": 10,
25+
"tasks.file.status.storage.topic.replication.factor": 1,
2426
"tasks.max": 1
2527
}

examples/connect-file-pulse-quickstart-avro.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,7 @@
1313
"tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
1414
"tasks.file.status.storage.bootstrap.servers": "broker:29092",
1515
"tasks.file.status.storage.topic": "connect-file-pulse-status",
16+
"tasks.file.status.storage.topic.partitions": 10,
17+
"tasks.file.status.storage.topic.replication.factor": 1,
1618
"tasks.max": 1
1719
}

examples/connect-file-pulse-quickstart-csv.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,7 @@
2020
"tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
2121
"tasks.file.status.storage.bootstrap.servers": "broker:29092",
2222
"tasks.file.status.storage.topic": "connect-file-pulse-status",
23+
"tasks.file.status.storage.topic.partitions": 10,
24+
"tasks.file.status.storage.topic.replication.factor": 1,
2325
"tasks.max": 1
2426
}

examples/connect-file-pulse-quickstart-log4j.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,7 @@
2222
"tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
2323
"tasks.file.status.storage.bootstrap.servers": "broker:29092",
2424
"tasks.file.status.storage.topic": "connect-file-pulse-status",
25+
"tasks.file.status.storage.topic.partitions": 10,
26+
"tasks.file.status.storage.topic.replication.factor": 1,
2527
"tasks.max": 1
2628
}

0 commit comments

Comments
 (0)