Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<suppress checks="NPathComplexity" files="DefaultFileRecordsPollingConsumer.java"/>
<suppress checks="NPathComplexity" files="DefaultFileSystemMonitor.java"/>
<suppress checks="ParameterNumber" files="InternalFilterContext" />
<suppress checks="ParameterNumber" files="DefaultFileSystemMonitor.java" />
<suppress checks="Header" files="kafka-connect-source-file-pulse-version.properties"/>
<suppress checks="Header" files=".*.properties"/>
<suppress checks="[a-zA-Z0-9]*" files="src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/parser/antlr4/*"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) StreamThoughts
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.streamthoughts.kafka.connect.filepulse.source;

import java.util.Map;

/**
 * Strategy interface for determining when a file should be marked as COMPLETED.
 *
 * <p>Implementations are expected to be configured via {@link #configure(Map)}
 * before {@link #shouldComplete(FileObjectContext)} is invoked. Because this
 * interface declares exactly one abstract method it may also be implemented
 * as a lambda.
 */
@FunctionalInterface
public interface FileCompletionStrategy {

    /**
     * Configures this strategy with the given configuration properties.
     *
     * <p>The default implementation is a no-op; override it when the strategy
     * requires configuration.
     *
     * @param configs the configuration properties.
     */
    default void configure(final Map<String, ?> configs) {
        // Default: no-op
    }

    /**
     * Checks if the file should be marked as completed based on the strategy.
     *
     * @param context the file object context.
     * @return {@code true} if the file should be marked as COMPLETED, {@code false} otherwise.
     */
    boolean shouldComplete(final FileObjectContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public enum FileObjectStatus {
*/
READING,

/**
* The file processing is partially completed (e.g. for long-lived processing).
*/
PARTIALLY_COMPLETED,

/**
* The file processing is completed.
*/
Expand Down Expand Up @@ -59,4 +64,4 @@ public boolean isOneOf(final FileObjectStatus...states) {
public boolean isDone() {
return isOneOf(COMMITTED, FAILED, CLEANED);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright (c) StreamThoughts
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.streamthoughts.kafka.connect.filepulse.source;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Optional companion to {@link FileCompletionStrategy} that decides whether the
 * connector should attempt another read from a continuously appended file.
 *
 * <p>Intended for long-lived, append-only files (e.g., daily log files) where
 * backing off read attempts while no new data is expected avoids needless
 * polling or timeouts, while the completion strategy still decides when the
 * file is ultimately marked as COMPLETED.
 */
public interface LongLivedFileReadStrategy {

    // NOTE(review): interface fields are implicitly public static final; this
    // logger is not referenced by the visible code — confirm it is used by
    // implementors, otherwise consider moving it into the implementing class.
    Logger LOG = LoggerFactory.getLogger(LongLivedFileReadStrategy.class);

    /**
     * Determines whether the connector should attempt to read from the given file
     * based on its current context.
     *
     * <p>By default, a read is attempted only when the file's last-modified
     * timestamp is strictly newer than the timestamp of the last read offset.
     *
     * @param objectMeta the file object metadata.
     * @param offset the last read offset for the file.
     * @return true if the connector should attempt to read from the file, false otherwise.
     */
    default boolean shouldAttemptRead(final FileObjectMeta objectMeta, final FileObjectOffset offset) {
        final long lastModified = objectMeta.lastModified();
        final long lastReadAt = offset.timestamp();
        return lastReadAt < lastModified;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,19 @@ public interface StateListener {
*/
void onCompleted(final FileObjectContext context);

/**
* This method is invoked when a source file processing is partially completed.
* @see FileRecordsPollingConsumer
*
* @param context the file context.
*/
void onPartiallyCompleted(final FileObjectContext context);

/**
* This method is invoked when an error occurred while processing a source file.
* @see FileRecordsPollingConsumer
*
* @param context the file context.
*/
void onFailure(final FileObjectContext context, final Throwable t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -178,7 +179,7 @@ private boolean fillWithBufferedLinesUntil(final List<TextBlock> records,
TextBlock line;
do {
line = tryToExtractLine();
if (line != null) {
if (line != null && !StringUtils.isEmpty(line.data())) {
records.add(line);
}
maxNumRecordsNotReached = records.size() < minRecords;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
import io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner;
import io.streamthoughts.kafka.connect.filepulse.source.EofCompletionStrategy;
import io.streamthoughts.kafka.connect.filepulse.source.FileCompletionStrategy;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.TaskPartitioner;
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStore;
Expand Down Expand Up @@ -42,6 +44,12 @@ public class CommonSourceConfig extends AbstractConfig {
public static final String FS_LISTING_FILTERS_CONFIG = "fs.listing.filters";
private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files.";

public static final String FS_COMPLETION_STRATEGY_CLASS_CONFIG = "fs.completion.strategy.class";
private static final String FS_COMPLETION_STRATEGY_CLASS_DOC =
"The FileCompletionStrategy class to determine when files should be marked as COMPLETED. " +
"Default is EofCompletionStrategy (completes when fully read). " +
"Use DailyCompletionStrategy for time-based completion (e.g., daily files).";

public static final String TASKS_FILE_READER_CLASS_CONFIG = "tasks.reader.class";
private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file.";

Expand Down Expand Up @@ -261,6 +269,13 @@ public static ConfigDef getConfigDef() {
groupCounter++,
ConfigDef.Width.NONE,
CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_CONFIG
)
.define(
FS_COMPLETION_STRATEGY_CLASS_CONFIG,
ConfigDef.Type.CLASS,
EofCompletionStrategy.class,
ConfigDef.Importance.MEDIUM,
FS_COMPLETION_STRATEGY_CLASS_DOC
);
}

Expand Down Expand Up @@ -303,6 +318,10 @@ public String getValueSchemaConditionTopicPattern() {
return getString(RECORD_VALUE_SCHEMA_CONDITION_TOPIC_PATTERN_CONFIG);
}

public FileCompletionStrategy getFileCompletionStrategy() {
return getConfiguredInstance(FS_COMPLETION_STRATEGY_CLASS_CONFIG, FileCompletionStrategy.class);
}

public Schema getValueConnectSchema() {
String valueConnectSchemaTypeString = getString(RECORD_VALUE_SCHEMA_TYPE_CONFIG);
ConnectSchemaType schemaType = ConnectSchemaType.getForNameIgnoreCase(valueConnectSchemaTypeString);
Expand Down Expand Up @@ -352,4 +371,4 @@ private Schema readConfigSchema() {
"Failed to read connect-schema for '" + CommonSourceConfig.RECORD_VALUE_SCHEMA_CONFIG + "'", e);
}
}
}
}
Loading