Skip to content

Commit 1ba75b1

Browse files
committed
fix(plugin): fix MoveCleanupPolicy should not fail if file iis already moved (#68)
Resolves: GH-68
1 parent ff3ecb8 commit 1ba75b1

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/clean/MoveCleanupPolicy.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public void configure(final Map<String, ?> configs) {
5959
@Override
6060
public boolean onSuccess(final SourceFile source) {
6161
final File file = source.file();
62+
if (!file.exists()) {
63+
LOG.warn("Cannot move file '{}' to success path due to file does not exist.", file);
64+
return true;
65+
}
6266
return doCleanup(file, buildTargetPath(configs.scanDirectoryPath(), file, configs.outputSucceedPath()));
6367
}
6468

@@ -68,6 +72,10 @@ public boolean onSuccess(final SourceFile source) {
6872
@Override
6973
public boolean onFailure(final SourceFile source) {
7074
final File file = source.file();
75+
if (!file.exists()) {
76+
LOG.warn("Cannot move file '{}' to error path due to file does not exist.", file);
77+
return true;
78+
}
7179
return doCleanup(file, buildTargetPath(configs.scanDirectoryPath(), file, configs.outputFailedPath()));
7280
}
7381

@@ -105,8 +113,9 @@ private boolean doCleanup(final File source, final Path target) {
105113
LOG.debug(
106114
"Non-atomic move of {} to {} succeeded after atomic move failed due to {}",
107115
source,
108-
target,
109-
outer.getMessage());
116+
target,
117+
outer.getMessage()
118+
);
110119
} catch (IOException inner) {
111120
inner.addSuppressed(outer);
112121
LOG.error("Error while moving file {}", source, inner);

0 commit comments

Comments
 (0)