-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17664: Added check before sending background event after ShareAcknowledge #17332
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
Outdated
Show resolved
Hide resolved
...ts/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
Outdated
Show resolved
Hide resolved
...ache/kafka/clients/consumer/internals/events/ShareAcknowledgeCommitCallbackHandlerEvent.java
Outdated
Show resolved
Hide resolved
...ache/kafka/clients/consumer/internals/events/ShareAcknowledgeCommitCallbackHandlerEvent.java
Outdated
Show resolved
Hide resolved
...ts/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
Outdated
Show resolved
Hide resolved
...rc/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
Outdated
Show resolved
Hide resolved
@@ -588,8 +601,7 @@ private void handleShareFetchSuccess(Node fetchTarget, | |||
} | |||
acks.setAcknowledgeErrorCode(Errors.forCode(partitionData.acknowledgeErrorCode())); | |||
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks); | |||
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(acksMap); | |||
backgroundEventHandler.add(event); | |||
maybeSendShareAcknowledgeCommitCallBackEvent(acksMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit. "Callback" not "CallBack"
acknowledgementCommitCallbackHandler = new AcknowledgementCommitCallbackHandler(callback); | ||
} else { | ||
if (acknowledgementCommitCallbackHandler != null) { | ||
ShareAcknowledgementCommitCallbackRegistrationEvent event = new ShareAcknowledgementCommitCallbackRegistrationEvent(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be setting the flag to false
I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really you should have a test that would have caught this error too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, thanks for catching, I have added unit tests to verify this now.
|
||
public class ShareAcknowledgementCommitCallbackRegistrationEvent extends ApplicationEvent { | ||
|
||
boolean isCallbackHandlerRegistered; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just isCallbackRegistered
I think for consistency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ShivsundarR Thanks for the PR. LGTM
…cknowledge (apache#17332) What Currently, we prepare a ShareAcknowledgeCommitCallback event for every ShareAcknowledgeResponse and send it over to the application thread. In cases where the acknowledgement commit callback handler is not configured by the user, this event is not used in the application thread. So we can generate this event based on whether the callback was configured. In this PR, we have a new event which the application thread sends whenever the user enables or disables the commit callback handler, thereby letting the ShareConsumeRequestManager know if it has to send the background event or not. Test We also have a unit test verifying if the ShareConsumeRequestManager sends back the event based on the boolean configured. Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy <[email protected]>
What
Currently, we prepare a
ShareAcknowledgeCommitCallback
event for everyShareAcknowledgeResponse
and send it over to the application thread.In cases where the acknowledgement commit callback handler is not configured by the user, this event is not used in the application thread. So we can generate this event based on whether the callback was configured.
In this PR, we have a new event which the application thread sends whenever the user enables or disables the commit callback handler, thereby letting the
ShareConsumeRequestManager
know if it has to send the background event or not.Test
We also have a unit test verifying if the
ShareConsumeRequestManager
sends back the event based on the boolean configured.