Skip to content

Commit 1b2dfc8

Browse files
committed
chore: refactor some code quality
1 parent f49e75c commit 1b2dfc8

File tree

2 files changed

+9
-4
lines changed

2 files changed

+9
-4
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,14 @@ public List<SourceRecord> poll() throws InterruptedException {
245245
if (!isTaskRunning()) continue;
246246
return results;
247247
}
248-
} catch (final Throwable t) {
249-
// This task has failed, so close any resources (maybe reopened if needed) before throwing
248+
} catch (final Exception e) {
249+
if (e instanceof InterruptedException) {
250+
Thread.currentThread().interrupt();
251+
}
252+
// This task has failed, so close any resources before throwing
250253
LOG.error("This task has failed due to uncaught error and will be stopped.");
251254
closeResources();
252-
throw t;
255+
throw e;
253256
}
254257
// Only in case of shutdown
255258
closeResources();

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileSystemMonitorThread.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ void shutdown(final long timeoutMs) {
108108
LOG.info("Shutting down thread monitoring filesystem.");
109109
this.shutdownLatch.countDown();
110110
try {
111-
this.waitingLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
111+
if (waitingLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
112+
LOG.debug("Timeout reached before completing thread shutdown");
113+
}
112114
} catch (InterruptedException ignore) {
113115
LOG.error("Timeout : scan loop is not terminated yet.");
114116
Thread.currentThread().interrupt();

0 commit comments

Comments
 (0)