diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 1b3f3a50133ff..5996614de4171 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -16,16 +16,13 @@ */ package kafka.server.share; -import kafka.server.ActionQueue; import kafka.server.DelayedOperation; -import kafka.server.DelayedOperationPurgatory; import kafka.server.LogReadResult; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.ShareFetchData; import org.apache.kafka.server.storage.log.FetchPartitionData; @@ -55,25 +52,19 @@ public class DelayedShareFetch extends DelayedOperation { private final ShareFetchData shareFetchData; private final ReplicaManager replicaManager; - private final Map partitionCacheMap; - private final ActionQueue delayedActionQueue; - private final DelayedOperationPurgatory delayedShareFetchPurgatory; private Map topicPartitionDataFromTryComplete; + private final SharePartitionManager sharePartitionManager; DelayedShareFetch( ShareFetchData shareFetchData, ReplicaManager replicaManager, - Map partitionCacheMap, - ActionQueue delayedActionQueue, - DelayedOperationPurgatory delayedShareFetchPurgatory) { + SharePartitionManager sharePartitionManager) { super(shareFetchData.fetchParams().maxWaitMs, Option.empty()); this.shareFetchData = shareFetchData; this.replicaManager = replicaManager; - this.partitionCacheMap = partitionCacheMap; - this.delayedActionQueue = delayedActionQueue; - this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; this.topicPartitionDataFromTryComplete = new LinkedHashMap<>(); + this.sharePartitionManager = sharePartitionManager; } @Override @@ -129,7 +120,7 @@ public void onComplete() { }); log.trace("Data successfully retrieved by replica manager: {}", responseData); - ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, replicaManager) + ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, replicaManager) .whenComplete((result, throwable) -> { if (throwable != null) { log.error("Error processing fetch response for share partitions", throwable); @@ -143,12 +134,7 @@ public void onComplete() { // then we should check if there is a pending share fetch request for the topic-partition and complete it. // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if // we directly call delayedShareFetchPurgatory.checkAndComplete - delayedActionQueue.add(() -> { - result.keySet().forEach(topicIdPartition -> - delayedShareFetchPurgatory.checkAndComplete( - new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition))); - return BoxedUnit.UNIT; - }); + sharePartitionManager.addPurgatoryCheckAndCompleteDelayedActionToActionQueue(result.keySet(), shareFetchData.groupId()); }); } catch (Exception e) { @@ -187,8 +173,11 @@ Map acquirablePartitions() { Map topicPartitionData = new LinkedHashMap<>(); shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> { - SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey( - shareFetchData.groupId(), topicIdPartition)); + SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition); + if (sharePartition == null) { + log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition); + return; + } int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0); // Add the share partition to the list of partitions to be fetched only if we can @@ -217,7 +206,13 @@ Map acquirablePartitions() { } private void releasePartitionLocks(String groupId, Set topicIdPartitions) { - topicIdPartitions.forEach(tp -> partitionCacheMap.get(new - SharePartitionKey(groupId, tp)).releaseFetchLock()); + topicIdPartitions.forEach(tp -> { + SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp); + if (sharePartition == null) { + log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp); + return; + } + sharePartition.releaseFetchLock(); + }); } } diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchGroupKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchGroupKey.java new file mode 100644 index 0000000000000..4e9e4dbbe882a --- /dev/null +++ b/core/src/main/java/kafka/server/share/DelayedShareFetchGroupKey.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import kafka.server.DelayedOperationKey; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; + +/** + * A key for delayed share fetch purgatory that refers to the share partition. + */ +public class DelayedShareFetchGroupKey implements DelayedShareFetchKey, DelayedOperationKey { + private final String groupId; + private final Uuid topicId; + private final int partition; + + DelayedShareFetchGroupKey(String groupId, Uuid topicId, int partition) { + this.groupId = groupId; + this.topicId = topicId; + this.partition = partition; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DelayedShareFetchGroupKey that = (DelayedShareFetchGroupKey) o; + return topicId.equals(that.topicId) && partition == that.partition && groupId.equals(that.groupId); + } + + @Override + public int hashCode() { + return Objects.hash(topicId, partition, groupId); + } + + @Override + public String toString() { + return "DelayedShareFetchGroupKey(groupId=" + groupId + + ", topicId=" + topicId + + ", partition=" + partition + + ")"; + } + + @Override + public String keyLabel() { + return String.format("groupId=%s, topicId=%s, partition=%s", groupId, topicId, partition); + } +} diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java index a4c13f8f1a8fe..d12c30bd289e5 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetchKey.java @@ -16,44 +16,8 @@ */ package kafka.server.share; -import kafka.server.DelayedOperationKey; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.server.share.SharePartitionKey; - -import java.util.Objects; - /** * A key for delayed operations that fetch data for share consumers. */ -public class DelayedShareFetchKey extends SharePartitionKey implements DelayedOperationKey { - - DelayedShareFetchKey(String groupId, TopicIdPartition topicIdPartition) { - super(groupId, topicIdPartition); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - DelayedShareFetchKey that = (DelayedShareFetchKey) o; - return topicIdPartition.equals(that.topicIdPartition) && groupId.equals(that.groupId); - } - - @Override - public int hashCode() { - return Objects.hash(topicIdPartition, groupId); - } - - @Override - public String toString() { - return "DelayedShareFetchKey(groupId=" + groupId + - ", topicIdPartition=" + topicIdPartition + - ")"; - } - - @Override - public String keyLabel() { - return String.format("groupId=%s, topicIdPartition=%s", groupId, topicIdPartition); - } +public interface DelayedShareFetchKey { } diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java b/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java new file mode 100644 index 0000000000000..1aa6b29edb820 --- /dev/null +++ b/core/src/main/java/kafka/server/share/DelayedShareFetchPartitionKey.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import kafka.server.DelayedOperationKey; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; + +/** + * A key for delayed share fetch purgatory that refers to the topic partition. + */ +public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey, DelayedOperationKey { + private final Uuid topicId; + private final int partition; + + DelayedShareFetchPartitionKey(Uuid topicId, int partition) { + this.topicId = topicId; + this.partition = partition; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DelayedShareFetchPartitionKey that = (DelayedShareFetchPartitionKey) o; + return topicId.equals(that.topicId) && partition == that.partition; + } + + @Override + public int hashCode() { + return Objects.hash(topicId, partition); + } + + @Override + public String toString() { + return "DelayedShareFetchPartitionKey(topicId=" + topicId + + ", partition=" + partition + ")"; + } + + @Override + public String keyLabel() { + return String.format("topicId=%s, partition=%s", topicId, partition); + } +} diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java index d987bee2c3fe8..50aba6b66ca68 100644 --- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java +++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.requests.ListOffsetsRequest; -import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.ShareFetchData; import org.apache.kafka.server.storage.log.FetchPartitionData; @@ -49,14 +48,17 @@ public class ShareFetchUtils { static CompletableFuture> processFetchResponse( ShareFetchData shareFetchData, Map responseData, - Map partitionCacheMap, + SharePartitionManager sharePartitionManager, ReplicaManager replicaManager ) { Map> futures = new HashMap<>(); responseData.forEach((topicIdPartition, fetchPartitionData) -> { - SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey( - shareFetchData.groupId(), topicIdPartition)); + SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition); + if (sharePartition == null) { + log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition); + return; + } futures.put(topicIdPartition, sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData) .handle((acquiredRecords, throwable) -> { log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 16ed2b214e5b6..9a4de4d827a84 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -1788,7 +1788,7 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las // If we have an acquisition lock timeout for a share-partition, then we should check if // there is a pending share fetch request for the share-partition and complete it. - DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchKey(groupId, topicIdPartition); + DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()); delayedShareFetchPurgatory.checkAndComplete(delayedShareFetchKey); }); } diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index e09931be65586..5e9fc715c10a9 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -74,6 +74,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import scala.jdk.javaapi.CollectionConverters; +import scala.runtime.BoxedUnit; /** * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions. @@ -303,7 +304,7 @@ public CompletableFuture topicIdPartitions, String groupId) { + delayedActionsQueue.add(() -> { + topicIdPartitions.forEach(topicIdPartition -> + delayedShareFetchPurgatory.checkAndComplete( + new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); + return BoxedUnit.UNIT; + }); + } + /** * The release session method is used to release the session for the memberId of respective group. * The method post removing session also releases acquired records for the respective member. @@ -372,7 +382,7 @@ public CompletableFuture delayedShareFetchWatchKeys = new HashSet<>(); shareFetchData.partitionMaxBytes().keySet().forEach( - topicIdPartition -> delayedShareFetchWatchKeys.add( - new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition))); + topicIdPartition -> { + // We add a key corresponding to each share partition in the request in the group so that when there are + // acknowledgements/acquisition lock timeout etc, we have a way to perform checkAndComplete for all + // such requests which are delayed because of lack of data to acquire for the share partition. + delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())); + // We add a key corresponding to each topic partition in the request so that when the HWM is updated + // for any topic partition, we have a way to perform checkAndComplete for all such requests which are + // delayed because of lack of data to acquire for the topic partition. + delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition())); + }); // Add the share fetch to the delayed share fetch purgatory to process the fetch request. - addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap, delayedActionsQueue, delayedShareFetchPurgatory), + addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this), delayedShareFetchWatchKeys); // Release the lock so that other threads can process the queue. @@ -631,7 +649,7 @@ void maybeProcessFetchQueue() { } } - private SharePartition fetchSharePartition(SharePartitionKey sharePartitionKey) { + private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) { return partitionCacheMap.computeIfAbsent(sharePartitionKey, k -> { long start = time.hiResClockMs(); @@ -694,6 +712,16 @@ private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition top return new SharePartitionKey(groupId, topicIdPartition); } + /** + * + * @param groupId The share group id, this is used to identify the share group. + * @param topicIdPartition The topic partition that the group is subscribed to. + * @return The share partition stored for the share group topic-partition. + */ + protected SharePartition sharePartition(String groupId, TopicIdPartition topicIdPartition) { + return partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition)); + } + static class ShareGroupMetrics { /** * share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) - The total number of offsets acknowledged for share groups (requests to be ack). diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java index 91b0d0ae082ed..bca879d105cf3 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchKeyTest.java @@ -38,11 +38,14 @@ public void testDelayedShareFetchEqualsAndHashcode() { TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic2", 0)); Map keyMap = new HashMap<>(); - keyMap.put("key0", new DelayedShareFetchKey("grp", tp0)); - keyMap.put("key1", new DelayedShareFetchKey("grp", tp1)); - keyMap.put("key2", new DelayedShareFetchKey("grp", tp2)); - keyMap.put("key3", new DelayedShareFetchKey("grp2", tp0)); - keyMap.put("key4", new DelayedShareFetchKey("grp2", tp1)); + keyMap.put("key0", new DelayedShareFetchGroupKey("grp", tp0.topicId(), tp0.partition())); + keyMap.put("key1", new DelayedShareFetchGroupKey("grp", tp1.topicId(), tp1.partition())); + keyMap.put("key2", new DelayedShareFetchGroupKey("grp", tp2.topicId(), tp2.partition())); + keyMap.put("key3", new DelayedShareFetchGroupKey("grp2", tp0.topicId(), tp0.partition())); + keyMap.put("key4", new DelayedShareFetchGroupKey("grp2", tp1.topicId(), tp1.partition())); + keyMap.put("key5", new DelayedShareFetchPartitionKey(tp0.topicId(), tp0.partition())); + keyMap.put("key6", new DelayedShareFetchPartitionKey(tp1.topicId(), tp1.partition())); + keyMap.put("key7", new DelayedShareFetchPartitionKey(tp2.topicId(), tp2.partition())); keyMap.forEach((key1, value1) -> keyMap.forEach((key2, value2) -> { if (key1.equals(key2)) { diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index e4d0c84dfcd20..07286670f3dc0 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -97,9 +97,9 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() { when(sp0.maybeAcquireFetchLock()).thenReturn(true); when(sp1.maybeAcquireFetchLock()).thenReturn(true); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); + when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); ShareFetchData shareFetchData = new ShareFetchData( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, @@ -110,7 +110,7 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() { when(sp1.canAcquireRecords()).thenReturn(false); DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) - .withPartitionCacheMap(partitionCacheMap) + .withSharePartitionManager(sharePartitionManager) .build(); // Since there is no partition that can be acquired, tryComplete should return false. @@ -135,9 +135,9 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { when(sp0.maybeAcquireFetchLock()).thenReturn(true); when(sp1.maybeAcquireFetchLock()).thenReturn(true); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); + when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); ShareFetchData shareFetchData = new ShareFetchData( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, @@ -152,7 +152,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) - .withPartitionCacheMap(partitionCacheMap) + .withSharePartitionManager(sharePartitionManager) .withReplicaManager(replicaManager) .build(); assertFalse(delayedShareFetch.isCompleted()); @@ -179,9 +179,9 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() { when(sp0.maybeAcquireFetchLock()).thenReturn(true); when(sp1.maybeAcquireFetchLock()).thenReturn(true); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); + when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); ShareFetchData shareFetchData = new ShareFetchData( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, @@ -193,7 +193,7 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() { DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) + .withSharePartitionManager(sharePartitionManager) .build(); assertFalse(delayedShareFetch.isCompleted()); delayedShareFetch.forceComplete(); @@ -222,9 +222,9 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { when(sp0.maybeAcquireFetchLock()).thenReturn(true); when(sp1.maybeAcquireFetchLock()).thenReturn(true); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); + when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); ShareFetchData shareFetchData = new ShareFetchData( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, @@ -239,7 +239,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) + .withSharePartitionManager(sharePartitionManager) .build(); assertFalse(delayedShareFetch.isCompleted()); delayedShareFetch.forceComplete(); @@ -263,8 +263,8 @@ public void testToCompleteAnAlreadyCompletedFuture() { SharePartition sp0 = mock(SharePartition.class); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); CompletableFuture> future = new CompletableFuture<>(); ShareFetchData shareFetchData = new ShareFetchData( @@ -278,7 +278,7 @@ public void testToCompleteAnAlreadyCompletedFuture() { DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData) .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) + .withSharePartitionManager(sharePartitionManager) .build()); assertFalse(delayedShareFetch.isCompleted()); @@ -317,10 +317,10 @@ public void testForceCompleteTriggersDelayedActionsQueue() { when(sp1.maybeAcquireFetchLock()).thenReturn(false); when(sp2.maybeAcquireFetchLock()).thenReturn(false); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); + SharePartitionManager sharePartitionManager1 = mock(SharePartitionManager.class); + when(sharePartitionManager1.sharePartition(groupId, tp0)).thenReturn(sp0); + when(sharePartitionManager1.sharePartition(groupId, tp1)).thenReturn(sp1); + when(sharePartitionManager1.sharePartition(groupId, tp2)).thenReturn(sp2); ShareFetchData shareFetchData1 = new ShareFetchData( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, @@ -332,12 +332,12 @@ public void testForceCompleteTriggersDelayedActionsQueue() { DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); Set delayedShareFetchWatchKeys = new HashSet<>(); - partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData1) .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) + .withSharePartitionManager(sharePartitionManager1) .build(); // We add a delayed share fetch entry to the purgatory which will be waiting for completion since neither of the @@ -358,12 +358,22 @@ public void testForceCompleteTriggersDelayedActionsQueue() { doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); DelayedActionQueue delayedActionQueue = spy(new DelayedActionQueue()); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); + SharePartitionManager sharePartitionManager2 = SharePartitionManagerTest.SharePartitionManagerBuilder + .builder() + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withDelayedActionsQueue(delayedActionQueue) + .withPartitionCacheMap(partitionCacheMap) + .build(); + DelayedShareFetch delayedShareFetch2 = DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetchData2) .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) - .withDelayedActionQueue(delayedActionQueue) - .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withSharePartitionManager(sharePartitionManager2) .build(); // sp1 can be acquired now @@ -387,9 +397,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() { static class DelayedShareFetchBuilder { ShareFetchData shareFetchData = mock(ShareFetchData.class); private ReplicaManager replicaManager = mock(ReplicaManager.class); - private Map partitionCacheMap = new HashMap<>(); - private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); - private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); + private SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) { this.shareFetchData = shareFetchData; @@ -401,18 +409,8 @@ DelayedShareFetchBuilder withReplicaManager(ReplicaManager replicaManager) { return this; } - DelayedShareFetchBuilder withPartitionCacheMap(Map partitionCacheMap) { - this.partitionCacheMap = partitionCacheMap; - return this; - } - - DelayedShareFetchBuilder withDelayedActionQueue(DelayedActionQueue delayedActionsQueue) { - this.delayedActionsQueue = delayedActionsQueue; - return this; - } - - DelayedShareFetchBuilder withDelayedShareFetchPurgatory(DelayedOperationPurgatory delayedShareFetchPurgatory) { - this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; + DelayedShareFetchBuilder withSharePartitionManager(SharePartitionManager sharePartitionManager) { + this.sharePartitionManager = sharePartitionManager; return this; } @@ -424,9 +422,7 @@ public DelayedShareFetch build() { return new DelayedShareFetch( shareFetchData, replicaManager, - partitionCacheMap, - delayedActionsQueue, - delayedShareFetchPurgatory); + sharePartitionManager); } } } diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index 94762d4cccbca..e5cf67d1ae872 100644 --- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.fetch.ShareFetchData; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchParams; @@ -46,7 +45,6 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import scala.Option; @@ -92,9 +90,9 @@ public void testProcessFetchResponse() { doNothing().when(sp1).updateCacheAndOffsets(any(Long.class)); doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); + when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); ShareFetchData shareFetchData = new ShareFetchData( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, @@ -121,7 +119,7 @@ public void testProcessFetchResponse() { records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); CompletableFuture> result = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, mock(ReplicaManager.class)); + ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, mock(ReplicaManager.class)); assertTrue(result.isDone()); Map resultData = result.join(); @@ -163,9 +161,9 @@ public void testProcessFetchResponseWithEmptyRecords() { doNothing().when(sp1).updateCacheAndOffsets(any(Long.class)); doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); + when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); ShareFetchData shareFetchData = new ShareFetchData( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, @@ -180,7 +178,7 @@ public void testProcessFetchResponseWithEmptyRecords() { MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); CompletableFuture> result = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, mock(ReplicaManager.class)); + ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, mock(ReplicaManager.class)); assertTrue(result.isDone()); Map resultData = result.join(); @@ -209,9 +207,9 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() { SharePartition sp0 = Mockito.mock(SharePartition.class); SharePartition sp1 = Mockito.mock(SharePartition.class); - Map partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0); + when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1); ShareFetchData shareFetchData = new ShareFetchData( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, @@ -253,7 +251,7 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() { records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); CompletableFuture> result1 = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, partitionCacheMap, replicaManager); + ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitionManager, replicaManager); assertTrue(result1.isDone()); Map resultData1 = result1.join(); @@ -284,7 +282,7 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() { MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); CompletableFuture> result2 = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, partitionCacheMap, replicaManager); + ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, sharePartitionManager, replicaManager); assertTrue(result2.isDone()); Map resultData2 = result2.join(); diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index b1e1bc627f8bc..92438abdaccc2 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -1731,13 +1731,20 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() { when(sp2.canAcquireRecords()).thenReturn(false); Set delayedShareFetchWatchKeys = new HashSet<>(); - partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build(); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) - .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) - .build(); + .withShareFetchData(shareFetchData) + .withReplicaManager(replicaManager) + .withSharePartitionManager(sharePartitionManager) + .build(); delayedShareFetchPurgatory.tryCompleteElseWatch( delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); @@ -1745,13 +1752,6 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() { // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. assertEquals(2, delayedShareFetchPurgatory.watched()); - SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) - .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) - .withReplicaManager(replicaManager) - .withTimer(mockTimer) - .build(); - doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); Map> acknowledgeTopics = new HashMap<>(); @@ -1829,13 +1829,20 @@ public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() { when(sp3.canAcquireRecords()).thenReturn(false); Set delayedShareFetchWatchKeys = new HashSet<>(); - partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build(); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) - .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) - .build(); + .withShareFetchData(shareFetchData) + .withReplicaManager(replicaManager) + .withSharePartitionManager(sharePartitionManager) + .build(); delayedShareFetchPurgatory.tryCompleteElseWatch( delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); @@ -1843,13 +1850,6 @@ public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() { // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. assertEquals(2, delayedShareFetchPurgatory.watched()); - SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) - .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) - .withReplicaManager(replicaManager) - .withTimer(mockTimer) - .build(); - Map> acknowledgeTopics = new HashMap<>(); acknowledgeTopics.put(tp3, Arrays.asList( new ShareAcknowledgementBatch(12, 20, Collections.singletonList((byte) 1)), @@ -1920,13 +1920,21 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() { when(sp2.canAcquireRecords()).thenReturn(false); Set delayedShareFetchWatchKeys = new HashSet<>(); - partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); + + SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withCache(cache) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build()); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) - .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) - .build(); + .withShareFetchData(shareFetchData) + .withReplicaManager(replicaManager) + .withSharePartitionManager(sharePartitionManager) + .build(); delayedShareFetchPurgatory.tryCompleteElseWatch( delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); @@ -1934,14 +1942,6 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() { // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. assertEquals(2, delayedShareFetchPurgatory.watched()); - SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) - .withCache(cache) - .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) - .withReplicaManager(replicaManager) - .withTimer(mockTimer) - .build()); - doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); assertEquals(2, delayedShareFetchPurgatory.watched()); @@ -2022,13 +2022,21 @@ public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() { when(sp3.canAcquireRecords()).thenReturn(false); Set delayedShareFetchWatchKeys = new HashSet<>(); - partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); + + SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap) + .withCache(cache) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build()); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) - .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) - .build(); + .withShareFetchData(shareFetchData) + .withReplicaManager(replicaManager) + .withSharePartitionManager(sharePartitionManager) + .build(); delayedShareFetchPurgatory.tryCompleteElseWatch( delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); @@ -2036,14 +2044,6 @@ public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() { // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. assertEquals(2, delayedShareFetchPurgatory.watched()); - SharePartitionManager sharePartitionManager = spy(SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) - .withCache(cache) - .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) - .withReplicaManager(replicaManager) - .withTimer(mockTimer) - .build()); - // The share session for this share group member returns tp1 and tp3. No topic partition is common in // both the delayed fetch request and the share session. when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId))).thenReturn(Collections.singletonList(tp3)); @@ -2288,7 +2288,7 @@ static Seq> buildLogReadResult(Set fetchQueue = new ConcurrentLinkedQueue<>(); private DelayedOperationPurgatory delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class); - private final DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); + private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class); private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -2315,7 +2315,7 @@ private SharePartitionManagerBuilder withCache(ShareSessionCache cache) { return this; } - private SharePartitionManagerBuilder withPartitionCacheMap(Map partitionCacheMap) { + SharePartitionManagerBuilder withPartitionCacheMap(Map partitionCacheMap) { this.partitionCacheMap = partitionCacheMap; return this; } @@ -2340,11 +2340,16 @@ private SharePartitionManagerBuilder withFetchQueue(ConcurrentLinkedQueue delayedShareFetchPurgatory) { + SharePartitionManagerBuilder withDelayedShareFetchPurgatory(DelayedOperationPurgatory delayedShareFetchPurgatory) { this.delayedShareFetchPurgatory = delayedShareFetchPurgatory; return this; } + SharePartitionManagerBuilder withDelayedActionsQueue(DelayedActionQueue delayedActionsQueue) { + this.delayedActionsQueue = delayedActionsQueue; + return this; + } + public static SharePartitionManagerBuilder builder() { return new SharePartitionManagerBuilder(); }