KAFKA-17747: Add compute topic and group hash #54
base: trunk
Conversation
Signed-off-by: PoAn Yang <[email protected]>
Summary of Changes

Hello @visz11, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces core hashing capabilities to the Kafka group coordinator, enabling the computation of unique identifiers for topics and consumer groups. By integrating the Guava hashing library, it provides a standardized and efficient way to generate hashes based on critical metadata such as topic IDs, names, partition configurations, and broker rack assignments. This enhancement supports internal consistency checks and optimization strategies that rely on stable, reproducible identifiers.
Code Review
This pull request introduces hashing mechanisms for topics and consumer groups, which is a valuable addition. The implementation uses Guava's hashing library and includes a comprehensive set of unit tests to validate the new logic. My main feedback is to improve the robustness of the topic hash by using the full 128 bits of the topic UUID instead of its 32-bit hash code. This will significantly reduce the probability of hash collisions.
```java
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(topicImage.id().hashCode()) // topic Id
```
Using topicImage.id().hashCode() for hashing the topic ID is not ideal as it loses information from the 128-bit UUID, increasing the risk of hash collisions. Two different UUIDs could potentially have the same integer hash code. To create a more robust and collision-resistant hash, it's better to use the full 128 bits of the UUID by hashing the most and least significant bits.
```diff
-        .putLong(topicImage.id().hashCode()) // topic Id
+        .putLong(topicImage.id().getMostSignificantBits()).putLong(topicImage.id().getLeastSignificantBits()) // topic Id
```
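The collision risk is easy to demonstrate with the standard library. The sketch below uses `java.util.UUID` rather than Kafka's own `Uuid` class (an assumption made purely to keep the example self-contained): `java.util.UUID.hashCode()` XOR-folds the 128 bits down to 32, so distinct UUIDs can share a hash code.

```java
import java.util.UUID;

public class UuidHashCollisionDemo {
    public static void main(String[] args) {
        // java.util.UUID.hashCode() XOR-folds the high and low longs,
        // so any two UUIDs whose two halves XOR to the same value
        // collide. UUID(0, 0) and UUID(1, 1) both fold to 0.
        UUID a = new UUID(0L, 0L);
        UUID b = new UUID(1L, 1L);
        System.out.println(a.equals(b));                  // false
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}
```

Kafka's `Uuid` folds differently, but any 128-to-32-bit reduction admits such collisions, which is why feeding both long halves into the hasher is the safer choice.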
```java
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
```
In line with the suggested change in Group.java to use the full UUID for hashing, this test should be updated to use getMostSignificantBits() and getLeastSignificantBits() instead of hashCode() when constructing the expected hash. This ensures the tests remain consistent with the improved hashing logic. Please apply the same fix to testComputeTopicHashWithDifferentMagicByte, testComputeTopicHashWithDifferentPartitionOrder, and testComputeTopicHashWithDifferentRackOrder.
```diff
-        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+        .putLong(FOO_TOPIC_ID.getMostSignificantBits()).putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id
```
Code Review: Group Hashing Implementation
PR Confidence Score: 🟨 4 / 5
👍 Well Done
📝 Additional Comments
```java
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(topicImage.id().hashCode()) // topic Id
```
Incomplete UUID Hashing
The hashing algorithm uses the 32-bit integer hash code of the 128-bit topic UUID. This discards 96 bits of information from the UUID, significantly increasing the likelihood of hash collisions for different topics. This could lead to metadata changes being missed.
Standards
- Logic-Verification-Data-Integrity
- Algorithm-Correctness-Hashing
```java
static long computeGroupHash(Map<String, Long> topicHashes) {
    return Hashing.combineOrdered(
        topicHashes.entrySet()
            .stream()
            .sorted(Map.Entry.comparingByKey())
            .map(e -> HashCode.fromLong(e.getValue()))
            .toList()
    ).asLong();
}
```
Hash Collision Risk
The computeGroupHash method combines hash values without a salt or any additional entropy, so the resulting hashes are predictable. An attacker able to influence the inputs could craft collisions and mount hash-flooding or other algorithmic-complexity denial-of-service attacks.
Standards
- CWE-328
- OWASP-A02
- NIST-SSDF-PW.1
```java
 * @return The hash of the topic.
 */
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
```
Weak Hash Algorithm
MurmurHash3 is a non-cryptographic hash function designed for speed rather than security. While suitable for hash tables, it's vulnerable to hash collision attacks where malicious input can be crafted to produce identical hash values.
Standards
- CWE-328
- OWASP-A02
- NIST-SSDF-PW.1
PR already reviewed at the latest commit: 36cb999.
Nitpicks 🔍
```java
return Hashing.combineOrdered(
    topicHashes.entrySet()
        .stream()
        .sorted(Map.Entry.comparingByKey())
        .map(e -> HashCode.fromLong(e.getValue()))
        .toList()
).asLong();
```
Suggestion: The computeGroupHash method passes the list of topic hashes directly to Hashing.combineOrdered, which in Guava throws an IllegalArgumentException when the iterable is empty. This causes a runtime failure if the method is ever called with an empty topicHashes map (e.g., for a group with no topics), so the empty case should be handled explicitly with a defined hash value. [logic error]
Severity Level: Minor
```diff
-        return Hashing.combineOrdered(
-            topicHashes.entrySet()
-                .stream()
-                .sorted(Map.Entry.comparingByKey())
-                .map(e -> HashCode.fromLong(e.getValue()))
-                .toList()
-        ).asLong();
+        List<HashCode> topicHashCodes = topicHashes.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(e -> HashCode.fromLong(e.getValue()))
+            .toList();
+        if (topicHashCodes.isEmpty()) {
+            // Define a stable hash value for groups without topics.
+            return 0L;
+        }
+        return Hashing.combineOrdered(topicHashCodes).asLong();
```
Why it matters? ⭐
The suggestion highlights a real runtime edge case: Guava's Hashing.combineOrdered(Iterable) throws IllegalArgumentException when given an empty iterable. The current implementation passes the stream result directly and thus will fail if topicHashes is empty (possible for a group with no topics). The improved code provides a safe, stable behavior (returns a defined hash for empty input) and prevents a potential crash. This is a functional bugfix, not just cosmetic.
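For illustration, the guard pattern can be sketched with the standard library alone. `GroupHashSketch` below is a hypothetical stand-in: a simple polynomial fold replaces Guava's `combineOrdered` (not available in a stdlib-only example), but the shape of the fix — sort by key, check for emptiness, then combine in order — is the same.

```java
import java.util.List;
import java.util.Map;

public class GroupHashSketch {
    // Stdlib-only stand-in for the guarded version: sort by topic name,
    // then fold the per-topic hashes in that order. The polynomial fold
    // substitutes for Guava's combineOrdered purely for illustration.
    static long computeGroupHash(Map<String, Long> topicHashes) {
        List<Long> ordered = topicHashes.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(Map.Entry::getValue)
                .toList();
        if (ordered.isEmpty()) {
            return 0L; // stable, defined value for a group with no topics
        }
        long combined = 1L;
        for (long h : ordered) {
            combined = 31L * combined + h;
        }
        return combined;
    }

    public static void main(String[] args) {
        // The empty map no longer throws, and insertion order is irrelevant.
        System.out.println(computeGroupHash(Map.of()));               // 0
        long r1 = computeGroupHash(Map.of("bar", 123L, "foo", 456L));
        long r2 = computeGroupHash(Map.of("foo", 456L, "bar", 123L));
        System.out.println(r1 == r2);                                 // true
    }
}
```

Returning a fixed 0L for the empty case keeps the hash stable across calls; any constant works as long as it is defined and documented.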
Path: group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (lines 230–235)
```java
long result = Group.computeGroupHash(Map.of(
    BAR_TOPIC_NAME, 123L,
    FOO_TOPIC_NAME, 456L
));

long unexpected = Hashing.combineOrdered(List.of(
    HashCode.fromLong(456L),
    HashCode.fromLong(123L)
)).asLong();
assertNotEquals(unexpected, result);
```
Suggestion: The test named for verifying different topic order never calls the hash function with a differently ordered map; it only changes the order in a manually computed hash. The test would therefore still pass even if the production implementation were incorrectly sensitive to map iteration order, giving a false sense that ordering is handled correctly. [logic error]
Severity Level: Minor
```diff
-        long result = Group.computeGroupHash(Map.of(
-            BAR_TOPIC_NAME, 123L,
-            FOO_TOPIC_NAME, 456L
-        ));
-        long unexpected = Hashing.combineOrdered(List.of(
-            HashCode.fromLong(456L),
-            HashCode.fromLong(123L)
-        )).asLong();
-        assertNotEquals(unexpected, result);
+        long result1 = Group.computeGroupHash(Map.of(
+            BAR_TOPIC_NAME, 123L,
+            FOO_TOPIC_NAME, 456L
+        ));
+        long result2 = Group.computeGroupHash(Map.of(
+            FOO_TOPIC_NAME, 456L,
+            BAR_TOPIC_NAME, 123L
+        ));
+        assertEquals(result1, result2);
```
Why it matters? ⭐
The current test only constructs one input map and then compares the result to a manually built ordered HashCode list; that doesn't exercise the producer code with a differently-ordered input map. The suggested change calls Group.computeGroupHash with the same entries in the opposite insertion order and asserts equality, which directly verifies that the implementation is order-independent. This is a meaningful behavioral test improvement (not just stylistic) and will catch real bugs where the implementation improperly depends on map iteration order.
Path: group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (lines 178–187)
CodeAnt AI finished reviewing your PR.
CodeAnt-AI Description
Add topic and group hash computations to Group
Impact
✅ Faster detection of topic changes affecting consumer groups
✅ Fewer missed group metadata changes caused by unordered topic fields
✅ Verified topic/group hashing behavior via new unit tests