configs) {
+ // Default: no-op
+ }
+
+ /**
+ * Check if the file should be marked as completed based on the strategy.
+ *
+ * @param context the file object context.
+ * @return true if the file should be marked as COMPLETED, false otherwise.
+ */
+ boolean shouldComplete(final FileObjectContext context);
+}
\ No newline at end of file
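Note: a minimal sketch of a custom strategy written against the interface above. The class name, the `min.idle.age.seconds` property, and the `Map<String, ?>` parameter of `configure` are illustrative assumptions, not part of this change; only `shouldComplete(FileObjectContext)` and the no-op `configure` default come from the interface itself.

```java
// Hypothetical example (not in this PR): complete a file only once it has been idle
// for a configurable amount of time. Assumes FileObjectMeta#lastModified() returns
// epoch milliseconds, as it is used elsewhere in this change set, and assumes the
// interface's configure(...) takes a Map<String, ?>.
package io.streamthoughts.kafka.connect.filepulse.source;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;

public class MinIdleAgeCompletionStrategy implements FileCompletionStrategy {

    private Duration minIdleAge = Duration.ofHours(1);

    @Override
    public void configure(final Map<String, ?> configs) {
        // 'min.idle.age.seconds' is a made-up property used only for this sketch.
        final Object value = configs.get("min.idle.age.seconds");
        if (value != null) {
            minIdleAge = Duration.ofSeconds(Long.parseLong(value.toString()));
        }
    }

    @Override
    public boolean shouldComplete(final FileObjectContext context) {
        // Complete the file once it has not been modified for the configured duration.
        final Instant lastModified = Instant.ofEpochMilli(context.metadata().lastModified());
        return Instant.now().isAfter(lastModified.plus(minIdleAge));
    }
}
```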
diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStatus.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStatus.java
index ad4ab83a2..2a78c5f77 100644
--- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStatus.java
+++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStatus.java
@@ -32,6 +32,11 @@ public enum FileObjectStatus {
*/
READING,
+ /**
+ * The file processing is partially completed (e.g. for long-lived processing).
+ */
+ PARTIALLY_COMPLETED,
+
/**
* The file processing is completed.
*/
@@ -59,4 +64,4 @@ public boolean isOneOf(final FileObjectStatus...states) {
public boolean isDone() {
return isOneOf(COMMITTED, FAILED, CLEANED);
}
-}
+}
\ No newline at end of file
diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/LongLivedFileReadStrategy.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/LongLivedFileReadStrategy.java
new file mode 100644
index 000000000..8db45b858
--- /dev/null
+++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/LongLivedFileReadStrategy.java
@@ -0,0 +1,39 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.source;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Optional strategy interface that can be implemented alongside {@link FileCompletionStrategy}
+ * to influence when the connector should attempt to read from continuously appended files.
+ *
+ * This is intended for long-lived, append-only files (for example, daily log files) where
+ * it may be desirable to back off read attempts when no new data is expected for some time,
+ * in order to avoid unnecessary polling or timeouts while still allowing the completion
+ * strategy to control when the file is eventually marked as COMPLETED.
+ */
+public interface LongLivedFileReadStrategy {
+
+ Logger LOG = LoggerFactory.getLogger(LongLivedFileReadStrategy.class);
+
+ /**
+ * Determines whether the connector should attempt to read from the given file
+ * based on its current context.
+ *
+ *
+ * <p>The default implementation checks if the file has been modified since
+ * the last read offset timestamp.
+ *
+ * @param objectMeta the file object metadata.
+ * @param offset the last read offset for the file.
+ * @return true if the connector should attempt to read from the file, false otherwise.
+ */
+ default boolean shouldAttemptRead(final FileObjectMeta objectMeta, final FileObjectOffset offset) {
+ return objectMeta.lastModified() > offset.timestamp();
+ }
+}
\ No newline at end of file
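Note: an illustrative use of the default `shouldAttemptRead(...)` above, built with the same `GenericFileObjectMeta.Builder` and three-argument `FileObjectOffset` constructor that this PR's tests use; the example class and timestamp values are assumptions for the sketch.

```java
// Sketch only: a strategy relying purely on the interface's default behaviour, and a
// tiny driver showing when the monitor would schedule the file again.
package io.streamthoughts.kafka.connect.filepulse.source;

import java.net.URI;

public class LongLivedReadGateExample {

    // No overrides: the default shouldAttemptRead(...) from the interface applies.
    static class DefaultGateStrategy implements LongLivedFileReadStrategy { }

    public static void main(String[] args) {
        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
                .withUri(URI.create("file:///tmp/logs-2025-12-08.log"))
                .withName("logs-2025-12-08.log")
                .withContentLength(1000L)
                .withLastModified(1_733_700_000_000L) // file touched after the last read below
                .build();

        // Last committed read happened before the file's lastModified timestamp.
        final FileObjectOffset offset = new FileObjectOffset(0, 0, 1_733_600_000_000L);

        // Prints true: the file changed since the last read, so it should be scheduled again.
        System.out.println(new DefaultGateStrategy().shouldAttemptRead(meta, offset));
    }
}
```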
diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/StateListener.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/StateListener.java
index f73bf92e3..deaa0516e 100644
--- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/StateListener.java
+++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/StateListener.java
@@ -40,6 +40,14 @@ public interface StateListener {
*/
void onCompleted(final FileObjectContext context);
+ /**
+ * This method is invoked when a source file processing is partially completed.
+ * @see FileRecordsPollingConsumer
+ *
+ * @param context the file context.
+ */
+ void onPartiallyCompleted(final FileObjectContext context);
+
/**
* This method is invoked when an error occurred while processing a source file.
* @see FileRecordsPollingConsumer
@@ -47,4 +55,4 @@ public interface StateListener {
* @param context the file context.
*/
void onFailure(final FileObjectContext context, final Throwable t);
-}
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/NonBlockingBufferReader.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/NonBlockingBufferReader.java
index 85594cb60..4a6f11e74 100644
--- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/NonBlockingBufferReader.java
+++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/NonBlockingBufferReader.java
@@ -15,6 +15,7 @@
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ private boolean fillWithBufferedLinesUntil(final List<TextBlock> records,
TextBlock line;
do {
line = tryToExtractLine();
- if (line != null) {
+ if (line != null && !StringUtils.isEmpty(line.data())) {
records.add(line);
}
maxNumRecordsNotReached = records.size() < minRecords;
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java
index 7df54d19a..c458ab6b6 100644
--- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java
@@ -14,6 +14,8 @@
import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
import io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner;
+import io.streamthoughts.kafka.connect.filepulse.source.EofCompletionStrategy;
+import io.streamthoughts.kafka.connect.filepulse.source.FileCompletionStrategy;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.TaskPartitioner;
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStore;
@@ -42,6 +44,12 @@ public class CommonSourceConfig extends AbstractConfig {
public static final String FS_LISTING_FILTERS_CONFIG = "fs.listing.filters";
private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files.";
+ public static final String FS_COMPLETION_STRATEGY_CLASS_CONFIG = "fs.completion.strategy.class";
+ private static final String FS_COMPLETION_STRATEGY_CLASS_DOC =
+ "The FileCompletionStrategy class to determine when files should be marked as COMPLETED. " +
+ "Default is EofCompletionStrategy (completes when fully read). " +
+ "Use DailyCompletionStrategy for time-based completion (e.g., daily files).";
+
public static final String TASKS_FILE_READER_CLASS_CONFIG = "tasks.reader.class";
private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file.";
@@ -261,6 +269,13 @@ public static ConfigDef getConfigDef() {
groupCounter++,
ConfigDef.Width.NONE,
CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_CONFIG
+ )
+ .define(
+ FS_COMPLETION_STRATEGY_CLASS_CONFIG,
+ ConfigDef.Type.CLASS,
+ EofCompletionStrategy.class,
+ ConfigDef.Importance.MEDIUM,
+ FS_COMPLETION_STRATEGY_CLASS_DOC
);
}
@@ -303,6 +318,10 @@ public String getValueSchemaConditionTopicPattern() {
return getString(RECORD_VALUE_SCHEMA_CONDITION_TOPIC_PATTERN_CONFIG);
}
+ public FileCompletionStrategy getFileCompletionStrategy() {
+ return getConfiguredInstance(FS_COMPLETION_STRATEGY_CLASS_CONFIG, FileCompletionStrategy.class);
+ }
+
public Schema getValueConnectSchema() {
String valueConnectSchemaTypeString = getString(RECORD_VALUE_SCHEMA_TYPE_CONFIG);
ConnectSchemaType schemaType = ConnectSchemaType.getForNameIgnoreCase(valueConnectSchemaTypeString);
@@ -352,4 +371,4 @@ private Schema readConfigSchema() {
"Failed to read connect-schema for '" + CommonSourceConfig.RECORD_VALUE_SCHEMA_CONFIG + "'", e);
}
}
-}
+}
\ No newline at end of file
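Note: a sketch of how a connector configuration could opt into the new strategy; only `fs.completion.strategy.class` and the `daily.completion.schedule.*` keys come from this change, the remaining properties are placeholders.

```java
// Illustrative connector configuration built as a Java map; in practice these would be
// the connector's properties. Placeholder keys are noted in the trailing comment.
import java.util.HashMap;
import java.util.Map;

public class CompletionStrategyConfigExample {
    public static void main(String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put("fs.completion.strategy.class",
                "io.streamthoughts.kafka.connect.filepulse.source.DailyCompletionStrategy");
        props.put("daily.completion.schedule.time", "01:00:00");
        props.put("daily.completion.schedule.date.pattern", ".*?(\\d{4}-\\d{2}-\\d{2}).*");
        props.put("daily.completion.schedule.date.format", "yyyy-MM-dd");
        // ... plus the usual FilePulse settings (connector class, fs listing, tasks.reader.class, topic, ...)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```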
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/DailyCompletionStrategyConfig.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/DailyCompletionStrategyConfig.java
new file mode 100644
index 000000000..12bc7a172
--- /dev/null
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/DailyCompletionStrategyConfig.java
@@ -0,0 +1,207 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.config;
+
+import io.streamthoughts.kafka.connect.filepulse.source.DailyCompletionStrategy;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+/**
+ * Configuration for {@link DailyCompletionStrategy}.
+ */
+public class DailyCompletionStrategyConfig extends AbstractConfig {
+
+ public static final String COMPLETION_SCHEDULE_TIME_CONFIG = "daily.completion.schedule.time";
+ private static final String COMPLETION_SCHEDULE_TIME_DOC =
+ "Time to complete files in HH:mm:ss format (e.g., '00:01:00'). " +
+ "Files will be marked as COMPLETED when the current time passes this scheduled time " +
+ "for the date extracted from the filename. Uses the system default timezone.";
+ private static final String COMPLETION_SCHEDULE_TIME_DEFAULT = "00:01:00";
+
+ public static final String COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG = "daily.completion.schedule.date.pattern";
+ private static final String COMPLETION_SCHEDULE_DATE_PATTERN_DOC =
+ "Regex pattern to extract date from filename. The pattern should contain capturing groups " +
+ "that match the date components. For example: '.*?(\\d{4}-\\d{2}-\\d{2}).*' to match dates " +
+ "like '2025-12-08' in filenames like 'logs-2025-12-08.log'.";
+ private static final String COMPLETION_SCHEDULE_DATE_PATTERN_DEFAULT = ".*?(\\d{4}-\\d{2}-\\d{2}).*";
+
+ public static final String COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG = "daily.completion.schedule.date.format";
+ private static final String COMPLETION_SCHEDULE_DATE_FORMAT_DOC =
+ "Date format pattern used to parse the date extracted from the filename. " +
+ "Must match the date format in the filename (e.g., 'yyyy-MM-dd' for '2025-12-08', " +
+ "'yyyyMMdd' for '20251208'). See Java DateTimeFormatter for supported patterns.";
+ private static final String COMPLETION_SCHEDULE_DATE_FORMAT_DEFAULT = "yyyy-MM-dd";
+
+ /**
+ * Creates a new {@link DailyCompletionStrategyConfig} instance.
+ *
+ * @param originals the configuration properties
+ */
+ public DailyCompletionStrategyConfig(final Map<?, ?> originals) {
+ super(configDef(), originals);
+ }
+
+ /**
+ * Get the scheduled completion time.
+ *
+ * @return the time at which files should be completed
+ */
+ public LocalTime scheduledCompletionTime() {
+ String timeStr = getString(COMPLETION_SCHEDULE_TIME_CONFIG);
+ try {
+ return LocalTime.parse(timeStr, DateTimeFormatter.ofPattern("HH:mm:ss"));
+ } catch (DateTimeParseException e) {
+ throw new ConfigException(
+ COMPLETION_SCHEDULE_TIME_CONFIG,
+ timeStr,
+ "Invalid time format. Expected format: HH:mm:ss (e.g., '23:59:59')"
+ );
+ }
+ }
+
+ /**
+ * Get the compiled regex pattern for extracting dates from filenames.
+ *
+ * @return the compiled pattern
+ */
+ public Pattern datePattern() {
+ String patternStr = getString(COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG);
+ try {
+ return Pattern.compile(patternStr);
+ } catch (PatternSyntaxException e) {
+ throw new ConfigException(
+ COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG,
+ patternStr,
+ "Invalid regex pattern: " + e.getMessage()
+ );
+ }
+ }
+
+ /**
+ * Get the date formatter for parsing dates from filenames.
+ *
+ * @return the date formatter
+ */
+ public DateTimeFormatter dateFormatter() {
+ String formatStr = getString(COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG);
+ try {
+ return DateTimeFormatter.ofPattern(formatStr);
+ } catch (IllegalArgumentException e) {
+ throw new ConfigException(
+ COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG,
+ formatStr,
+ "Invalid date format pattern: " + e.getMessage()
+ );
+ }
+ }
+
+ /**
+ * Define the configuration.
+ *
+ * @return the configuration definition
+ */
+ public static ConfigDef configDef() {
+ return new ConfigDef()
+ .define(
+ COMPLETION_SCHEDULE_TIME_CONFIG,
+ ConfigDef.Type.STRING,
+ COMPLETION_SCHEDULE_TIME_DEFAULT,
+ new TimeValidator(),
+ ConfigDef.Importance.HIGH,
+ COMPLETION_SCHEDULE_TIME_DOC
+ )
+ .define(
+ COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG,
+ ConfigDef.Type.STRING,
+ COMPLETION_SCHEDULE_DATE_PATTERN_DEFAULT,
+ new RegexValidator(),
+ ConfigDef.Importance.HIGH,
+ COMPLETION_SCHEDULE_DATE_PATTERN_DOC
+ )
+ .define(
+ COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG,
+ ConfigDef.Type.STRING,
+ COMPLETION_SCHEDULE_DATE_FORMAT_DEFAULT,
+ new DateFormatValidator(),
+ ConfigDef.Importance.HIGH,
+ COMPLETION_SCHEDULE_DATE_FORMAT_DOC
+ );
+ }
+
+ /**
+ * Validator for time format (HH:mm:ss).
+ */
+ private static class TimeValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ throw new ConfigException(name, value, "Time configuration is required");
+ }
+ String timeStr = value.toString();
+ try {
+ LocalTime.parse(timeStr, DateTimeFormatter.ofPattern("HH:mm:ss"));
+ } catch (DateTimeParseException e) {
+ throw new ConfigException(
+ name,
+ value,
+ "Invalid time format. Expected format: HH:mm:ss (e.g., '23:59:59')"
+ );
+ }
+ }
+ }
+
+ /**
+ * Validator for regex pattern.
+ */
+ private static class RegexValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ return; // Has default value
+ }
+ String patternStr = value.toString();
+ try {
+ Pattern.compile(patternStr);
+ } catch (PatternSyntaxException e) {
+ throw new ConfigException(
+ name,
+ value,
+ "Invalid regex pattern: " + e.getMessage()
+ );
+ }
+ }
+ }
+
+ /**
+ * Validator for date format pattern.
+ */
+ private static class DateFormatValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ return; // Has default value
+ }
+ String formatStr = value.toString();
+ try {
+ DateTimeFormatter.ofPattern(formatStr);
+ } catch (IllegalArgumentException e) {
+ throw new ConfigException(
+ name,
+ value,
+ "Invalid date format pattern. Use a valid DateTimeFormatter pattern (e.g., 'yyyy-MM-dd')"
+ );
+ }
+ }
+ }
+}
\ No newline at end of file
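Note: a small sketch of the parse path the config class above provides, three string properties in, typed values out; the sample values are arbitrary.

```java
// Uses only the public constants and accessors defined in DailyCompletionStrategyConfig.
import io.streamthoughts.kafka.connect.filepulse.config.DailyCompletionStrategyConfig;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.regex.Pattern;

public class DailyCompletionConfigExample {
    public static void main(String[] args) {
        final Map<String, String> props = Map.of(
                DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "02:00:00",
                DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, ".*?(\\d{8}).*",
                DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "yyyyMMdd");

        final DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);

        final LocalTime time = config.scheduledCompletionTime();    // 02:00
        final Pattern pattern = config.datePattern();               // .*?(\d{8}).*
        final DateTimeFormatter format = config.dateFormatter();    // yyyyMMdd
        System.out.println(time + " " + pattern.pattern() + " " + format);
    }
}
```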
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.java
index bc92516a8..012dce6f6 100644
--- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.java
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.java
@@ -12,10 +12,12 @@
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResult;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResultSet;
import io.streamthoughts.kafka.connect.filepulse.clean.GenericFileCleanupPolicy;
+import io.streamthoughts.kafka.connect.filepulse.source.FileCompletionStrategy;
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectKey;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
+import io.streamthoughts.kafka.connect.filepulse.source.LongLivedFileReadStrategy;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
@@ -34,6 +36,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.slf4j.Logger;
@@ -69,6 +72,8 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {
private final BatchFileCleanupPolicy cleaner;
+ private final FileCompletionStrategy completionStrategy;
+
private final Long allowTasksReconfigurationAfterTimeoutMs;
private Long nextAllowedTasksReconfiguration = -1L;
@@ -95,8 +100,9 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {
* @param allowTasksReconfigurationAfterTimeoutMs {@code true} to allow tasks reconfiguration after a timeout.
* @param fsListening the {@link FileSystemListing} to be used for listing object files.
* @param cleanPolicy the {@link GenericFileCleanupPolicy} to be used for cleaning object files.
- * @param offsetPolicy the {@link SourceOffsetPolicy} to be used computing offset for object fileS.
+ * @param offsetPolicy the {@link SourceOffsetPolicy} to be used for computing offsets for object files.
* @param store the {@link StateBackingStore} used for storing object file cursor.
+ * @param completionStrategy the {@link FileCompletionStrategy} to be used for determining file completion.
*/
public DefaultFileSystemMonitor(final Long allowTasksReconfigurationAfterTimeoutMs,
final FileSystemListing<?> fsListening,
@@ -104,7 +110,8 @@ public DefaultFileSystemMonitor(final Long allowTasksReconfigurationAfterTimeout
final Predicate<FileObjectStatus> cleanablePredicate,
final SourceOffsetPolicy offsetPolicy,
final StateBackingStore<FileObject> store,
- final TaskFileOrder taskFileOrder) {
+ final TaskFileOrder taskFileOrder,
+ final FileCompletionStrategy completionStrategy) {
Objects.requireNonNull(fsListening, "'fsListening' should not be null");
Objects.requireNonNull(cleanPolicy, "'cleanPolicy' should not be null");
Objects.requireNonNull(offsetPolicy, "'offsetPolicy' should not be null");
@@ -116,6 +123,7 @@ public DefaultFileSystemMonitor(final Long allowTasksReconfigurationAfterTimeout
this.allowTasksReconfigurationAfterTimeoutMs = allowTasksReconfigurationAfterTimeoutMs;
this.cleanablePredicate = cleanablePredicate;
this.taskFileOrder = taskFileOrder;
+ this.completionStrategy = completionStrategy;
if (cleanPolicy instanceof FileCleanupPolicy) {
this.cleaner = new DelegateBatchFileCleanupPolicy((FileCleanupPolicy) cleanPolicy);
@@ -148,7 +156,8 @@ public void onStateUpdate(final String key, final FileObject object) {
if (scanned.remove(objectId) != null) {
changed.set(true);
}
- } else if (status.isOneOf(FileObjectStatus.CLEANED, FileObjectStatus.INVALID)) {
+ } else if (status.isOneOf(FileObjectStatus.CLEANED, FileObjectStatus.INVALID,
+ FileObjectStatus.PARTIALLY_COMPLETED)) {
final FileObjectMeta removed = scheduled.remove(objectId);
if (removed == null && status.isOneOf(FileObjectStatus.CLEANED)) {
LOG.debug(
@@ -272,8 +281,8 @@ private synchronized boolean updateFiles() {
final boolean noScheduledFiles = scheduled.isEmpty();
if (!noScheduledFiles && allowTasksReconfigurationAfterTimeoutMs == Long.MAX_VALUE) {
LOG.info(
- "Scheduled files still being processed: {}. Skip filesystem listing while waiting for tasks completion",
- scheduled.size()
+ "Scheduled files still being processed: {}. Skip filesystem listing while waiting for tasks completion",
+ scheduled.size()
);
return false;
}
@@ -291,18 +300,29 @@ private synchronized boolean updateFiles() {
LOG.info("Completed object files listing. '{}' object files found in {}ms", objects.size(), took);
final StateSnapshot<FileObject> snapshot = store.snapshot();
- final Map<FileObjectKey, FileObjectMeta> toScheduled = FileObjectCandidatesFilter.filter(
- offsetPolicy,
- fileObjectKey -> {
- final FileObject fileObject = snapshot.getForKey(fileObjectKey.original());
- if (fileObject == null) return true;
-
- final FileObjectStatus status = fileObject.status();
- return !(cleanablePredicate.test(status) || status.isDone());
- },
- objects
+ Map<FileObjectKey, FileObjectMeta> toScheduled = FileObjectCandidatesFilter.filter(
+ offsetPolicy,
+ fileObjectKey -> {
+ final FileObject fileObject = snapshot.getForKey(fileObjectKey.original());
+ if (fileObject == null) return true;
+
+ final FileObjectStatus status = fileObject.status();
+ return !(cleanablePredicate.test(status) || status.isDone());
+ },
+ objects
);
+ // If a long-lived file read strategy is used, filter out files that should not be read yet.
+ if (completionStrategy instanceof LongLivedFileReadStrategy) {
+ toScheduled = toScheduled.entrySet().stream().filter(entry -> {
+ final FileObject fileObject = snapshot.getForKey(entry.getKey().original());
+ if (fileObject == null) return true;
+
+ return ((LongLivedFileReadStrategy) completionStrategy)
+ .shouldAttemptRead(entry.getValue(), fileObject.offset());
+ }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
// Some scheduled files are still being processed, but new files are detected
if (!noScheduledFiles) {
if (scheduled.keySet().containsAll(toScheduled.keySet())) {
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DailyCompletionStrategy.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DailyCompletionStrategy.java
new file mode 100644
index 000000000..45315e16b
--- /dev/null
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DailyCompletionStrategy.java
@@ -0,0 +1,186 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.source;
+
+import io.streamthoughts.kafka.connect.filepulse.config.DailyCompletionStrategyConfig;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link FileCompletionStrategy} that marks files as COMPLETED
+ * at a scheduled time by extracting the date from the filename.
+ *
+ * This is useful for "daily" files that are continuously appended throughout the day
+ * and should only be marked as complete the next day at a specific time.
+ *
+ *
+ * <p>The strategy extracts the date from the filename using a regex pattern. The file is completed
+ * on the day after the date in the filename at the configured completion time.
+ * Uses the system default timezone.
+ *
+ *
+ * <p>Example:
+ * <ul>
+ *   <li>File: {@code logs-2025-12-08.log}</li>
+ *   <li>Completion time: {@code 01:00:00}</li>
+ *   <li>File created: December 8, 2025 at 6:00 AM</li>
+ *   <li>File will be completed: December 9, 2025 at 01:00:00</li>
+ * </ul>
+ *
+ *
+ * This ensures that all data written during the day (December 8) is collected,
+ * with a 1-hour buffer into the next day before the file is marked complete.
+ *
+ *
+ * <p>Configuration:
+ * <ul>
+ *   <li>{@code daily.completion.schedule.time}: Time to complete files (HH:mm:ss format, e.g., "01:00:00")</li>
+ *   <li>{@code daily.completion.schedule.date.pattern}:
+ *       Regex pattern to extract the date from the filename (default: ".*?(\\d{4}-\\d{2}-\\d{2}).*")</li>
+ *   <li>{@code daily.completion.schedule.date.format}: Date format in the filename (default: "yyyy-MM-dd")</li>
+ * </ul>
+ *
+ * <p>Example filename patterns:
+ * <ul>
+ *   <li>{@code logs-2025-12-08.log} → pattern: ".*?(\\d{4}-\\d{2}-\\d{2}).*", format: "yyyy-MM-dd"</li>
+ *   <li>{@code app-20251208.log} → pattern: ".*?(\\d{8}).*", format: "yyyyMMdd"</li>
+ * </ul>
+ *
+ */
+public class DailyCompletionStrategy implements FileCompletionStrategy, LongLivedFileReadStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DailyCompletionStrategy.class);
+
+ private LocalTime scheduledCompletionTime;
+ private Pattern datePattern;
+ private DateTimeFormatter dateFormatter;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(configs);
+ this.scheduledCompletionTime = config.scheduledCompletionTime();
+ this.datePattern = config.datePattern();
+ this.dateFormatter = config.dateFormatter();
+
+ LOG.info(
+ "Configured DailyCompletionStrategy: completionTime={}, datePattern={}, dateFormat={}, timezone={}",
+ scheduledCompletionTime, datePattern.pattern(), dateFormatter, ZoneId.systemDefault()
+ );
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean shouldComplete(final FileObjectContext context) {
+ // Extract date from filename
+ LocalDate fileDate;
+ try {
+ fileDate = extractDateFromFilename(context.metadata().stringURI());
+ } catch (Exception e) {
+ LOG.warn(
+ "Could not extract date from filename '{}': {}. " +
+ "File will not be completed until date can be determined.",
+ context.metadata().stringURI(),
+ e.getMessage()
+ );
+ // If we can't extract the date, don't complete the file
+ return false;
+ }
+
+ // Calculate when this file should be completed
+ Instant fileCompletionTime = calculateCompletionTimeForDate(fileDate);
+ Instant now = Instant.now();
+
+ boolean timeReached = now.isAfter(fileCompletionTime) || now.equals(fileCompletionTime);
+
+ if (timeReached) {
+ LOG.info(
+ "Scheduled completion time reached for file '{}' (date: {}, completion time: {}, now: {})",
+ context.metadata().stringURI(),
+ fileDate,
+ fileCompletionTime,
+ now
+ );
+ }
+
+ return timeReached;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean shouldAttemptRead(final FileObjectMeta objectMeta, final FileObjectOffset offset) {
+ // Always attempt to read if the file should be completed
+ boolean shouldRead = shouldComplete(new FileObjectContext(objectMeta))
+ || LongLivedFileReadStrategy.super.shouldAttemptRead(objectMeta, offset);
+
+ if (!shouldRead) {
+ LOG.debug("Deferring read for file '{}' until file is updated or scheduled completion time is reached.",
+ objectMeta.stringURI());
+ }
+
+ return shouldRead;
+ }
+
+ /**
+ * Extract the date from the filename using the configured pattern and format.
+ *
+ * @param filename the filename or URI
+ * @return the parsed date
+ * @throws IllegalArgumentException if the date cannot be extracted or parsed
+ */
+ private LocalDate extractDateFromFilename(String filename) {
+ Matcher matcher = datePattern.matcher(filename);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException(
+ "Filename does not match date pattern: " + filename
+ );
+ }
+
+ // Extract the date string from the first capturing group
+ String dateStr = matcher.group(1);
+
+ try {
+ return LocalDate.parse(dateStr, dateFormatter);
+ } catch (DateTimeParseException e) {
+ throw new IllegalArgumentException(
+ "Could not parse date '" + dateStr + "' from filename '" +
+ filename + "' using format: " + dateFormatter,
+ e
+ );
+ }
+ }
+
+ /**
+ * Calculate the completion time for a given file date using the system default timezone.
+ * The completion time is the configured time on the day after the file's date.
+ *
+ * For example, if the file is for 2025-12-08 and completion time is 01:00:00,
+ * the file should be completed at 2025-12-09 01:00:00 (in the system timezone).
+ *
+ * <p>This ensures that all data written during the file's day (Dec 8) is collected,
+ * with a buffer period into the next day before marking the file as complete.
+ *
+ * @param fileDate the date extracted from the filename
+ * @return the instant when the file should be completed
+ */
+ private Instant calculateCompletionTimeForDate(LocalDate fileDate) {
+ LocalDate completionDate = fileDate.plusDays(1);
+ LocalDateTime completionDateTime = LocalDateTime.of(completionDate, scheduledCompletionTime);
+ return completionDateTime.atZone(ZoneId.systemDefault()).toInstant();
+ }
+}
\ No newline at end of file
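Note: a worked example of the `calculateCompletionTimeForDate(...)` arithmetic above; the sample date, time, and timezone remark are illustrative.

```java
// A file dated 2025-12-08 with a scheduled completion time of 01:00:00 becomes
// eligible for COMPLETED at 2025-12-09T01:00 in the system default timezone.
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;

public class DailyCompletionTimeExample {
    public static void main(String[] args) {
        final LocalDate fileDate = LocalDate.parse("2025-12-08");   // extracted from logs-2025-12-08.log
        final LocalTime scheduled = LocalTime.parse("01:00:00");    // daily.completion.schedule.time

        // Day after the file's date, at the scheduled time, in the system timezone.
        final LocalDateTime completion = LocalDateTime.of(fileDate.plusDays(1), scheduled);
        final Instant completeAt = completion.atZone(ZoneId.systemDefault()).toInstant();

        // The file is marked COMPLETED once Instant.now() is at or past this point.
        System.out.println(completeAt);
    }
}
```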
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DefaultFileRecordsPollingConsumer.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DefaultFileRecordsPollingConsumer.java
index eb7713e78..6becbf67b 100644
--- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DefaultFileRecordsPollingConsumer.java
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DefaultFileRecordsPollingConsumer.java
@@ -37,6 +37,7 @@ public class DefaultFileRecordsPollingConsumer implements FileRecordsPollingCons
private StateListener listener;
private final SourceTaskContext taskContext;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final FileCompletionStrategy completionStrategy;
private FileRecord latestPolledRecord;
@@ -50,18 +51,21 @@ public class DefaultFileRecordsPollingConsumer implements FileRecordsPollingCons
* @param pipeline the filter pipeline to apply on each record.
* @param offsetPolicy the source offset/partition policy.
* @param ignoreCommittedOffsets flag to indicate if committed offsets should be ignored.
+ * @param completionStrategy the strategy to determine when files should be marked as COMPLETED.
*/
DefaultFileRecordsPollingConsumer(final SourceTaskContext taskContext,
final FileInputReader reader,
final RecordFilterPipeline<FileRecord<TypedStruct>> pipeline,
final SourceOffsetPolicy offsetPolicy,
- final boolean ignoreCommittedOffsets) {
+ final boolean ignoreCommittedOffsets,
+ final FileCompletionStrategy completionStrategy) {
this.queue = new LinkedBlockingQueue<>();
this.ignoreCommittedOffsets = ignoreCommittedOffsets;
this.reader = reader;
this.pipeline = pipeline;
this.offsetPolicy = offsetPolicy;
this.taskContext = taskContext;
+ this.completionStrategy = completionStrategy;
}
void addAll(final List files) {
@@ -372,7 +376,12 @@ private void deleteFileQueueAndInvokeListener(final FileObjectContext fileContex
if (exception != null) {
listener.onFailure(fileContext, exception);
} else {
- listener.onCompleted(fileContext);
+ // Delegate to the completion strategy to decide if the file should be completed
+ if (completionStrategy.shouldComplete(fileContext)) {
+ listener.onCompleted(fileContext);
+ } else {
+ listener.onPartiallyCompleted(fileContext);
+ }
}
}
}
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/EofCompletionStrategy.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/EofCompletionStrategy.java
new file mode 100644
index 000000000..192d362ef
--- /dev/null
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/EofCompletionStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.source;
+
+/**
+ * A {@link FileCompletionStrategy} that marks files as COMPLETED
+ * immediately when they are fully read.
+ * This is the default behavior and maintains backward compatibility.
+ */
+public class EofCompletionStrategy implements FileCompletionStrategy {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean shouldComplete(final FileObjectContext context) {
+ // Complete immediately when the file is fully read
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStateReporter.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStateReporter.java
index b340f95d9..1f421bf85 100644
--- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStateReporter.java
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStateReporter.java
@@ -100,6 +100,16 @@ public void onCompleted(final FileObjectContext context) {
notify(context, FileObjectStatus.COMPLETED);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onPartiallyCompleted(final FileObjectContext context) {
+ Objects.requireNonNull(context, "context can't be null");
+ LOG.debug("Partially completed object-file: '{}'", context.metadata());
+ notify(context, FileObjectStatus.PARTIALLY_COMPLETED);
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java
index 20e4cb274..0a93002d3 100644
--- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java
@@ -119,6 +119,10 @@ private FileSystemMonitor createFileSystemMonitor(final SourceConnectorConfig co
final FileSystemListing<?> fileSystemListing = connectorConfig.getFileSystemListing();
fileSystemListing.setFilter(new CompositeFileListFilter(connectorConfig.getFileSystemListingFilter()));
+ // Get and configure the completion strategy
+ final FileCompletionStrategy completionStrategy = connectorConfig.getFileCompletionStrategy();
+ completionStrategy.configure(connectorConfig.originalsStrings());
+
DefaultFileSystemMonitor monitor = new DefaultFileSystemMonitor(
connectorConfig.allowTasksReconfigurationAfterTimeoutMs(),
fileSystemListing,
@@ -126,7 +130,8 @@ private FileSystemMonitor createFileSystemMonitor(final SourceConnectorConfig co
connectorConfig.getFsCleanupPolicyPredicate(),
connectorConfig.getSourceOffsetPolicy(),
store,
- connectorConfig.getTaskFilerOrder()
+ connectorConfig.getTaskFilerOrder(),
+ completionStrategy
);
monitor.setStateDefaultReadTimeout(connectorConfig.getStateDefaultReadTimeoutMs());
diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java
index 7de9c0729..b2dacf750 100644
--- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java
+++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java
@@ -151,12 +151,18 @@ private DefaultFileRecordsPollingConsumer newFileRecordsPollingConsumer() {
final RecordFilterPipeline<FileRecord<TypedStruct>> filter = new DefaultRecordFilterPipeline(
taskConfig.filters()
);
+
+ // Get and configure the completion strategy
+ final FileCompletionStrategy completionStrategy = taskConfig.getFileCompletionStrategy();
+ completionStrategy.configure(taskConfig.originalsStrings());
+
return new DefaultFileRecordsPollingConsumer(
context,
taskConfig.reader(),
filter,
offsetPolicy,
- taskConfig.isReadCommittedFile());
+ taskConfig.isReadCommittedFile(),
+ completionStrategy);
}
/**
diff --git a/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/config/DailyCompletionStrategyConfigTest.java b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/config/DailyCompletionStrategyConfigTest.java
new file mode 100644
index 000000000..2edb14824
--- /dev/null
+++ b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/config/DailyCompletionStrategyConfigTest.java
@@ -0,0 +1,311 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+/**
+ * Test class for {@link DailyCompletionStrategyConfig}.
+ */
+public class DailyCompletionStrategyConfigTest {
+
+ @Test
+ public void testValidConfiguration() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, ".*?(\\d{4}-\\d{2}-\\d{2}).*");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "yyyy-MM-dd");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertNotNull(config);
+ assertEquals(LocalTime.of(1, 0, 0), config.scheduledCompletionTime());
+ assertEquals(".*?(\\d{4}-\\d{2}-\\d{2}).*", config.datePattern().pattern());
+ assertNotNull(config.dateFormatter());
+ }
+
+ @Test
+ public void testConfigurationWithDefaults() {
+ Map<String, String> props = new HashMap<>();
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertNotNull(config);
+ assertEquals(LocalTime.of(0, 1, 0), config.scheduledCompletionTime());
+ // Should use default pattern and format
+ assertNotNull(config.datePattern());
+ assertNotNull(config.dateFormatter());
+ }
+
+ @Test
+ public void testValidTimeFormats() {
+ String[] validTimes = {
+ "00:00:00",
+ "01:00:00",
+ "12:00:00",
+ "23:59:59",
+ "06:30:15"
+ };
+
+ for (String time : validTimes) {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, time);
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ LocalTime parsed = config.scheduledCompletionTime();
+ assertNotNull("Time should be parsed: " + time, parsed);
+ }
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidTimeFormat_NoSeconds() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.scheduledCompletionTime();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidTimeFormat_InvalidHour() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "25:00:00");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.scheduledCompletionTime();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidTimeFormat_InvalidMinute() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "12:60:00");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.scheduledCompletionTime();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidTimeFormat_InvalidSecond() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "12:00:60");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.scheduledCompletionTime();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidTimeFormat_InvalidString() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "not-a-time");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.scheduledCompletionTime();
+ }
+
+ @Test
+ public void testValidPatterns() {
+ String[] validPatterns = {
+ ".*?(\\d{4}-\\d{2}-\\d{2}).*",
+ ".*?(\\d{8}).*",
+ "prefix-(\\d{4})-(\\d{2})-(\\d{2})-suffix",
+ "(\\d{4})/(\\d{2})/(\\d{2})",
+ ".*"
+ };
+
+ for (String pattern : validPatterns) {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, pattern);
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ Pattern compiled = config.datePattern();
+ assertNotNull("Pattern should be compiled: " + pattern, compiled);
+ assertEquals(pattern, compiled.pattern());
+ }
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidPattern_UnclosedGroup() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, "(\\d{4}-\\d{2}-\\d{2}");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.datePattern();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidPattern_InvalidRegex() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, "[invalid(regex");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.datePattern();
+ }
+
+ @Test
+ public void testValidDateFormats() {
+ String[] validFormats = {
+ "yyyy-MM-dd",
+ "yyyyMMdd",
+ "yyyy/MM/dd",
+ "dd-MM-yyyy",
+ "MM/dd/yyyy",
+ "yyyy.MM.dd",
+ "yyyy-MM"
+ };
+
+ for (String format : validFormats) {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, format);
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ DateTimeFormatter formatter = config.dateFormatter();
+ assertNotNull("Formatter should be created: " + format, formatter);
+ }
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidDateFormat() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "invalid-format-xxx");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.dateFormatter();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidDateFormat_WrongLetters() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "yyyy-PP-dd");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.dateFormatter();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidDateFormat_UnknownLetter() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "xyz-abc");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.dateFormatter();
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidDateFormat_UnmatchedBracket() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "yyyy-MM-dd]");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+ config.dateFormatter();
+ }
+
+ // ========== Combined Configuration Tests ==========
+
+ @Test
+ public void testCompactDateConfiguration() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "02:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, ".*?(\\d{8}).*");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "yyyyMMdd");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertEquals(LocalTime.of(2, 0, 0), config.scheduledCompletionTime());
+ assertEquals(".*?(\\d{8}).*", config.datePattern().pattern());
+ assertNotNull(config.dateFormatter());
+ }
+
+ @Test
+ public void testSlashDateConfiguration() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "03:30:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, ".*?(\\d{4})/(\\d{2})/(\\d{2}).*");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "yyyy/MM/dd");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertEquals(LocalTime.of(3, 30, 0), config.scheduledCompletionTime());
+ assertEquals(".*?(\\d{4})/(\\d{2})/(\\d{2}).*", config.datePattern().pattern());
+ assertNotNull(config.dateFormatter());
+ }
+
+ @Test
+ public void testReverseDateConfiguration() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "23:59:59");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, ".*?(\\d{2})-(\\d{2})-(\\d{4}).*");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "dd-MM-yyyy");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertEquals(LocalTime.of(23, 59, 59), config.scheduledCompletionTime());
+ assertEquals(".*?(\\d{2})-(\\d{2})-(\\d{4}).*", config.datePattern().pattern());
+ assertNotNull(config.dateFormatter());
+ }
+
+ // ========== Edge Case Tests ==========
+
+ @Test
+ public void testMidnightTime() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "00:00:00");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertEquals(LocalTime.MIDNIGHT, config.scheduledCompletionTime());
+ }
+
+ @Test
+ public void testEndOfDayTime() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "23:59:59");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertEquals(LocalTime.of(23, 59, 59), config.scheduledCompletionTime());
+ }
+
+ @Test
+ public void testNoonTime() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "12:00:00");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertEquals(LocalTime.NOON, config.scheduledCompletionTime());
+ }
+
+ @Test
+ public void testMultipleCapturingGroups() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_TIME_CONFIG, "01:00:00");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_PATTERN_CONFIG, ".*?(\\d{4})-(\\d{2})-(\\d{2}).*");
+ props.put(DailyCompletionStrategyConfig.COMPLETION_SCHEDULE_DATE_FORMAT_CONFIG, "yyyy-MM-dd");
+
+ DailyCompletionStrategyConfig config = new DailyCompletionStrategyConfig(props);
+
+ assertNotNull(config.datePattern());
+ assertNotNull(config.dateFormatter());
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitorTest.java b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitorTest.java
index 975a79ae3..3eba504c4 100644
--- a/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitorTest.java
+++ b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitorTest.java
@@ -272,8 +272,8 @@ private DefaultFileSystemMonitor newFileSystemMonitor(final MockFileCleaner clea
status -> List.of(FileObjectStatus.FAILED, FileObjectStatus.COMPLETED).contains(status),
OFFSET_MANAGER,
store,
- TaskFileOrder.BuiltIn.LAST_MODIFIED.get()
- );
+ TaskFileOrder.BuiltIn.LAST_MODIFIED.get(),
+ null);
}
private static class NoOpFileSystemListing implements FileSystemListing {
diff --git a/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/DailyCompletionStrategyTest.java b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/DailyCompletionStrategyTest.java
new file mode 100644
index 000000000..680c6b26e
--- /dev/null
+++ b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/DailyCompletionStrategyTest.java
@@ -0,0 +1,266 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright (c) StreamThoughts
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.streamthoughts.kafka.connect.filepulse.source;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for {@link DailyCompletionStrategy}.
+ */
+public class DailyCompletionStrategyTest {
+
+ private DailyCompletionStrategy strategy;
+ private static final String DEFAULT_TIME = "01:00:00";
+
+ @Before
+ public void setUp() {
+ strategy = new DailyCompletionStrategy();
+ }
+
+ /**
+ * Helper method to create configuration map.
+ */
+ private Map<String, String> createConfig(String time, String pattern, String format) {
+ Map<String, String> config = new HashMap<>();
+ config.put("daily.completion.schedule.time", time);
+ if (pattern != null) {
+ config.put("daily.completion.schedule.date.pattern", pattern);
+ }
+ if (format != null) {
+ config.put("daily.completion.schedule.date.format", format);
+ }
+ return config;
+ }
+
+ /**
+ * Helper method to create FileObjectContext with a given filename.
+ * Uses a fixed timestamp (2024-01-15 16:00:00 UTC) for the last modified time.
+ */
+ private FileObjectContext createContext(String filename) {
+ long defaultTimestamp = 1705334400000L; // 2024-01-15 16:00:00 UTC
+ GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
+ .withUri(URI.create("file:///tmp/" + filename))
+ .withName(filename)
+ .withContentLength(1000L)
+ .withLastModified(defaultTimestamp)
+ .build();
+ return new FileObjectContext(meta);
+ }
+
+ @Test
+ public void testShouldCompleteWithStandardFilename() {
+ Map<String, String> config = createConfig(DEFAULT_TIME, null, null);
+ strategy.configure(config);
+
+ // File from 10 days ago - should be complete
+ LocalDate tenDaysAgo = LocalDate.now().minusDays(10);
+ String filename = String.format("logs-%s.log", tenDaysAgo.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+ FileObjectContext context = createContext(filename);
+
+ assertTrue("Old file should be marked as complete",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testShouldCompleteWithCompactDateFormat() {
+ Map<String, String> config = createConfig("02:00:00", ".*?(\\d{8}).*", "yyyyMMdd");
+ strategy.configure(config);
+
+ // File from 10 days ago in compact format
+ LocalDate tenDaysAgo = LocalDate.now().minusDays(10);
+ String filename = String.format("app-%s.log", tenDaysAgo.format(DateTimeFormatter.ofPattern("yyyyMMdd")));
+ FileObjectContext context = createContext(filename);
+
+ // Old file should be complete
+ assertTrue("Old file with compact date should be complete",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testShouldCompleteWithDateInMiddleOfFilename() {
+ Map<String, String> config = createConfig(DEFAULT_TIME, null, null);
+ strategy.configure(config);
+
+ // File from 10 days ago with date in middle
+ LocalDate tenDaysAgo = LocalDate.now().minusDays(10);
+ String filename = String.format("application-%s-server.log", tenDaysAgo.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+ FileObjectContext context = createContext(filename);
+
+ assertTrue("Old file with date in middle should be complete",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testShouldNotCompleteWhenDateCannotBeExtracted() {
+ Map<String, String> config = createConfig(DEFAULT_TIME, null, null);
+ strategy.configure(config);
+
+ FileObjectContext context = createContext("no-date-in-filename.log");
+
+ assertFalse("Should not complete when date cannot be extracted",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testShouldNotCompleteWhenDateFormatDoesNotMatch() {
+ Map<String, String> config = createConfig(DEFAULT_TIME, ".*?(\\d{8}).*", "yyyyMMdd");
+ strategy.configure(config);
+
+ // Filename has yyyy-MM-dd format but config expects yyyyMMdd
+ LocalDate tenDaysAgo = LocalDate.now().minusDays(10);
+ String filename = String.format("logs-%s.log", tenDaysAgo.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+ FileObjectContext context = createContext(filename);
+
+ assertFalse("Should not complete when date format doesn't match",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testShouldNotCompleteWhenPatternDoesNotMatch() {
+ Map<String, String> config = createConfig(DEFAULT_TIME, "prefix-(\\d{4}-\\d{2}-\\d{2})-suffix", null);
+ strategy.configure(config);
+
+ // Filename doesn't match the strict pattern
+ LocalDate tenDaysAgo = LocalDate.now().minusDays(10);
+ String filename = String.format("logs-%s.log", tenDaysAgo.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+ FileObjectContext context = createContext(filename);
+
+ assertFalse("Should not complete when pattern doesn't match",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testShouldCompleteForOldFile() {
+ Map<String, String> config = createConfig("01:00:00", null, null);
+ strategy.configure(config);
+
+ // File from 10 days ago should definitely be complete
+ // (completion was 9 days ago at 01:00:00)
+ LocalDate tenDaysAgo = LocalDate.now().minusDays(10);
+ String filename = String.format("logs-%s.log", tenDaysAgo.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+ FileObjectContext context = createContext(filename);
+
+ assertTrue("Old file should be marked as complete",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testShouldNotCompleteForFutureFile() {
+ Map<String, String> config = createConfig("01:00:00", null, null);
+ strategy.configure(config);
+
+ // File from 5 days in the future
+ // (completion is 4 days in the future at 01:00:00)
+ LocalDate fiveDaysFromNow = LocalDate.now().plusDays(5);
+ String filename = String.format("logs-%s.log", fiveDaysFromNow.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+ FileObjectContext context = createContext(filename);
+
+ assertFalse("Future file should not be marked as complete",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testFileCompletesNextDayAtScheduledTime() {
+ Map<String, String> config = createConfig("02:00:00", null, null);
+ strategy.configure(config);
+
+ // File from today with completion time 02:00:00
+ // Should complete tomorrow at 02:00:00
+ // Since we're testing today, it should NOT be complete yet
+ LocalDate today = LocalDate.now();
+ String filename = String.format("file-%s.log", today.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+ FileObjectContext context = createContext(filename);
+
+ assertFalse("Today's file should NOT be complete (completes tomorrow at 02:00:00)",
+ strategy.shouldComplete(context));
+ }
+
+ @Test
+ public void testShouldAttemptReadWhenFileIsComplete() {
+ Map<String, String> config = createConfig("01:00:00", null, null);
+ strategy.configure(config);
+
+ // Old file from 10 days ago that should be complete
+ long fixedTime = 1705334400000L; // 2024-01-15 16:00:00 UTC
+ LocalDate tenDaysAgo = LocalDate.now().minusDays(10);
+ String filename = String.format("logs-%s.log", tenDaysAgo.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+
+ GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
+ .withUri(URI.create("file:///tmp/" + filename))
+ .withName(filename)
+ .withContentLength(1000L)
+ .withLastModified(fixedTime)
+ .build();
+
+ FileObjectOffset offset = new FileObjectOffset(0, 0, fixedTime);
+
+ assertTrue("Should attempt to read when file is complete",
+ strategy.shouldAttemptRead(meta, offset));
+ }
+
+ @Test
+ public void testShouldAttemptReadWhenFileIsModified() {
+ Map<String, String> config = createConfig("23:59:59", null, null);
+ strategy.configure(config);
+
+ // File from 5 days in future - not complete yet but modified
+ long fileModifiedTime = 1705334400000L; // 2024-01-15 16:00:00 UTC
+ long offsetTime = 1705330800000L; // 2024-01-15 15:00:00 UTC (1 hour earlier)
+
+ LocalDate fiveDaysFromNow = LocalDate.now().plusDays(5);
+ String filename = String.format("logs-%s.log", fiveDaysFromNow.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+
+ GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
+ .withUri(URI.create("file:///tmp/" + filename))
+ .withName(filename)
+ .withContentLength(1000L)
+ .withLastModified(fileModifiedTime)
+ .build();
+
+ // Offset from 1 hour ago - file was modified after offset
+ FileObjectOffset offset = new FileObjectOffset(0, 0, offsetTime);
+
+ assertTrue("Should attempt to read when file is modified",
+ strategy.shouldAttemptRead(meta, offset));
+ }
+
+ @Test
+ public void testShouldNotAttemptReadWhenFileIsNotModifiedAndNotComplete() {
+ Map<String, String> config = createConfig("23:59:59", null, null);
+ strategy.configure(config);
+
+ // File from 5 days in future - not complete yet and not modified
+ LocalDate fiveDaysFromNow = LocalDate.now().plusDays(5);
+ String filename = String.format("logs-%s.log", fiveDaysFromNow.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
+
+ long fileModifiedTime = 1705330800000L; // 2024-01-15 15:00:00 UTC
+ long offsetTime = 1705334400000L; // 2024-01-15 16:00:00 UTC (newer than file)
+
+ GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
+ .withUri(URI.create("file:///tmp/" + filename))
+ .withName(filename)
+ .withContentLength(1000L)
+ .withLastModified(fileModifiedTime)
+ .build();
+
+ // Offset is newer than last modification
+ FileObjectOffset offset = new FileObjectOffset(0, 0, offsetTime);
+
+ assertFalse("Should not attempt to read when file is not modified and not complete",
+ strategy.shouldAttemptRead(meta, offset));
+ }
+}
\ No newline at end of file