Skip to content

Commit

Permalink
KAFKA-16927; Handle expanding leader endpoints (apache#17363)
Browse files Browse the repository at this point in the history
When a replica restarts in the follower state it is possible for the set of leader endpoints to not match the latest set of leader endpoints. Voters will discover the latest set of leader endpoints through the BEGIN_QUORUM_EPOCH request. This means that KRaft needs to allow for the replica to transition from Follower to Follower when only the set of leader endpoints has changed.

Reviewers: Colin P. McCabe <[email protected]>, Alyssa Huang <[email protected]>
  • Loading branch information
jsancio authored and tedyu committed Jan 6, 2025
1 parent 5fa460a commit 497fd76
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 9 deletions.
54 changes: 46 additions & 8 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,15 +470,53 @@ public void transitionToUnattachedVotedState(
*/
public void transitionToFollower(int epoch, int leaderId, Endpoints endpoints) {
int currentEpoch = state.epoch();
if (localId.isPresent() && leaderId == localId.getAsInt()) {
throw new IllegalStateException("Cannot transition to Follower with leader " + leaderId +
" and epoch " + epoch + " since it matches the local broker.id " + localId);
if (endpoints.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"Cannot transition to Follower with leader %s and epoch %s without a leader endpoint",
leaderId,
epoch
)
);
} else if (localId.isPresent() && leaderId == localId.getAsInt()) {
throw new IllegalStateException(
String.format(
"Cannot transition to Follower with leader %s and epoch %s since it matches the local node.id %s",
leaderId,
epoch,
localId
)
);
} else if (epoch < currentEpoch) {
throw new IllegalStateException("Cannot transition to Follower with leader " + leaderId +
" and epoch " + epoch + " since the current epoch " + currentEpoch + " is larger");
} else if (epoch == currentEpoch && (isFollower() || isLeader())) {
throw new IllegalStateException("Cannot transition to Follower with leader " + leaderId +
" and epoch " + epoch + " from state " + state);
throw new IllegalStateException(
String.format(
"Cannot transition to Follower with leader %s and epoch %s since the current epoch %s is larger",
leaderId,
epoch,
currentEpoch
)
);
} else if (epoch == currentEpoch) {
if (isFollower() && state.leaderEndpoints().size() >= endpoints.size()) {
throw new IllegalStateException(
String.format(
"Cannot transition to Follower with leader %s, epoch %s and endpoints %s from state %s",
leaderId,
epoch,
endpoints,
state
)
);
} else if (isLeader()) {
throw new IllegalStateException(
String.format(
"Cannot transition to Follower with leader %s and epoch %s from state %s",
leaderId,
epoch,
state
)
);
}
}

durableTransitionTo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2257,6 +2257,45 @@ void testObserverDiscoversLeaderWithUnknownVoters() throws Exception {
assertEquals(-2, fetchRequest.destination().id());
}

@Test
public void testHandleBeginQuorumRequestMoreEndpoints() throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
ReplicaKey leader = replicaKey(local.id() + 1, true);
int leaderEpoch = 3;

VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader));

RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
.withBootstrapSnapshot(Optional.of(voters))
.withElectedLeader(leaderEpoch, leader.id())
.withKip853Rpc(true)
.build();

context.client.poll();

HashMap<ListenerName, InetSocketAddress> leaderListenersMap = new HashMap<>(2);
leaderListenersMap.put(
VoterSetTest.DEFAULT_LISTENER_NAME,
InetSocketAddress.createUnresolved("localhost", 9990 + leader.id())
);
leaderListenersMap.put(
ListenerName.normalised("ANOTHER_LISTENER"),
InetSocketAddress.createUnresolved("localhost", 8990 + leader.id())
);
Endpoints leaderEndpoints = Endpoints.fromInetSocketAddresses(leaderListenersMap);

context.deliverRequest(context.beginEpochRequest(leaderEpoch, leader.id(), leaderEndpoints));
context.pollUntilResponse();

context.assertElectedLeader(leaderEpoch, leader.id());

context.assertSentBeginQuorumEpochResponse(
Errors.NONE,
leaderEpoch,
OptionalInt.of(leader.id())
);
}

private static void verifyVotersRecord(
VoterSet expectedVoterSet,
ByteBuffer recordKey,
Expand Down
41 changes: 41 additions & 0 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
Expand All @@ -55,6 +56,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
Expand Down Expand Up @@ -960,6 +962,45 @@ public void testHandleBeginQuorumRequest(boolean withKip853Rpc) throws Exception
);
}

@Test
public void testHandleBeginQuorumRequestMoreEndpoints() throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
ReplicaKey leader = replicaKey(local.id() + 1, true);
int leaderEpoch = 3;

VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader));

RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
.withStaticVoters(voters)
.withElectedLeader(leaderEpoch, leader.id())
.withKip853Rpc(true)
.build();

context.client.poll();

HashMap<ListenerName, InetSocketAddress> leaderListenersMap = new HashMap<>(2);
leaderListenersMap.put(
VoterSetTest.DEFAULT_LISTENER_NAME,
InetSocketAddress.createUnresolved("localhost", 9990 + leader.id())
);
leaderListenersMap.put(
ListenerName.normalised("ANOTHER_LISTENER"),
InetSocketAddress.createUnresolved("localhost", 8990 + leader.id())
);
Endpoints leaderEndpoints = Endpoints.fromInetSocketAddresses(leaderListenersMap);

context.deliverRequest(context.beginEpochRequest(leaderEpoch, leader.id(), leaderEndpoints));
context.pollUntilResponse();

context.assertElectedLeader(leaderEpoch, leader.id());

context.assertSentBeginQuorumEpochResponse(
Errors.NONE,
leaderEpoch,
OptionalInt.of(leader.id())
);
}

@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleBeginQuorumResponse(boolean withKip853Rpc) throws Exception {
Expand Down
34 changes: 34 additions & 0 deletions raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.raft;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
Expand All @@ -32,6 +33,7 @@
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
Expand Down Expand Up @@ -1211,6 +1213,38 @@ public void testFollowerToFollowerSameEpoch(KRaftVersion kraftVersion) {
);
}

@ParameterizedTest
@EnumSource(value = KRaftVersion.class)
public void testFollowerToFollowerSameEpochAndMoreEndpoints(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion);
QuorumState state = initializeEmptyState(voters, kraftVersion);
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
state.transitionToFollower(
8,
node2,
voters.listeners(node2)
);

HashMap<ListenerName, InetSocketAddress> newNode2ListenersMap = new HashMap<>(2);
newNode2ListenersMap.put(
VoterSetTest.DEFAULT_LISTENER_NAME,
InetSocketAddress.createUnresolved("localhost", 9990 + node2)
);
newNode2ListenersMap.put(
ListenerName.normalised("ANOTHER_LISTENER"),
InetSocketAddress.createUnresolved("localhost", 8990 + node2)
);
Endpoints newNode2Endpoints = Endpoints.fromInetSocketAddresses(newNode2ListenersMap);

state.transitionToFollower(
8,
node2,
newNode2Endpoints
);
}

@ParameterizedTest
@EnumSource(value = KRaftVersion.class)
public void testFollowerToFollowerHigherEpoch(KRaftVersion kraftVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,14 @@ BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId) {
return beginEpochRequest(clusterId, epoch, leaderId);
}

BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId, Endpoints endpoints) {
ReplicaKey localReplicaKey = kip853Rpc ?
ReplicaKey.of(localIdOrThrow(), localDirectoryId) :
ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID);

return beginEpochRequest(clusterId, epoch, leaderId, endpoints, localReplicaKey);
}

BeginQuorumEpochRequestData beginEpochRequest(String clusterId, int epoch, int leaderId) {
ReplicaKey localReplicaKey = kip853Rpc ?
ReplicaKey.of(localIdOrThrow(), localDirectoryId) :
Expand All @@ -1381,13 +1389,29 @@ BeginQuorumEpochRequestData beginEpochRequest(
int epoch,
int leaderId,
ReplicaKey voterKey
) {
return beginEpochRequest(
clusterId,
epoch,
leaderId,
startingVoters.listeners(leaderId),
voterKey
);
}

BeginQuorumEpochRequestData beginEpochRequest(
String clusterId,
int epoch,
int leaderId,
Endpoints endpoints,
ReplicaKey voterKey
) {
return RaftUtil.singletonBeginQuorumEpochRequest(
metadataPartition,
clusterId,
epoch,
leaderId,
startingVoters.listeners(leaderId),
endpoints,
voterKey
);
}
Expand Down

0 comments on commit 497fd76

Please sign in to comment.