Skip to content

Commit

Permalink
Added check before sending background event after ShareAcknowledge
Browse files Browse the repository at this point in the history
  • Loading branch information
ShivsundarR committed Oct 1, 2024
1 parent bb11257 commit db380c2
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private final long retryBackoffMaxMs;
private boolean closing = false;
private final CompletableFuture<Void> closeFuture;
private boolean isAcknowledgeCommitCallbackHandlerConfigured = false;

ShareConsumeRequestManager(final Time time,
final LogContext logContext,
Expand Down Expand Up @@ -270,6 +271,17 @@ private boolean isNodeFree(int nodeId) {
return !nodesWithPendingRequests.contains(nodeId);
}

public void setCallbackHandlerConfig(boolean isAcknowledgeCommitCallbackHandlerConfigured) {
this.isAcknowledgeCommitCallbackHandlerConfigured = isAcknowledgeCommitCallbackHandlerConfigured;
}

private void maybeSendShareAcknowledgeCommitCallBackEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
if (isAcknowledgeCommitCallbackHandlerConfigured) {
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(acknowledgementsMap);
backgroundEventHandler.add(event);
}
}

private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState,
long currentTimeMs,
boolean onCommitAsync,
Expand Down Expand Up @@ -586,8 +598,7 @@ private void handleShareFetchSuccess(Node fetchTarget,
}
acks.setAcknowledgeErrorCode(Errors.forCode(partitionData.acknowledgeErrorCode()));
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks);
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(acksMap);
backgroundEventHandler.add(event);
maybeSendShareAcknowledgeCommitCallBackEvent(acksMap);
}

ShareCompletedFetch completedFetch = new ShareCompletedFetch(
Expand Down Expand Up @@ -631,8 +642,7 @@ private void handleShareFetchFailure(Node fetchTarget,
metricsManager.recordFailedAcknowledgements(acks.size());
acks.setAcknowledgeErrorCode(Errors.forException(error));
Map<TopicIdPartition, Acknowledgements> acksMap = Collections.singletonMap(tip, acks);
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(acksMap);
backgroundEventHandler.add(event);
maybeSendShareAcknowledgeCommitCallBackEvent(acksMap);
}
}));
} finally {
Expand Down Expand Up @@ -708,8 +718,8 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
} else {
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();
}
acknowledgeRequestState.processingComplete();
}
}

Expand Down Expand Up @@ -866,7 +876,7 @@ UnsentRequest buildRequest() {
ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);
Node nodeToSend = metadata.fetch().nodeById(nodeId);

log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);
log.info("Building acknowledgements to send : {}", finalAcknowledgementsToSend);
nodesWithPendingRequests.add(nodeId);
isProcessed = false;

Expand Down Expand Up @@ -1040,8 +1050,7 @@ public void complete(TopicIdPartition partition, Acknowledgements acknowledgemen
// For commitAsync, we do not wait for other results to complete, we prepare a background event
// for every ShareAcknowledgeResponse.
if (isCommitAsync || (remainingResults != null && remainingResults.decrementAndGet() == 0)) {
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(result);
backgroundEventHandler.add(event);
maybeSendShareAcknowledgeCommitCallBackEvent(result);
future.ifPresent(future -> future.complete(result));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeCommitCallbackHandlerEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
Expand Down Expand Up @@ -764,9 +765,13 @@ public void setAcknowledgementCommitCallback(final AcknowledgementCommitCallback
try {
if (callback != null) {
acknowledgementCommitCallbackHandler = new AcknowledgementCommitCallbackHandler(callback);
ShareAcknowledgeCommitCallbackHandlerEvent event = new ShareAcknowledgeCommitCallbackHandlerEvent(true);
applicationEventHandler.add(event);
} else {
completedAcknowledgements.clear();
acknowledgementCommitCallbackHandler = null;
ShareAcknowledgeCommitCallbackHandlerEvent event = new ShareAcknowledgeCommitCallbackHandlerEvent(true);
applicationEventHandler.add(event);
}
} finally {
release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum Type {
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,
SHARE_ACKNOWLEDGE_CALLBACK_HANDLER,
SEEK_UNVALIDATED,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ public void process(ApplicationEvent event) {
process((ShareAcknowledgeOnCloseEvent) event);
return;

case SHARE_ACKNOWLEDGE_CALLBACK_HANDLER:
process((ShareAcknowledgeCommitCallbackHandlerEvent) event);
return;

case SEEK_UNVALIDATED:
process((SeekUnvalidatedEvent) event);
return;
Expand Down Expand Up @@ -384,6 +388,20 @@ private void process(final ShareAcknowledgeOnCloseEvent event) {
future.whenComplete(complete(event.future()));
}

/**
* Process event indicating whether the AcknowledgeCommitCallbackHandler is configured by the user.
*
* @param event Event containing a boolean to indicate if the callback handler is configured or not.
*/
private void process(final ShareAcknowledgeCommitCallbackHandlerEvent event) {
if (!requestManagers.shareConsumeRequestManager.isPresent()) {
return;
}

ShareConsumeRequestManager manager = requestManagers.shareConsumeRequestManager.get();
manager.setCallbackHandlerConfig(event.isCallbackHandlerConfigured());
}

private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
return (value, exception) -> {
if (exception != null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

public class ShareAcknowledgeCommitCallbackHandlerEvent extends ApplicationEvent {

boolean isCallbackHandlerConfigured;

public ShareAcknowledgeCommitCallbackHandlerEvent(boolean isCallbackHandlerConfigured) {
super(Type.SHARE_ACKNOWLEDGE_CALLBACK_HANDLER);
this.isCallbackHandlerConfigured = isCallbackHandlerConfigured;
}

public boolean isCallbackHandlerConfigured() {
return isCallbackHandlerConfigured;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,54 @@ public void testRetryAcknowledgements() throws InterruptedException {
assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
}

@Test
public void testCallbackHandlerConfig() throws InterruptedException {
buildRequestManager();

assignFromSubscribed(Collections.singleton(tp0));

// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());

client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);

shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));

assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0));

completedAcknowledgements.clear();

// Setting the boolean to false, indicating there is no callback handler configured.
shareConsumeRequestManager.setCallbackHandlerConfig(false);

Acknowledgements acknowledgements2 = Acknowledgements.empty();
acknowledgements2.add(3L, AcknowledgeType.ACCEPT);

shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements2));

TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()));

client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

// We expect no acknowledgements to be added as the callback handler is not configured.
assertEquals(0, completedAcknowledgements.size());
}

@Test
public void testMultipleTopicsFetch() {
buildRequestManager();
Expand Down Expand Up @@ -1066,6 +1114,7 @@ private <K, V> void buildRequestManager(MetricConfig metricConfig,
backgroundEventHandler,
metricsManager,
shareFetchCollector));
shareConsumeRequestManager.setCallbackHandlerConfig(true);
}

private void buildDependencies(MetricConfig metricConfig,
Expand Down

0 comments on commit db380c2

Please sign in to comment.