Skip to content

Commit 6b187e9

Browse files
authored
KAFKA-16926: Optimize BeginQuorumEpoch heartbeat (#20318)
Instead of sending out BeginQuorum requests to every voter on a cadence, we can save on some requests by only sending to those which have not fetched within the BeginQuorumEpoch timeout. Reviewers: José Armando García Sancio <[email protected]>, Chia-Ping Tsai <[email protected]>, Kuan-Po Tseng <[email protected]>, Alyssa Huang <[email protected]>
1 parent 7a8dba3 commit 6b187e9

File tree

3 files changed

+78
-10
lines changed

3 files changed

+78
-10
lines changed

raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,6 @@ private void updateLeaderEndOffsetAndTimestamp(
358358
long currentTimeMs
359359
) {
360360
final LogOffsetMetadata endOffsetMetadata = log.endOffset();
361-
362361
if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet())) {
363362
onUpdateLeaderHighWatermark(state, currentTimeMs);
364363
}
@@ -1512,6 +1511,7 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
15121511
FetchRequest.replicaId(request),
15131512
fetchPartition.replicaDirectoryId()
15141513
);
1514+
15151515
FetchResponseData response = tryCompleteFetchRequest(
15161516
requestMetadata.listenerName(),
15171517
requestMetadata.apiVersion(),
@@ -2933,7 +2933,7 @@ private long maybeSendRequests(
29332933
return minBackoffMs;
29342934
}
29352935

2936-
private long maybeSendRequest(
2936+
private long maybeSendRequests(
29372937
long currentTimeMs,
29382938
Set<ReplicaKey> remoteVoters,
29392939
Function<Integer, Node> destinationSupplier,
@@ -3120,13 +3120,10 @@ private long maybeSendBeginQuorumEpochRequests(
31203120
)
31213121
);
31223122

3123-
timeUntilNextBeginQuorumSend = maybeSendRequest(
3123+
Set<ReplicaKey> needToSendBeginQuorumRequests = state.needToSendBeginQuorumRequests(currentTimeMs);
3124+
timeUntilNextBeginQuorumSend = maybeSendRequests(
31243125
currentTimeMs,
3125-
voters
3126-
.voterKeys()
3127-
.stream()
3128-
.filter(key -> key.id() != quorum.localIdOrThrow())
3129-
.collect(Collectors.toSet()),
3126+
needToSendBeginQuorumRequests,
31303127
nodeSupplier,
31313128
this::buildBeginQuorumEpochRequest
31323129
);
@@ -3208,7 +3205,7 @@ private long maybeSendVoteRequests(
32083205
if (!state.epochElection().isVoteRejected()) {
32093206
VoterSet voters = partitionState.lastVoterSet();
32103207
boolean preVote = quorum.isProspective();
3211-
return maybeSendRequest(
3208+
return maybeSendRequests(
32123209
currentTimeMs,
32133210
state.epochElection().unrecordedVoters(),
32143211
voterId -> voters

raft/src/main/java/org/apache/kafka/raft/LeaderState.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,29 @@ public void resetBeginQuorumEpochTimer(long currentTimeMs) {
188188
beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
189189
}
190190

191+
/**
192+
* Determines the set of replicas that should receive a {@code BeginQuorumEpoch} request
193+
* based on the elapsed time since their last fetch.
194+
* <p>
195+
* For each remote voter (excluding the local node), if the time since the last
196+
* fetch exceeds the configured {@code beginQuorumEpochTimeoutMs}, the replica
197+
* is considered to need a new quorum epoch request.
198+
*
199+
* @param currentTimeMs the current system time in milliseconds
200+
* @return an unmodifiable set of {@link ReplicaKey} objects representing replicas
201+
* that need to receive a {@code BeginQuorumEpoch} request
202+
*/
203+
public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
204+
return voterStates.values()
205+
.stream()
206+
.filter(
207+
state -> state.replicaKey.id() != localVoterNode.voterKey().id() &&
208+
currentTimeMs - state.lastFetchTimestamp >= beginQuorumEpochTimeoutMs
209+
)
210+
.map(ReplicaState::replicaKey)
211+
.collect(Collectors.toUnmodifiableSet());
212+
}
213+
191214
/**
192215
* Get the remaining time in milliseconds until the checkQuorumTimer expires.
193216
*

raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.kafka.common.utils.Utils;
4040
import org.apache.kafka.raft.errors.BufferAllocationException;
4141
import org.apache.kafka.raft.errors.NotLeaderException;
42+
import org.apache.kafka.server.common.KRaftVersion;
4243
import org.apache.kafka.test.TestUtils;
4344

4445
import org.junit.jupiter.api.Test;
@@ -618,7 +619,7 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exceptio
618619
int epoch = context.currentEpoch();
619620
assertEquals(OptionalInt.of(localId), context.currentLeader());
620621

621-
// begin epoch requests should be sent out every beginQuorumEpochTimeoutMs
622+
// begin epoch requests sent out every beginQuorumEpochTimeoutMs if replicas have not fetched
622623
context.time.sleep(context.beginQuorumEpochTimeoutMs);
623624
context.client.poll();
624625
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
@@ -633,6 +634,53 @@ public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws Exceptio
633634
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
634635
}
635636

637+
@ParameterizedTest
638+
@ValueSource(booleans = { true, false })
639+
public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) throws Exception {
640+
ReplicaKey localKey = replicaKey(randomReplicaId(), true);
641+
int remoteId1 = localKey.id() + 1;
642+
int remoteId2 = localKey.id() + 2;
643+
ReplicaKey replicaKey1 = replicaKey(remoteId1, withKip853Rpc);
644+
ReplicaKey replicaKey2 = replicaKey(remoteId2, withKip853Rpc);
645+
646+
RaftClientTestContext context = new RaftClientTestContext.Builder(localKey.id(), localKey.directoryId().get())
647+
.withKip853Rpc(withKip853Rpc)
648+
.withStartingVoters(
649+
VoterSetTest.voterSet(Stream.of(localKey, replicaKey1, replicaKey2)),
650+
KRaftVersion.KRAFT_VERSION_1
651+
)
652+
.build();
653+
654+
context.unattachedToLeader();
655+
int epoch = context.currentEpoch();
656+
assertEquals(OptionalInt.of(localKey.id()), context.currentLeader());
657+
658+
// begin epoch requests sent out every beginQuorumEpochTimeoutMs if replicas have not fetched
659+
context.time.sleep(context.beginQuorumEpochTimeoutMs);
660+
context.client.poll();
661+
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
662+
663+
long partialDelay = context.beginQuorumEpochTimeoutMs / 3;
664+
context.time.sleep(context.beginQuorumEpochTimeoutMs / 3);
665+
context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0));
666+
context.pollUntilResponse();
667+
668+
context.time.sleep(context.beginQuorumEpochTimeoutMs - partialDelay);
669+
context.client.poll();
670+
// leader will not send BeginQuorumEpochRequest again for replica 1 since fetchRequest was received
671+
// before beginQuorumEpochTimeoutMs time has elapsed
672+
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId2));
673+
674+
context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0));
675+
context.pollUntilResponse();
676+
context.time.sleep(context.beginQuorumEpochTimeoutMs);
677+
context.client.poll();
678+
// leader should send BeginQuorumEpochRequest to a node if beginQuorumEpochTimeoutMs time has elapsed
679+
// without receiving a fetch request from that node
680+
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
681+
}
682+
683+
636684
@ParameterizedTest
637685
@ValueSource(booleans = { true, false })
638686
public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean withKip853Rpc) throws Exception {

0 commit comments

Comments
 (0)