Skip to content

Commit f5b5df8

Browse files
authored
Merge pull request #484 from ekuvardin/475.SyncWriter-and-flush-problem-throwing-Exception
SyncWriter.flush() now throw Exception when previous task was failed
2 parents 7e5d461 + b0cd328 commit f5b5df8

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,16 +269,19 @@ protected CompletableFuture<CompletableFuture<WriteAck>> sendImpl(Message messag
269269
return tryToEnqueue(enqueuedMessage, instant).thenApply(v -> enqueuedMessage.getFuture());
270270
}
271271

272+
/**
273+
* Create a wrapper upon the future for the flush method.
274+
*
275+
* @return an empty Future if successful. Throw CompletionException when an error occurs.
276+
*/
272277
protected CompletableFuture<Void> flushImpl() {
273278
if (this.lastAcceptedMessageFuture == null) {
274279
return CompletableFuture.completedFuture(null);
275280
}
276281
incomingQueueLock.lock();
277282

278283
try {
279-
return this.lastAcceptedMessageFuture.isDone()
280-
? CompletableFuture.completedFuture(null)
281-
: this.lastAcceptedMessageFuture.thenApply(v -> null);
284+
return this.lastAcceptedMessageFuture.thenApply(v -> null);
282285
} finally {
283286
incomingQueueLock.unlock();
284287
}

0 commit comments

Comments
 (0)