Skip to content

SyncWriter.flush() now throw Exception when previous task was failed #484

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation

ekuvardin
Copy link
Contributor

@ekuvardin ekuvardin commented May 13, 2025

Problem connected to this one #475

What's the cause?

A problem when encoding/decoding (or something else) throws an error.

The user writes

WriterSettings settings = WriterSettings.newBuilder()
                .setTopicPath(topicName)
                .setCodec(10000) // or .setCodec(Codec.CUSTOM) for new version
                .build();
        SyncWriter writer = client.createSyncWriter(settings);
         writer.initAndWait();


        for (byte[] testMessage : testMessages) {
            writer.send(Message.newBuilder().setData(testMessage).build());
        }

       writer.flush();
       writer.shutdown(1, TimeUnit.MINUTES);

Code in writer.send failed because codec CUSTOM did not exist.
But the problem is when it's failed before or writer.flush();

When we call writer.flush(), we finally end up in flushImpl.
Take a look at in WriterImpl

this.lastAcceptedMessageFuture.isDone()
                    ? CompletableFuture.completedFuture(null)
                    : this.lastAcceptedMessageFuture.thenApply(v -> null);

If the thread was too quick, then lastAcceptedMessageFuture - DONE(with error inside future), we return CompletableFuture.completedFuture(null) and writer.flush() doesn't fail.

If the thread is slow, then lastAcceptedMessageFuture is NOT COMPLETED, and we call lastAcceptedMessageFuture.thenApply(v -> null) and failed with RuntimeException wraped by CompletionException

Behavior depends on the speed of the test/machine and so on.

Solution
I think that we should throw an error in writer.flush()
Return his. lastAcceptedMessageFuture.thenApply(v -> null); despite whether lastAcceptedMessageFuture is done or not

Test

A bit confused where to write the test.
Test depends on #447 (Codec class)

Also, in MR below, we can uncomment Ignore test in YdbTopicsCodecIntegrationTest they will pass.
Maybe wait for MR to push?
On the other hand I can write simple one test -> just push Thread.sleep(5000) before writer.flush();

@alex268 alex268 requested review from pnv1 and alex268 May 15, 2025 07:49
@pnv1
Copy link
Collaborator

pnv1 commented May 15, 2025

PR 447 is pushed, but in next release branch, not in master. Test that depends on new codecs should probably also be added to new release branch

@alex268 alex268 merged commit f5b5df8 into ydb-platform:master May 15, 2025
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants