Skip to content

Commit

Permalink
Upgrade to Kafka 3.8.0 (Axual#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandisseldorp authored Sep 17, 2024
1 parent e4738a2 commit deb39b3
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 36 deletions.
12 changes: 6 additions & 6 deletions ksml-kafka-clients/NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

Lists of 20 third-party dependencies.
(The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.13.5 - http://github.com/FasterXML/jackson)
(The Apache Software License, Version 2.0) Jackson-core (com.fasterxml.jackson.core:jackson-core:2.13.5 - https://github.com/FasterXML/jackson-core)
(The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.13.5 - http://github.com/FasterXML/jackson)
(BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni)
(The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.16.2 - https://github.com/FasterXML/jackson)
(The Apache Software License, Version 2.0) Jackson-core (com.fasterxml.jackson.core:jackson-core:2.16.2 - https://github.com/FasterXML/jackson-core)
(The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.16.2 - https://github.com/FasterXML/jackson)
(BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.6-3 - https://github.com/luben/zstd-jni)
(The Apache Software License, Version 2.0) FindBugs-jsr305 (com.google.code.findbugs:jsr305:3.0.2 - http://findbugs.sourceforge.net/)
(Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.26.1 - https://errorprone.info/error_prone_annotations)
(The Apache Software License, Version 2.0) Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.2 - https://github.com/google/guava/failureaccess)
Expand All @@ -12,8 +12,8 @@ Lists of 20 third-party dependencies.
(Apache License, Version 2.0) J2ObjC Annotations (com.google.j2objc:j2objc-annotations:3.0.0 - https://github.com/google/j2objc/)
(Apache-2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.14.0 - https://commons.apache.org/proper/commons-lang/)
(Apache-2.0) Apache Commons Text (org.apache.commons:commons-text:1.12.0 - https://commons.apache.org/proper/commons-text)
(The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.6.2 - https://kafka.apache.org)
(The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.6.2 - https://kafka.apache.org)
(The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.8.0 - https://kafka.apache.org)
(The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.8.0 - https://kafka.apache.org)
(The MIT License) Checker Qual (org.checkerframework:checker-qual:3.42.0 - https://checkerframework.org/)
(The Apache Software License, Version 2.0) LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
(The MIT License) Project Lombok (org.projectlombok:lombok:1.18.32 - https://projectlombok.org)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ResolvingAdmin extends ForwardingAdmin {

public ResolvingAdmin(Map<String, Object> configs) {
super(configs);
var config = new ResolvingClientConfig(configs);
final var config = new ResolvingClientConfig(configs);
topicResolver = config.topicResolver;
groupResolver = config.groupResolver;
}
Expand All @@ -58,7 +58,7 @@ public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTop

@Override
public DeleteTopicsResult deleteTopics(TopicCollection topicCollection, DeleteTopicsOptions options) {
var result = super.deleteTopics(topicResolver.resolve(topicCollection), options);
final var result = super.deleteTopics(topicResolver.resolve(topicCollection), options);
return new ResolvingDeleteTopicsResult(result.topicIdValues(), result.topicNameValues(), topicResolver);
}

Expand All @@ -69,13 +69,13 @@ public ListTopicsResult listTopics(ListTopicsOptions options) {

@Override
public DescribeTopicsResult describeTopics(TopicCollection topicCollection, DescribeTopicsOptions options) {
var result = super.describeTopics(topicResolver.resolve(topicCollection), options);
final var result = super.describeTopics(topicResolver.resolve(topicCollection), options);
return new ResolvingDescribeTopicsResult(result.topicIdValues(), result.topicNameValues(), topicResolver);
}

@Override
public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) {
var result = super.describeAcls(ResolverUtil.resolve(filter, topicResolver, groupResolver), options);
final var result = super.describeAcls(ResolverUtil.resolve(filter, topicResolver, groupResolver), options);
if (result == null) return null;
return new ResolvingDescribeAclsResult(result.values(), topicResolver, groupResolver);
}
Expand Down Expand Up @@ -156,28 +156,31 @@ public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions opt
@Override
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
// Resolve the groupSpecs
var newGroupSpecs = new HashMap<String, ListConsumerGroupOffsetsSpec>();
final var newGroupSpecs = new HashMap<String, ListConsumerGroupOffsetsSpec>();
groupSpecs.forEach((groupId, spec) -> newGroupSpecs.put(groupResolver.resolve(groupId), new ListConsumerGroupOffsetsSpec().topicPartitions(topicResolver.resolveTopicPartitions(spec.topicPartitions()))));

// Resolve the options
if (options != null) {
var newOptions = new ListConsumerGroupOffsetsOptions().requireStable(options.requireStable());
if (options.topicPartitions() != null) {
var newTopicPartitions = new ArrayList<TopicPartition>(options.topicPartitions().size());
options.topicPartitions().forEach(tp -> newTopicPartitions.add(topicResolver.resolve(tp)));
final var newOptions = new ListConsumerGroupOffsetsOptions().requireStable(options.requireStable());
final var topicPartitions = options.topicPartitions();
if (topicPartitions != null) {
final var newTopicPartitions = new ArrayList<TopicPartition>(topicPartitions.size());
topicPartitions.forEach(tp -> newTopicPartitions.add(topicResolver.resolve(tp)));
newOptions.topicPartitions(newTopicPartitions);
}
options = newOptions;
}

// Call the original API
var result = super.listConsumerGroupOffsets(newGroupSpecs, options);
// Convert the result to an unresolving result
var newResult = new HashMap<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>>();
final var result = super.listConsumerGroupOffsets(newGroupSpecs, options);

// Convert the result to an unresolved result
final var newResult = new HashMap<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>>();
newGroupSpecs.keySet().forEach(groupId -> {
var future = result.partitionsToOffsetAndMetadata(groupId);
final var future = result.partitionsToOffsetAndMetadata(groupId);
newResult.put(CoordinatorKey.byGroupId(groupId), future);
});

return new ResolvingListConsumerGroupOffsetsResult(newResult, topicResolver, groupResolver);
}

Expand Down Expand Up @@ -262,9 +265,9 @@ public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteratio

private Collection<NewTopic> resolveNewTopics(Collection<NewTopic> newTopics) {
// Resolve all new topics into a new collection
var resolvedTopics = new ArrayList<NewTopic>();
for (var newTopic : newTopics) {
var resolvedTopic = newTopic.replicasAssignments() == null
final var resolvedTopics = new ArrayList<NewTopic>();
for (final var newTopic : newTopics) {
final var resolvedTopic = newTopic.replicasAssignments() == null
? new NewTopic(topicResolver.resolve(newTopic.name()), newTopic.numPartitions(), newTopic.replicationFactor())
: new NewTopic(topicResolver.resolve(newTopic.name()), newTopic.replicasAssignments());
// Make sure that the config is added properly. Cleanup properties and timestamps are typical properties set in Streams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
*/

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.*;

import java.time.Duration;
import java.util.*;
Expand Down Expand Up @@ -181,6 +178,11 @@ public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> part
return delegate.committed(partitions, timeout);
}

@Override
public Uuid clientInstanceId(Duration duration) {
return delegate.clientInstanceId(duration);
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
return delegate.metrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* ========================LICENSE_START=================================
* Extended Kafka clients for KSML
* %%
* Copyright (C) 2021 - 2023 Axual B.V.
* Copyright (C) 2021 - 2024 Axual B.V.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,10 +26,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.*;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.time.Duration;
Expand Down Expand Up @@ -106,6 +103,11 @@ public List<PartitionInfo> partitionsFor(String topic) {
return delegate.metrics();
}

@Override
public Uuid clientInstanceId(Duration duration) {
return delegate.clientInstanceId(duration);
}

@Override
public void close() {
delegate.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ private static RecordMetadata convertRecordMetadata(RecordMetadata input, String
input.offset(),
0,
input.timestamp(),
null,
input.serializedKeySize(),
input.serializedValueSize());
}

@Deprecated
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
Expand Down
6 changes: 3 additions & 3 deletions ksml/NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Lists of 72 third-party dependencies.
(The Apache Software License, Version 2.0) Jackson-dataformat-XML (com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.17.1 - https://github.com/FasterXML/jackson-dataformat-xml)
(The Apache Software License, Version 2.0) Jackson-dataformat-YAML (com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.17.1 - https://github.com/FasterXML/jackson-dataformats-text)
(The Apache License, Version 2.0) Woodstox (com.fasterxml.woodstox:woodstox-core:6.6.2 - https://github.com/FasterXML/woodstox)
(BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni)
(BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.6-3 - https://github.com/luben/zstd-jni)
(The Apache Software License, Version 2.0) FindBugs-jsr305 (com.google.code.findbugs:jsr305:3.0.2 - http://findbugs.sourceforge.net/)
(Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.26.1 - https://errorprone.info/error_prone_annotations)
(The Apache Software License, Version 2.0) Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.2 - https://github.com/google/guava/failureaccess)
Expand Down Expand Up @@ -39,8 +39,8 @@ Lists of 72 third-party dependencies.
(Apache-2.0) Apache Avro (org.apache.avro:avro:1.11.3 - https://avro.apache.org)
(Apache-2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.26.2 - https://commons.apache.org/proper/commons-compress/)
(Apache-2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.14.0 - https://commons.apache.org/proper/commons-lang/)
(The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.6.2 - https://kafka.apache.org)
(The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.6.2 - https://kafka.apache.org)
(The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.8.0 - https://kafka.apache.org)
(The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.8.0 - https://kafka.apache.org)
(Bouncy Castle Licence) Bouncy Castle PKIX, CMS, EAC, TSP, PKCS, OCSP, CMP, and CRMF APIs (org.bouncycastle:bcpkix-jdk18on:1.76 - https://www.bouncycastle.org/java.html)
(Bouncy Castle Licence) Bouncy Castle Provider (org.bouncycastle:bcprov-jdk18on:1.76 - https://www.bouncycastle.org/java.html)
(Bouncy Castle Licence) Bouncy Castle ASN.1 Extension and Utility APIs (org.bouncycastle:bcutil-jdk18on:1.76 - https://www.bouncycastle.org/java.html)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
<apache.avro.version>1.11.3</apache.avro.version>
<apache.commons.text.version>1.12.0</apache.commons.text.version>
<apache.commons.compress.version>1.26.2</apache.commons.compress.version>
<apache.kafka.version>3.6.2</apache.kafka.version>
<apache.kafka.version>3.8.0</apache.kafka.version>
<confluent.version>7.6.1</confluent.version>
<graalvm.version>23.1.2</graalvm.version>
<graalvm.polyglot.version>23.1.2</graalvm.polyglot.version>
Expand Down

0 comments on commit deb39b3

Please sign in to comment.