diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 965f8074f8014..e5b80651b817d 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -441,6 +441,11 @@ public void cancel(String key) { if (prevTask != null) prevTask.cancel(); } + @Override + public boolean isScheduled(String key) { + return tasks.containsKey(key); + } + public void cancelAll() { Iterator> iterator = tasks.entrySet().iterator(); while (iterator.hasNext()) { diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java index d10e38a7d828b..2f288df3026d0 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java @@ -84,4 +84,12 @@ interface TimeoutOperation { * @param key The key. */ void cancel(String key); + + /** + * Check if an operation with the given key is scheduled. + * + * @param key The key. + * @return True if an operation with the key is scheduled, false otherwise. + */ + boolean isScheduled(String key); } diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index dfbbdf048bc20..c9951ed579ddf 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -2351,6 +2351,57 @@ public void testTimerScheduleIfAbsent() throws InterruptedException { assertEquals(0, ctx.timer.size()); } + @Test + public void testTimerIsScheduled() throws InterruptedException { + MockTimer timer = new MockTimer(); + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(new MockPartitionWriter()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + runtime.scheduleLoadOperation(TP, 10); + + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0, ctx.timer.size()); + + assertFalse(ctx.timer.isScheduled("timer-1")); + + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false, + () -> new CoordinatorResult<>(List.of("record1"), null)); + + assertTrue(ctx.timer.isScheduled("timer-1")); + assertFalse(ctx.timer.isScheduled("timer-2")); + assertEquals(1, ctx.timer.size()); + + ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, false, + () -> new CoordinatorResult<>(List.of("record2"), null)); + + assertTrue(ctx.timer.isScheduled("timer-1")); + assertTrue(ctx.timer.isScheduled("timer-2")); + assertEquals(2, ctx.timer.size()); + + ctx.timer.cancel("timer-1"); + + assertFalse(ctx.timer.isScheduled("timer-1")); + assertTrue(ctx.timer.isScheduled("timer-2")); + assertEquals(1, ctx.timer.size()); + + timer.advanceClock(21); + + assertFalse(ctx.timer.isScheduled("timer-2")); + assertEquals(0, ctx.timer.size()); + } + @Test public void testStateChanges() throws Exception { MockTimer timer = new MockTimer(); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java index 78e14ac576b39..69e3954a0a63a 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java @@ -110,6 +110,14 @@ public void cancel(String key) { } } + /** + * Checks if a timeout with the given key is scheduled. + */ + @Override + public boolean isScheduled(String key) { + return timeoutMap.containsKey(key); + } + /** * @return True if a timeout with the key exists; false otherwise. */ diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 3d801536072cc..ca4effd22564d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -75,7 +75,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils} -import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG} +import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG} import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult @@ -363,6 +363,7 @@ class KafkaApisTest extends Logging { cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString) cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString) cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString) + cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString) when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index 565d492507be6..bdb0cbb9f1ce8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -75,6 +75,8 @@ public final class GroupConfig extends AbstractConfig { public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "streams.num.standby.replicas"; + public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG = "streams.initial.rebalance.delay.ms"; + public final int consumerSessionTimeoutMs; public final int consumerHeartbeatIntervalMs; @@ -93,6 +95,8 @@ public final class GroupConfig extends AbstractConfig { public final int streamsNumStandbyReplicas; + public final int streamsInitialRebalanceDelayMs; + public final String shareIsolationLevel; private static final ConfigDef CONFIG = new ConfigDef() @@ -155,7 +159,13 @@ public final class GroupConfig extends AbstractConfig { GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, - GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC); + GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC) + .define(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, + INT, + GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, + atLeast(0), + MEDIUM, + GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC); public GroupConfig(Map props) { super(CONFIG, props, false); @@ -168,6 +178,7 @@ public GroupConfig(Map props) { this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG); this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG); this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG); + this.streamsInitialRebalanceDelayMs = getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG); this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG); } @@ -379,6 +390,13 @@ public int streamsNumStandbyReplicas() { return streamsNumStandbyReplicas; } + /** + * The initial rebalance delay for streams groups. + */ + public int streamsInitialRebalanceDelayMs() { + return streamsInitialRebalanceDelayMs; + } + /** * The share group isolation level. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index e8a2f49663955..a2a99707dc710 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -290,6 +290,10 @@ public class GroupCoordinatorConfig { public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2; public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG; + public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.streams.initial.rebalance.delay.ms"; + public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000; + public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more streams clients to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances."; + public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG = "group.share.initialize.retry.interval.ms"; // Because persister retries with exp backoff 5 times and upper cap of 30 secs. public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT = 30_000; @@ -352,7 +356,8 @@ public class GroupCoordinatorConfig { .define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC) .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC) - .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC); + .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC) + .define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC); /** @@ -405,6 +410,7 @@ public class GroupCoordinatorConfig { private final int streamsGroupMaxSize; private final int streamsGroupNumStandbyReplicas; private final int streamsGroupMaxStandbyReplicas; + private final int streamsGroupInitialRebalanceDelayMs; @SuppressWarnings("this-escape") public GroupCoordinatorConfig(AbstractConfig config) { @@ -457,6 +463,7 @@ public GroupCoordinatorConfig(AbstractConfig config) { this.streamsGroupMaxSize = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG); this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG); this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG); + this.streamsGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); // New group coordinator configs validation. require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, @@ -961,4 +968,11 @@ public int streamsGroupNumStandbyReplicas() { public int streamsGroupMaxNumStandbyReplicas() { return streamsGroupMaxStandbyReplicas; } + + /** + * The initial rebalance delay for streams groups. + */ + public int streamsGroupInitialRebalanceDelayMs() { + return streamsGroupInitialRebalanceDelayMs; + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 66f755148e9de..493a5a97b0b4a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1985,30 +1985,57 @@ private CoordinatorResult stream // Actually bump the group epoch int groupEpoch = group.groupEpoch(); + int initialGroupEpoch = groupEpoch; if (bumpGroupEpoch) { - groupEpoch += 1; + if (groupEpoch == 0) { + groupEpoch = 2; + } else { + groupEpoch += 1; + } records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs)); log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch); metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME); group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); } + // Schedule initial rebalance delay for new streams groups to coalesce joins. + boolean isInitialRebalance = (initialGroupEpoch == 0); + if (isInitialRebalance) { + int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId); + if (initialDelayMs > 0) { + timer.scheduleIfAbsent( + streamsInitialRebalanceKey(groupId), + initialDelayMs, + TimeUnit.MILLISECONDS, + false, + () -> computeDelayedTargetAssignment(groupId) + ); + } + } + // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member // replaces an existing static member. // The delta between the existing and the new target assignment is persisted to the partition. int targetAssignmentEpoch; TasksTuple targetAssignment; if (groupEpoch > group.assignmentEpoch()) { - targetAssignment = updateStreamsTargetAssignment( - group, - groupEpoch, - updatedMember, - updatedConfiguredTopology, - metadataImage, - records, - currentAssignmentConfigs - ); - targetAssignmentEpoch = groupEpoch; + boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId)); + if (initialDelayActive && group.assignmentEpoch() == 0) { + // During initial rebalance delay, return empty assignment to first joining members. + targetAssignmentEpoch = 1; + targetAssignment = TasksTuple.EMPTY; + } else { + targetAssignment = updateStreamsTargetAssignment( + group, + groupEpoch, + Optional.of(updatedMember), + updatedConfiguredTopology, + metadataImage, + records, + currentAssignmentConfigs + ); + targetAssignmentEpoch = groupEpoch; + } } else { targetAssignmentEpoch = group.assignmentEpoch(); targetAssignment = group.targetAssignment(updatedMember.memberId()); @@ -3949,15 +3976,15 @@ private Assignment updateTargetAssignment( * * @param group The StreamsGroup. * @param groupEpoch The group epoch. - * @param updatedMember The updated member. + * @param updatedMember The updated member (optional). * @param metadataImage The metadata image. * @param records The list to accumulate any new records. - * @return The new target assignment. + * @return The new target assignment for the updated member, or EMPTY if no member specified. */ private TasksTuple updateStreamsTargetAssignment( StreamsGroup group, int groupEpoch, - StreamsGroupMember updatedMember, + Optional updatedMember, ConfiguredTopology configuredTopology, CoordinatorMetadataImage metadataImage, List records, @@ -3976,8 +4003,11 @@ private TasksTuple updateStreamsTargetAssignment( .withTopology(configuredTopology) .withStaticMembers(group.staticMembers()) .withMetadataImage(metadataImage) - .withTargetAssignment(group.targetAssignment()) - .addOrUpdateMember(updatedMember.memberId(), updatedMember); + .withTargetAssignment(group.targetAssignment()); + + if (updatedMember.isPresent()) { + assignmentResultBuilder.addOrUpdateMember(updatedMember.get().memberId(), updatedMember.get()); + } long startTimeMs = time.milliseconds(); org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = @@ -3994,7 +4024,7 @@ private TasksTuple updateStreamsTargetAssignment( records.addAll(assignmentResult.records()); - return assignmentResult.targetAssignment().get(updatedMember.memberId()); + return updatedMember.isPresent() ? assignmentResult.targetAssignment().get(updatedMember.get().memberId()) : TasksTuple.EMPTY; } catch (TaskAssignorException ex) { String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", groupEpoch, ex.getMessage()); @@ -4003,6 +4033,51 @@ private TasksTuple updateStreamsTargetAssignment( } } + /** + * Fires the initial rebalance for a streams group when the delay timer expires. + * Computes and persists target assignment for all members if conditions are met. + * + * @param groupId The group id. + * @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT. + */ + private CoordinatorResult computeDelayedTargetAssignment( + String groupId + ) { + try { + StreamsGroup group = streamsGroup(groupId); + + if (group.isEmpty()) { + log.info("[GroupId {}] Skipping delayed target assignment: group is empty", groupId); + return EMPTY_RESULT; + } + + if (group.groupEpoch() <= group.assignmentEpoch()) { + throw new IllegalStateException("Group epoch should be always larger to assignment epoch"); + } + + if (!group.configuredTopology().isPresent()) { + log.warn("[GroupId {}] Cannot compute delayed target assignment: configured topology is not present", groupId); + return EMPTY_RESULT; + } + + List records = new ArrayList<>(); + updateStreamsTargetAssignment( + group, + group.groupEpoch(), + Optional.empty(), + group.configuredTopology().get(), + metadataImage, + records, + group.lastAssignmentConfigs() + ); + + return new CoordinatorResult<>(records, null); + } catch (GroupIdNotFoundException ex) { + log.warn("[GroupId {}] Group not found during initial rebalance.", groupId); + return EMPTY_RESULT; + } + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. @@ -8570,6 +8645,8 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List groupConfig = groupConfigManager.groupConfig(groupId); + return groupConfig.map(GroupConfig::streamsInitialRebalanceDelayMs) + .orElse(config.streamsGroupInitialRebalanceDelayMs()); + } + /** * Get the assignor of the provided streams group. */ @@ -8716,6 +8802,19 @@ static String classicGroupSyncKey(String groupId) { return "sync-" + groupId; } + /** + * Generate a streams group initial rebalance key for the timer. + * + * Package private for testing. + * + * @param groupId The group id. + * + * @return the initial rebalance key. + */ + static String streamsInitialRebalanceKey(String groupId) { + return "initial-rebalance-timeout-" + groupId; + } + /** * Generate a consumer group join key for the timer. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index 77014de5bf18a..a68379268182b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -68,6 +68,8 @@ public void testFromPropsInvalid() { assertPropertyInvalid(name, "not_a_number", "1.0"); } else if (GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG.equals(name)) { assertPropertyInvalid(name, "not_a_number", "1.0"); + } else if (GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG.equals(name)) { + assertPropertyInvalid(name, "not_a_number", "-1", "1.0"); } else { assertPropertyInvalid(name, "not_a_number", "-0.1"); } @@ -237,6 +239,7 @@ public void testFromPropsWithDefaultValue() { defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "10"); defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "2000"); defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1"); + defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000"); Properties props = new Properties(); props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20"); @@ -252,6 +255,7 @@ public void testFromPropsWithDefaultValue() { assertEquals(10, config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG)); assertEquals(2000, config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG)); assertEquals(1, config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG)); + assertEquals(3000, config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG)); } @Test @@ -274,6 +278,7 @@ private Properties createValidGroupConfig() { props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000"); props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000"); props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1"); + props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000"); return props; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 491df993e0999..c447aec537495 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -198,6 +198,7 @@ public void testConfigs() { configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, 15 * 60 * 1000); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 5000); GroupCoordinatorConfig config = createConfig(configs); @@ -227,6 +228,7 @@ public void testConfigs() { assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs()); assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs()); assertEquals(15 * 60 * 1000, config.consumerGroupRegexRefreshIntervalMs()); + assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs()); } @Test @@ -323,6 +325,11 @@ public void testInvalidConfigs() { configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000); assertEquals("group.streams.heartbeat.interval.ms must be less than group.streams.session.timeout.ms", assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, -1); + assertEquals("Invalid value -1 for configuration group.streams.initial.rebalance.delay.ms: Value must be at least 0", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); } public static GroupCoordinatorConfig createGroupCoordinatorConfig( @@ -359,6 +366,23 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( return createConfig(configs); } + @Test + public void testStreamsGroupInitialRebalanceDelayDefaultValue() { + Map configs = new HashMap<>(); + GroupCoordinatorConfig config = createConfig(configs); + assertEquals(3000, config.streamsGroupInitialRebalanceDelayMs()); + assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, + config.streamsGroupInitialRebalanceDelayMs()); + } + + @Test + public void testStreamsGroupInitialRebalanceDelayCustomValue() { + Map configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 7000); + GroupCoordinatorConfig config = createConfig(configs); + assertEquals(7000, config.streamsGroupInitialRebalanceDelayMs()); + } + public static GroupCoordinatorConfig createConfig(Map configs) { return new GroupCoordinatorConfig(new AbstractConfig( GroupCoordinatorConfig.CONFIG_DEF, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 02fc987e10182..13f7b9ed71083 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -16233,6 +16233,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -16258,7 +16259,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -16275,7 +16276,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16288,13 +16289,13 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")), + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2) )), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); @@ -16320,6 +16321,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -16344,7 +16346,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -16358,7 +16360,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16373,7 +16375,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { List actualDescribedGroups = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset); StreamsGroupDescribeResponseData.DescribedGroup expectedDescribedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) - .setAssignmentEpoch(1) + .setAssignmentEpoch(2) .setTopology( new StreamsGroupDescribeResponseData.Topology() .setEpoch(0) @@ -16388,7 +16390,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) )) .setGroupState(StreamsGroupState.STABLE.toString()) - .setGroupEpoch(1); + .setGroupEpoch(2); assertEquals(1, actualDescribedGroups.size()); assertEquals(expectedDescribedGroup, actualDescribedGroups.get(0)); } @@ -16415,6 +16417,7 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Member joins the streams group. @@ -16437,7 +16440,7 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -16451,7 +16454,7 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16461,11 +16464,11 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) )), -1, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); @@ -16496,6 +16499,7 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Member joins the streams group. @@ -16523,7 +16527,7 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -16537,7 +16541,7 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16547,11 +16551,11 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) )), -1, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); @@ -16584,6 +16588,7 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Member joins the streams group. @@ -16606,7 +16611,7 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -16620,7 +16625,7 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16630,12 +16635,12 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage), barTopicName, computeTopicHash(barTopicName, metadataImage) )), -1, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); @@ -17330,6 +17335,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Prepare new assignment for the group. @@ -17353,7 +17359,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -17371,14 +17377,89 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { .setMemberId(memberId) .setMemberEpoch(result.response().data().memberEpoch())); + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000) + .setEndpointInformationEpoch(-1), + result.response().data() + ); + } + + @Test + public void testStreamsInitialRebalanceDelay_EmptyDuringDelay_AssignsAfterTimer() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 1000) + .build(); + + CoordinatorResult result; + + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) .setMemberEpoch(1) .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setPartitionsByUserEndpoint(null) .setEndpointInformationEpoch(-1), result.response().data() ); + + assignor.prepareGroupAssignment( + Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)))); + + context.sleep(10000); + + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(1) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); } @Test @@ -17940,6 +18021,7 @@ public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() { .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Member 1 joins the streams group. The request fails because the @@ -17955,7 +18037,7 @@ public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() { .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of()))); - assertEquals("Failed to compute a new target assignment for epoch 1: Assignment failed.", e.getMessage()); + assertEquals("Failed to compute a new target assignment for epoch 2: Assignment failed.", e.getMessage()); } @Test @@ -18198,6 +18280,7 @@ public void testStreamsSessionTimeoutLifecycle() { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -18216,7 +18299,7 @@ public void testStreamsSessionTimeoutLifecycle() { .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of())); - assertEquals(1, result.response().data().memberEpoch()); + assertEquals(2, result.response().data().memberEpoch()); // Verify that there is a session time. context.assertSessionTimeout(groupId, memberId, 45000); @@ -18233,7 +18316,7 @@ public void testStreamsSessionTimeoutLifecycle() { .setGroupId(groupId) .setMemberId(memberId) .setMemberEpoch(result.response().data().memberEpoch())); - assertEquals(1, result.response().data().memberEpoch()); + assertEquals(2, result.response().data().memberEpoch()); // Verify that there is a session time. context.assertSessionTimeout(groupId, memberId, 45000); @@ -18278,6 +18361,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -18296,7 +18380,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of())); - assertEquals(1, result.response().data().memberEpoch()); + assertEquals(2, result.response().data().memberEpoch()); // Verify that there is a session time. context.assertSessionTimeout(groupId, memberId, 45000); @@ -18313,7 +18397,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")) + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")) ) ) )), @@ -18343,6 +18427,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 3) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -18365,7 +18450,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18406,7 +18491,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(2) + .setMemberEpoch(3) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -18426,13 +18511,13 @@ public void testStreamsRebalanceTimeoutLifecycle() { new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setRebalanceTimeoutMs(12000)); assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18459,7 +18544,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setActiveTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds() .setSubtopologyId(subtopology1) .setPartitions(List.of(0, 1)))) @@ -18469,7 +18554,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(2) + .setMemberEpoch(3) .setHeartbeatIntervalMs(5000) .setEndpointInformationEpoch(0), result.response().data() @@ -18506,6 +18591,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment( @@ -18527,7 +18613,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18568,7 +18654,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(2) + .setMemberEpoch(3) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -18588,12 +18674,12 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) - .setMemberEpoch(1)); + .setMemberEpoch(2)); assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18617,7 +18703,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")) + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 4, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")) ) ) )), @@ -18744,6 +18830,7 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Prepare new assignment for the group. @@ -18769,7 +18856,7 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18969,6 +19056,7 @@ public void testStreamsGroupHeartbeatWithEmptyClassicGroup() { MockTaskAssignor assignor = new MockTaskAssignor("sticky"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); ClassicGroup classicGroup = new ClassicGroup( new LogContext(), @@ -18991,7 +19079,7 @@ public void testStreamsGroupHeartbeatWithEmptyClassicGroup() { StreamsGroupMember expectedMember = StreamsGroupMember.Builder.withDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setRebalanceTimeoutMs(5000) .setClientId(DEFAULT_CLIENT_ID) @@ -19008,9 +19096,9 @@ public void testStreamsGroupHeartbeatWithEmptyClassicGroup() { GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 1, 0, -1, Map.of("num.standby.replicas", "0")), + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 2, 0, -1, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId, memberId, TasksTuple.EMPTY), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId, expectedMember) ), result.records() @@ -19308,6 +19396,7 @@ public void testStreamsGroupDynamicConfigs() { .addTopic(fooTopicId, fooTopicName, 6) .addRacks() .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment( @@ -19326,7 +19415,7 @@ public void testStreamsGroupDynamicConfigs() { .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of())); - assertEquals(1, result.response().data().memberEpoch()); + assertEquals(2, result.response().data().memberEpoch()); assertEquals(Map.of("num.standby.replicas", "0"), assignor.lastPassedAssignmentConfigs()); // Verify heartbeat interval diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 7ffeb6ac0357d..32df2b6f65355 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -160,6 +160,7 @@ public void createTopics(final TestInfo testInfo) throws InterruptedException { appId = safeUniqueTestName(testInfo); inputStream = appId + "-input-stream"; CLUSTER.createTopic(inputStream, 2, 1); + CLUSTER.setGroupStreamsInitialRebalanceDelay(appId, 0); } private Properties props() { diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 1de7a45bfce7a..bcbe3934f216f 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -480,6 +480,19 @@ public void setGroupStandbyReplicas(final String groupId, final int numStandbyRe } } + public void setGroupStreamsInitialRebalanceDelay(final String groupId, final int initialRebalanceDelayMs) { + try (final Admin adminClient = createAdminClient()) { + adminClient.incrementalAlterConfigs( + Map.of( + new ConfigResource(ConfigResource.Type.GROUP, groupId), + List.of(new AlterConfigOp(new ConfigEntry(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, String.valueOf(initialRebalanceDelayMs)), AlterConfigOp.OpType.SET)) + ) + ).all().get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + private final class TopicsRemainingCondition implements TestCondition { final Set remainingTopics = new HashSet<>(); diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java index 723e928e770a1..3915f2e73b7cb 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java @@ -167,9 +167,10 @@ public void testDescribeStreamsGroupWithStateOption() throws Exception { @Test public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws Exception { final List expectedHeader = List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS"); - final Set> expectedRows = Set.of(List.of(APP_ID, "", "", "Stable", "3", "3", "2")); + final Set> expectedRows = Set.of(List.of(APP_ID, "", "", "Stable", "", "", "2")); // The coordinator is not deterministic, so we don't care about it. - final List dontCares = List.of(1, 2); + // The GROUP-EPOCH and TARGET-ASSIGNMENT-EPOCH can vary due to rebalance timing, so we don't care about them either. + final List dontCares = List.of(1, 2, 4, 5); validateDescribeOutput( List.of("--bootstrap-server", bootstrapServers, "--describe", "--state", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); @@ -194,10 +195,11 @@ public void testDescribeStreamsGroupWithMembersOption() throws Exception { public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Exception { final List expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); final Set> expectedRows = Set.of( - List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"), - List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];")); + List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", "0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"), + List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", "0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];")); // The member and process names as well as client-id are not deterministic, so we don't care about them. - final List dontCares = List.of(3, 6, 7); + // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to rebalance timing, so we don't care about them either. + final List dontCares = List.of(1, 3, 5, 6, 7); validateDescribeOutput( List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID), expectedHeader, expectedRows, dontCares); @@ -208,22 +210,28 @@ public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Except @Test public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throws Exception { cluster.createTopic(INPUT_TOPIC_2, 1, 1); + TestUtils.waitForCondition( + () -> cluster.getAllTopicsInCluster().contains(INPUT_TOPIC_2), + 30000, + "Topic " + INPUT_TOPIC_2 + " not created" + ); KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2)); startApplicationAndWaitUntilRunning(streams2); final List expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS"); final Set> expectedRows1 = Set.of( - List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"), - List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];")); + List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", "0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"), + List.of(APP_ID, "", "0", "", "streams", "", "", "", "ACTIVE:", "0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];")); final Set> expectedRows2 = Set.of( - List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0];", "TARGET-ACTIVE:", "1:[0];"), - List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0];", "TARGET-ACTIVE:", "0:[0];")); + List.of(APP_ID_2, "", "0", "", "streams", "", "", "", "ACTIVE:", "1:[0];", "TARGET-ACTIVE:", "1:[0];"), + List.of(APP_ID_2, "", "0", "", "streams", "", "", "", "ACTIVE:", "0:[0];", "TARGET-ACTIVE:", "0:[0];")); final Map>> expectedRowsMap = new HashMap<>(); expectedRowsMap.put(APP_ID, expectedRows1); expectedRowsMap.put(APP_ID_2, expectedRows2); // The member and process names as well as client-id are not deterministic, so we don't care about them. - final List dontCares = List.of(3, 6, 7); + // The TARGET-ASSIGNMENT-EPOCH and MEMBER-EPOCH can vary due to rebalance timing, so we don't care about them either. + final List dontCares = List.of(1, 3, 5, 6, 7); validateDescribeOutput( List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--group", APP_ID_2), @@ -254,7 +262,9 @@ public void testDescribeNonExistingStreamsGroup() { @Test public void testDescribeStreamsGroupWithShortTimeout() { - List args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1"); + // Note: 1ms timeout may not always trigger timeout on fast machines with warm groups + // Using 0ms to ensure timeout + List args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "0"); Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups()); assertEquals(TimeoutException.class, e.getCause().getClass()); }