Skip to content

Commit 65ff494

Browse files
committed
feat(plugin): add reader to only send file metadata (#57)
1 parent f4965f9 commit 65ff494

File tree

1 file changed

+106
-0
lines changed

1 file changed

+106
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2019-2020 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package io.streamthoughts.kafka.connect.filepulse.reader;
21+
22+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
23+
import io.streamthoughts.kafka.connect.filepulse.filter.FilterContext;
24+
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
25+
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
26+
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffset;
27+
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
28+
29+
import java.util.Collections;
30+
import java.util.Iterator;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
33+
/**
34+
* Send a single record containing file metadata.
35+
*/
36+
public class FileInputMetadataReader extends AbstractFileInputReader {
37+
38+
/**
39+
* {@inheritDoc}
40+
*/
41+
@Override
42+
protected FileInputIterator<FileRecord<TypedStruct>> newIterator(final FileContext context,
43+
final IteratorManager iteratorManager) {
44+
45+
TypedFileRecord record = new TypedFileRecord(
46+
BytesRecordOffset.empty(),
47+
TypedStruct.create("kafka.connect.filepulse.FileMetadata")
48+
.put("name", context.metadata().name())
49+
.put("path", context.metadata().path())
50+
.put("hash", context.metadata().hash())
51+
.put("lastModified", context.metadata().lastModified())
52+
.put("size", context.metadata().size())
53+
.put("inode", context.metadata().inode())
54+
);
55+
return new DelegatingFileInputIterator(context, Collections.singleton(record).iterator());
56+
}
57+
58+
private static final class DelegatingFileInputIterator implements FileInputIterator<FileRecord<TypedStruct>> {
59+
60+
private final AtomicBoolean isClosed = new AtomicBoolean(false);
61+
private final Iterator<TypedFileRecord> iterator;
62+
private final FileContext context;
63+
64+
/**
65+
* Creates a new {@link DelegatingFileInputIterator} instance.
66+
*
67+
* @param context the {@link FilterContext} object.
68+
* @param iterator the {@link Iterator} to delegate.
69+
*/
70+
DelegatingFileInputIterator(final FileContext context,
71+
final Iterator<TypedFileRecord> iterator) {
72+
this.context = context;
73+
this.iterator = iterator;
74+
}
75+
76+
@Override
77+
public FileContext context() {
78+
return context;
79+
}
80+
81+
@Override
82+
public void seekTo(SourceOffset offset) {
83+
// do nothing
84+
}
85+
86+
@Override
87+
public RecordsIterable<FileRecord<TypedStruct>> next() {
88+
return RecordsIterable.of(iterator.next());
89+
}
90+
91+
@Override
92+
public boolean hasNext() {
93+
return iterator.hasNext();
94+
}
95+
96+
@Override
97+
public void close() {
98+
isClosed.set(true);
99+
}
100+
101+
@Override
102+
public boolean isClose() {
103+
return isClosed.get();
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)