1 change: 1 addition & 0 deletions build.gradle
@@ -1420,6 +1420,7 @@ project(':group-coordinator') {
implementation libs.hdrHistogram
implementation libs.re2j
implementation libs.slf4jApi
implementation libs.guava

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
1 change: 1 addition & 0 deletions checkstyle/import-control-group-coordinator.xml
@@ -77,6 +77,7 @@
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="com.google.re2j" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="com.google.common.hash" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.HdrHistogram" />
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
@@ -61,6 +61,7 @@ versions += [
classgraph: "4.8.173",
gradle: "8.10.2",
grgit: "4.1.1",
guava: "33.4.0-jre",
httpclient: "4.5.14",
jackson: "2.16.2",
jacoco: "0.8.10",
@@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
guava: "com.google.guava:guava:$versions.guava",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson",
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,11 +19,21 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -209,4 +219,50 @@ void validateOffsetFetch(
default boolean shouldExpire() {
return true;
}

/**
* Computes the hash of the topics in a group.
*
* @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
* @return The hash of the group.
*/
static long computeGroupHash(Map<String, Long> topicHashes) {
return Hashing.combineOrdered(
topicHashes.entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.map(e -> HashCode.fromLong(e.getValue()))
.toList()
Comment on lines +230 to +235

Suggestion: The computeGroupHash method passes the list of topic hashes directly to Hashing.combineOrdered, which in Guava throws an IllegalArgumentException when the iterable is empty. This will cause a runtime failure if the method is ever called with an empty topicHashes map (e.g., for a group with no topics), so the empty case should be handled explicitly with a defined hash value. [logic error]

Severity Level: Minor ⚠️

Suggested change
return Hashing.combineOrdered(
topicHashes.entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.map(e -> HashCode.fromLong(e.getValue()))
.toList()
List<HashCode> topicHashCodes = topicHashes.entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.map(e -> HashCode.fromLong(e.getValue()))
.toList();
if (topicHashCodes.isEmpty()) {
// Define a stable hash value for groups without topics.
return 0L;
}
return Hashing.combineOrdered(topicHashCodes).asLong();
Why it matters? ⭐

The suggestion highlights a real runtime edge case: Guava's Hashing.combineOrdered(Iterable) throws IllegalArgumentException when given an empty iterable. The current implementation passes the stream result directly and thus will fail if topicHashes is empty (possible for a group with no topics). The improved code provides a safe, stable behavior (returns a defined hash for empty input) and prevents a potential crash. This is a functional bugfix, not just cosmetic.
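
For reference, a minimal self-contained sketch of the Guava behavior this comment relies on; the class and variable names are illustrative only:

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import java.util.List;

public class CombineOrderedDemo {
    public static void main(String[] args) {
        // A non-empty list of equal-width hash codes combines fine.
        long combined = Hashing.combineOrdered(List.of(
            HashCode.fromLong(123L),
            HashCode.fromLong(456L)
        )).asLong();
        System.out.println("combined = " + combined);

        // An empty iterable is rejected by Guava with an IllegalArgumentException,
        // which is why computeGroupHash needs an explicit empty-map branch.
        try {
            Hashing.combineOrdered(List.<HashCode>of());
        } catch (IllegalArgumentException e) {
            System.out.println("empty input rejected: " + e.getMessage());
        }
    }
}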


).asLong();
Comment on lines +229 to +236

Hash Collision Risk

The computeGroupHash method combines hash values without a salt or any additional entropy, producing predictable hash patterns. An attacker could exploit the resulting collision behavior to mount hash-table or algorithmic-complexity denial-of-service attacks.

Standards
  • CWE-328
  • OWASP-A02
  • NIST-SSDF-PW.1

}

/**
* Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
*
* @param topicImage The topic image.
* @param clusterImage The cluster image.
* @return The hash of the topic.
*/
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
HashFunction hf = Hashing.murmur3_128();

Weak Hash Algorithm

MurmurHash3 is a non-cryptographic hash function designed for speed rather than security. While suitable for hash tables, it's vulnerable to hash collision attacks where malicious input can be crafted to produce identical hash values.

Standards
  • CWE-328
  • OWASP-A02
  • NIST-SSDF-PW.1
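
If crafted collisions were genuinely a concern for this metadata hash, a seeded or keyed hash function could be substituted for plain Murmur3. A minimal sketch using existing Guava APIs; the seed value and key material are assumptions, not part of the patch:

// Option 1 (assumption): seed Murmur3 with a deployment-specific value.
HashFunction seeded = Hashing.murmur3_128(42); // 42 is an illustrative seed

// Option 2 (assumption): use an HMAC keyed with a secret, at higher CPU cost.
byte[] key = "illustrative-secret".getBytes(StandardCharsets.UTF_8);
HashFunction keyed = Hashing.hmacSha256(key);

That said, this hash is used to detect topic-metadata changes rather than to enforce a security boundary, so a fast non-cryptographic hash is arguably a reasonable choice here.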

Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(topicImage.id().hashCode()) // topic Id

Severity: high

Using topicImage.id().hashCode() for hashing the topic ID is not ideal as it loses information from the 128-bit UUID, increasing the risk of hash collisions. Two different UUIDs could potentially have the same integer hash code. To create a more robust and collision-resistant hash, it's better to use the full 128 bits of the UUID by hashing the most and least significant bits.

Suggested change
.putLong(topicImage.id().hashCode()) // topic Id
.putLong(topicImage.id().getMostSignificantBits()).putLong(topicImage.id().getLeastSignificantBits()) // topic Id

Incomplete UUID Hashing

The hashing algorithm uses the 32-bit integer hash code of the 128-bit topic UUID. This discards 96 bits of information from the UUID, significantly increasing the likelihood of hash collisions for different topics. This could lead to metadata changes being missed.

Standards
  • Logic-Verification-Data-Integrity
  • Algorithm-Correctness-Hashing

.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
.putInt(topicImage.partitions().size()); // number of partitions

topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
topicHasher.putInt(entry.getKey()); // partition id
String racks = Arrays.stream(entry.getValue().replicas)
.mapToObj(clusterImage::broker)
.filter(Objects::nonNull)
.map(BrokerRegistration::rack)
.filter(Optional::isPresent)
.map(Optional::get)
.sorted()
.collect(Collectors.joining(";"));
topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
});
return topicHasher.hash().asLong();
}
}
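
Taken together, a caller would hash each subscribed topic and then fold the per-topic hashes into a single group hash. A minimal usage sketch; the helper, its name, and the name-based topics() lookup are illustrative, not part of this patch:

// Illustrative helper: compute a group hash for a set of subscribed topic names.
static long groupHashFor(Set<String> subscribedTopicNames, MetadataImage image) {
    Map<String, Long> topicHashes = new HashMap<>();
    for (String topicName : subscribedTopicNames) {
        TopicImage topic = image.topics().getTopic(topicName); // name-based lookup, assumed available
        if (topic != null) {
            topicHashes.put(topicName, Group.computeTopicHash(topic, image.cluster()));
        }
    }
    return Group.computeGroupHash(topicHashes);
}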
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

public class GroupTest {
private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
private static final String FOO_TOPIC_NAME = "foo";
private static final String BAR_TOPIC_NAME = "bar";
private static final int FOO_NUM_PARTITIONS = 2;
private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.addRacks()
.build();

@Test
void testComputeTopicHash() {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id

Severity: high

In line with the suggested change in Group.java to use the full UUID for hashing, this test should be updated to use getMostSignificantBits() and getLeastSignificantBits() instead of hashCode() when constructing the expected hash. This ensures the tests remain consistent with the improved hashing logic. Please apply the same fix to testComputeTopicHashWithDifferentMagicByte, testComputeTopicHashWithDifferentPartitionOrder, and testComputeTopicHashWithDifferentRackOrder.

Suggested change
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id
.putLong(FOO_TOPIC_ID.getMostSignificantBits()).putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id

.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
.putInt(0) // partition 0
.putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
.putInt(1) // partition 1
.putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertEquals(topicHasher.hash().asLong(), result);
}

@Test
void testComputeTopicHashWithDifferentMagicByte() {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 1) // different magic byte
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id
.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
.putInt(0) // partition 0
.putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
.putInt(1) // partition 1
.putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
}

@Test
void testComputeTopicHashWithDifferentPartitionOrder() {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id
.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
// different partition order
.putInt(1) // partition 1
.putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
.putInt(0) // partition 0
.putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
assertNotEquals(topicHasher.hash().asLong(), result);
}

@Test
void testComputeTopicHashWithDifferentRackOrder() {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte
.putLong(FOO_TOPIC_ID.hashCode()) // topic Id
.putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
.putInt(FOO_NUM_PARTITIONS) // number of partitions
.putInt(0) // partition 0
.putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
.putInt(1) // partition 1
.putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
}

@ParameterizedTest
@MethodSource("differentFieldGenerator")
void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) {
long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

assertNotEquals(
Group.computeTopicHash(
differentImage.topics().getTopic(topicId),
differentImage.cluster()
),
result
);
}

private static Stream<Arguments> differentFieldGenerator() {
Uuid differentTopicId = Uuid.randomUuid();
return Stream.of(
Arguments.of(new MetadataImageBuilder() // different topic id
.addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.addRacks()
.build(),
differentTopicId
),
Arguments.of(new MetadataImageBuilder() // different topic name
.addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
.addRacks()
.build(),
FOO_TOPIC_ID
),
Arguments.of(new MetadataImageBuilder() // different partitions
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
.addRacks()
.build(),
FOO_TOPIC_ID
),
Arguments.of(new MetadataImageBuilder() // different racks
.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.build(),
FOO_TOPIC_ID
)
);
}

@Test
void testComputeGroupHash() {
long result = Group.computeGroupHash(Map.of(
BAR_TOPIC_NAME, 123L,
FOO_TOPIC_NAME, 456L
));

long expected = Hashing.combineOrdered(List.of(
HashCode.fromLong(123L),
HashCode.fromLong(456L)
)).asLong();
assertEquals(expected, result);
}

@Test
void testComputeGroupHashWithDifferentOrder() {
long result = Group.computeGroupHash(Map.of(
BAR_TOPIC_NAME, 123L,
FOO_TOPIC_NAME, 456L
));

long unexpected = Hashing.combineOrdered(List.of(
HashCode.fromLong(456L),
HashCode.fromLong(123L)
)).asLong();
assertNotEquals(unexpected, result);
Comment on lines +178 to +187

Suggestion: The test named for verifying behavior with a different topic order does not actually call the hash function with a differently ordered map; it only changes the order in a manually computed hash. The test would therefore still pass even if the production implementation were incorrectly sensitive to map iteration order, giving a false sense that ordering is handled correctly. [logic error]

Severity Level: Minor ⚠️

Suggested change
long result = Group.computeGroupHash(Map.of(
BAR_TOPIC_NAME, 123L,
FOO_TOPIC_NAME, 456L
));
long unexpected = Hashing.combineOrdered(List.of(
HashCode.fromLong(456L),
HashCode.fromLong(123L)
)).asLong();
assertNotEquals(unexpected, result);
long result1 = Group.computeGroupHash(Map.of(
BAR_TOPIC_NAME, 123L,
FOO_TOPIC_NAME, 456L
));
long result2 = Group.computeGroupHash(Map.of(
FOO_TOPIC_NAME, 456L,
BAR_TOPIC_NAME, 123L
));
assertEquals(result1, result2);
Why it matters? ⭐

The current test only constructs one input map and then compares the result to a manually built ordered HashCode list; that doesn't exercise the production code with a differently-ordered input map. The suggested change calls Group.computeGroupHash with the same entries in the opposite insertion order and asserts equality, which directly verifies that the implementation is order-independent. This is a meaningful behavioral test improvement (not just stylistic) and will catch real bugs where the implementation improperly depends on map iteration order.


}
}
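
If the empty-map handling suggested earlier is adopted, a companion test could pin the chosen sentinel. A sketch that assumes 0L is the value picked for groups with no topics:

@Test
void testComputeGroupHashWithEmptyMap() {
    // Assumes computeGroupHash returns a defined sentinel (0L here) instead of throwing.
    assertEquals(0L, Group.computeGroupHash(Map.of()));
}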