Skip to content

Commit 4ad472c

Browse files
committed
refactor(filesystem): remove local fs dependency from RowFileInputIteratorBuilder
1 parent 7313451 commit 4ad472c

File tree

3 files changed

+12
-6
lines changed

3 files changed

+12
-6
lines changed

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
2424
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
2525
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
import java.io.File;
2830
import java.nio.charset.Charset;
@@ -34,6 +36,8 @@
3436
*/
3537
public class RowFileInputIteratorBuilder {
3638

39+
private static final Logger LOG = LoggerFactory.getLogger(RowFileInputIteratorBuilder.class);
40+
3741
private Charset charset = StandardCharsets.UTF_8;
3842
private int minNumReadRecords = 1;
3943
private FileObjectMeta metadata;
@@ -91,15 +95,17 @@ public FileInputIterator<FileRecord<TypedStruct>> build() {
9195
.setMaxWaitMs(waitMaxMs);
9296

9397
if (skipFooters > 0) {
98+
LOG.debug("Decorate RowFileInputIterator with RowFileWithFooterInputIterator");
9499
iterator = new RowFileWithFooterInputIterator(
95100
skipFooters,
96-
new File(metadata.uri()),
101+
metadata.uri(),
97102
charset,
98103
iterator
99104
);
100105
}
101106

102107
if (skipHeaders > 0) {
108+
LOG.debug("Decorate RowFileInputIterator with RowFileWithHeadersInputIterator");
103109
iterator = new RowFileWithHeadersInputIterator(
104110
skipHeaders,
105111
readerSupplier,

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileWithFooterInputIterator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.slf4j.LoggerFactory;
3030

3131
import java.io.File;
32+
import java.net.URI;
3233
import java.nio.charset.Charset;
3334
import java.util.Collections;
3435
import java.util.List;
@@ -61,12 +62,12 @@ public class RowFileWithFooterInputIterator extends RowFileInputIteratorDecorato
6162
private List<String> footersStrings;
6263

6364
public RowFileWithFooterInputIterator(final int skipFooters,
64-
final File file,
65+
final URI uri,
6566
final Charset charset,
6667
final FileInputIterator<FileRecord<TypedStruct>> iterator) {
6768
super(iterator);
6869
this.skipFooters = skipFooters;
69-
this.file = file;
70+
this.file = new File(uri);
7071
this.charset = charset;
7172
}
7273

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/internal/ReversedInputFileReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import java.util.List;
2727

2828
/**
29-
* ReversedInputFileReader is attend to be used to read a fixed number of lines from bottom of a file.
30-
*
29+
* ReversedInputFileReader is attended to be used to read a fixed number of lines from bottom of a file.
3130
* This class is not optimized for reading a whole file from bottom to top.
3231
*/
3332
public class ReversedInputFileReader implements AutoCloseable {
@@ -136,7 +135,7 @@ public List<TextBlock> readLines(int minRecords) throws IOException {
136135

137136
input.readFully(buffer, 0, nread);
138137

139-
// Reset buffer cursor to beginning before reading before.
138+
// Reset buffer cursor to beginning before reading.
140139
bufferOffset = 0;
141140
TextBlock line;
142141
do {

0 commit comments

Comments
 (0)