-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17703: Moved DelayedActionsQueue outside DelayedShareFetch #17380
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Left some comments.
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adixitconfluent : Thanks for the PR. A couple of comments.
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
@adixitconfluent : Currently, we check the produce/fetch purgatory when a replica becomes the follower or a replica is deleted from a broker through ReplicaManager. applyLocalFollowersDelta and ReplicaManager.applyDelta. Should we do the same for share purgatory? |
@junrao , yes, I think we should do it. Moreover, with more use cases of delayed share fetch purgatory within ReplicaManager functionalities like -
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adixitconfluent : Thanks for the updated PR. A few more comments.
Also, there seems to be another existing issue. If a sharePartition is being initialized, any pending shareFetchRequest is blocked in the purgatory. So, after the initialization completes, we need to trigger a check in the share fetch purgatory to wake up pending delayed operations.
*/ | ||
public class DelayedShareFetchGroupKey implements DelayedShareFetchKey, DelayedOperationKey { | ||
private final String groupId; | ||
private final TopicIdPartition topicIdPartition; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TopicIdPartition contains both topicId and name. We just need the former in the key. Ditto for DelayedShareFetchPartitionKey.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree we don't need the name, but we will need the partition number, so I'll add Uuid topicId and int partition to both DelayedShareFetchGroupKey and DelayedShareFetchPartitionKey.
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
We can do that too if it makes code simpler. |
Hi @junrao, we are aware about the share partition initialization issues and we have a JIRA for it https://issues.apache.org/jira/browse/KAFKA-17510 , I've added In the description what you mentioned and it should be completed as part of that JIRA |
Thanks @junrao , I've created a JIRA https://issues.apache.org/jira/browse/KAFKA-17742 to address this in my future PRs. |
Re-opening PR to trigger build. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adixitconfluent : Thanks for the updated PR. LGTM. Waiting for the tests to complete.
…che#17380) Move ActionQueue outside DelayedShareFetch class to SharePartitionManager where SharePartitionManager has the ability to add a delayed action to the ActionQueue. Add TopicPartition as a key for delayed share fetch along with SharePartition (that is already present right now). This will be utilized later when we add the code to know if new data has been added to the topic partition. Reviewers: Apoorv Mittal <[email protected]>, Jun Rao <[email protected]>
About
In reference to comments #17177 (comment) and #17177 (comment) , this PR addresses the following -
Move ActionQueue outside DelayedShareFetch class to SharePartitionManager where SharePartitionManager has the ability to add a delayed action to the ActionQueue.
Add TopicPartition as a key for delayed share fetch along with SharePartition (that is already present right now). This will be utilized later when we add the code to know if new data has been added to the topic partition.
Testing
The code has been tested with the help of unit tests.