
KAFKA-12895: Drop support for Scala 2.12 in Kafka 4.0 #17313

Merged
merged 16 commits into apache:trunk on Oct 6, 2024

Conversation

@frankvicky (Contributor) commented Sep 29, 2024

JIRA: KAFKA-12895

We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it in Apache Kafka 4.0.

Implement KIP-751

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added core Kafka Broker build Gradle build or GitHub Actions labels Sep 29, 2024
@frankvicky frankvicky marked this pull request as draft September 29, 2024 09:09
@frankvicky frankvicky marked this pull request as ready for review September 29, 2024 11:33
@frankvicky (Contributor Author) commented:
Hi @jolshan,
You mentioned earlier that you were interested in this ticket. Could you please take a look?
Thank you! 😺

@m1a2st (Contributor) commented Sep 29, 2024

Thanks for your PR, I left some comments, PTAL.

@m1a2st (Contributor) commented Sep 29, 2024

Please also update LICENSE-binary, which still lists some Scala 2.12.19 entries, and the gradlewAll file. Some HTML documents also need to be updated.

@frankvicky (Contributor Author):

Currently, some HTML mentions of Scala 2.12 are located in the Streams module. I'm not sure whether they should be changed in this PR or not.

@chia7712 (Member) left a comment:

@frankvicky Could you please add this change to upgrade.html?

-// a higher minimum Java requirement than Kafka. This was previously the case for Scala 2.12 and Java 7.
-availableScalaVersions = [ '2.12', '2.13' ]
+// a higher minimum Java requirement than Kafka.
+availableScalaVersions = [ '2.13' ]
Member:

Is it still used?

Contributor Author:

Nope, I will remove it

@mumrah (Member) left a comment:

For the classes where we're actually changing code (LogCleanerManager and LogCleaner), can you double-check that we have unit test coverage for these methods? I just want to be sure we're not changing the logic.

@@ -121,7 +120,7 @@ class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
   }

   def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
-    val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
+    val queue =markersPerTxnTopicPartition.getOrElseUpdate(txnTopicPartition, {
Member:

nit: whitespace

Contributor Author:

Thank you, fixed it.

Contributor Author:

> For the classes where we're actually changing code (LogCleanerManager and LogCleaner), can you double-check that we have unit test coverage for these methods? I just want to be sure we're not changing the logic.

Per the comment from @chia7712, I have reverted the changes in LogCleanerManager and LogCleaner.

@mumrah mumrah requested a review from ijuma September 29, 2024 22:54
@mumrah mumrah added the dependencies Pull requests that update a dependency file label Sep 29, 2024
@jolshan (Member) commented Sep 30, 2024

Scanned through some of the files here: https://github.com/search?q=repo%3Aapache%2Fkafka%202.12&type=code

I noticed the vagrant file also has 2.12 references. Is this just for building old versions?

# tests with Java 7. We have since switched to Java 8, so 2.0.0 and later use Scala 2.12.

What is the concern about the streams changes?

@chia7712 (Member):

> I noticed the vagrant file also has 2.12 references. Is this just for building old versions?

Yes, the Dockerfile (e2e) also has 2.12 references.

RUN mkdir -p "/opt/kafka-3.3.2" && chmod a+rw /opt/kafka-3.3.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.3.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.3.2"

// in the case where there's only a single Arguments in the list. The following commented-out
// method works in Scala 2.13, but not 2.12. For this reason, tests which run against just a
// single combination are written using @CsvSource rather than the more elegant @MethodSource.
// def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
Member:

This should be used by testAssignAndConsumeWithLeaderChangeValidatingPositions. Since we are phasing out ZooKeeper, it's fine to remove it for now.

def testAssignAndConsumeWithLeaderChangeValidatingPositions(quorum: String, groupProtocol: String): Unit = {

Contributor Author:

I have uncommented the getTestQuorumAndGroupProtocolParametersZkOnly method and replaced @CsvSource with @MethodSource.

@chia7712 (Member):

> For the classes where we're actually changing code (LogCleanerManager and LogCleaner), can you double-check that we have unit test coverage for these methods? I just want to be sure we're not changing the logic.

This PR should primarily focus on removing Scala 2.12. The changes made to LogCleanerManager and LogCleaner seem more like cosmetic modifications. It might be better to revert them to their original state.

@@ -17,5 +17,5 @@

# Convenient way to invoke a gradle command with all Scala versions supported
# by default
Member:

We should probably add a comment here that this is no longer needed, but we are keeping it for backwards compatibility.

Member:

Maybe we should print a deprecation message to remind users that gradlewAll is unnecessary and will be removed later.

@ijuma (Member) left a comment:

Thanks for the PR, I left a few comments.

README.md Outdated
@@ -11,8 +11,7 @@ the broker and tools has been deprecated since Apache Kafka 3.7 and removal of b
 see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and
 [KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).

-Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since
-Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
+Scala 2.13 is used by default. Scala 2.12 support has been removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
Member:

I think we should simply say that Scala 2.13 is the only version we support. The readme is not a changelog. We have a separate file for the changelog.

README.md Outdated
@@ -114,14 +113,14 @@ Using compiled files:
### Cleaning the build ###
./gradlew clean

-### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ###
+### Running a task with Scala 2.13.x ###
Member:

I think we should just delete this section and all the other sections related to multiple Scala versions, etc.

Contributor Author:

I’ve kept the section related to ./gradlewAll since we haven’t removed it yet.

Member:

I would remove that too. We can simply add a comment to gradlewAll explaining that it has been kept for compatibility reasons, but there is no reason to use it anymore.

* In Scala 2.12, `ConcurrentMap.getOrElse` has the same behaviour as this method, but JConcurrentMapWrapper that
* wraps Java maps does not.
*/
def atomicGetOrUpdate[K, V](map: concurrent.Map[K, V], key: K, createValue: => V): V = {
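As a plain-Java sketch of the semantics discussed in this thread (this is not the actual CoreUtils Scala code; the helper shape is only an approximation), atomicGetOrUpdate amounts to a get followed by putIfAbsent:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

public class Main {
    // Sketch: under a race, createValue may be evaluated by several threads,
    // but putIfAbsent guarantees only one result is kept in the map.
    static <K, V> V atomicGetOrUpdate(ConcurrentMap<K, V> map, K key, Supplier<V> createValue) {
        V existing = map.get(key);            // lock-free fast path for reads
        if (existing != null) return existing;
        V created = createValue.get();        // may run concurrently in several threads
        V prev = map.putIfAbsent(key, created);
        return prev != null ? prev : created; // losers discard their value
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Integer> m = new ConcurrentHashMap<>();
        System.out.println(atomicGetOrUpdate(m, "a", () -> 1)); // creates 1
        System.out.println(atomicGetOrUpdate(m, "a", () -> 2)); // existing 1 wins
    }
}
```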
Member:

Can we double check that this comment is accurate for the various concurrent.Map implementations?

Contributor Author:

I have reviewed the implementation in the Scala source code.

We could remove atomicGetOrUpdate() in Scala 2.13, as Scala now provides a very similar implementation for getOrElseUpdate().

In Scala 2.12, if you convert a ConcurrentHashMap using scala.collection.JavaConverters.asScala, you indeed get a less robust version of getOrElseUpdate that does not handle race conditions properly, because it does not ensure atomic updates.

2.13

  /**
    * Replaces the entry for the given key only if it was previously mapped
    * to some value.
    *
    * $atomicop
    *
    * @param k   key for which the entry should be replaced
    * @param v   value to be associated with the specified key
    * @return    `Some(v)` if the given key was previously mapped to some value `v`, or `None` otherwise
    */
  def replace(k: K, v: V): Option[V]

  override def getOrElseUpdate(key: K, @deprecatedName("op", since="2.13.13") defaultValue: => V): V = get(key) match {
    case Some(v) => v
    case None =>
      val v = defaultValue
      putIfAbsent(key, v) match {
        case Some(ov) => ov
        case None => v
      }
  }

2.12

  /** If given key is already in this map, returns associated value.
   *
   *  Otherwise, computes value from given expression `op`, stores with key
   *  in map and returns that value.
   *
   *  Concurrent map implementations may evaluate the expression `op`
   *  multiple times, or may evaluate `op` without inserting the result.
   *  
   *  @param  key the key to test
   *  @param  op  the computation yielding the value to associate with `key`, if
   *              `key` is previously unbound.
   *  @return     the value associated with key (either previously or as a result
   *              of executing the method).
   */
  def getOrElseUpdate(key: K, op: => V): V =
    get(key) match {
      case Some(v) => v
      case None => val d = op; this(key) = d; d
    }

Member:

@frankvicky we should check the JavaCollectionWrappers for 2.13

https://github.com/scala/scala/blob/2.13.x/src/library/scala/collection/convert/JavaCollectionWrappers.scala#L482

    override def getOrElseUpdate(key: K, op: => V): V =
      underlying.computeIfAbsent(key, _ => op) match {
        case null => super/*[concurrent.Map]*/.getOrElseUpdate(key, op)
        case v    => v
      }

Hence, the wrapper actually leverages computeIfAbsent for the normal case. getOrElseUpdate is used to put null into the map, since computeIfAbsent does not; that is to follow getOrElseUpdate semantics.

The getOrElseUpdate of current Scala 2.13 is good, and we can do a bit of cleanup (in a follow-up) since the current usages (markersQueuePerBroker and markersPerTxnTopicPartition) do not actually need the Scala wrapper.
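A small Java illustration of the computeIfAbsent behaviors mentioned above (standard ConcurrentHashMap semantics, nothing Kafka-specific):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        AtomicInteger calls = new AtomicInteger();

        // Absent key: the mapping function runs once and its result is stored.
        map.computeIfAbsent("k", k -> { calls.incrementAndGet(); return 1; });
        // Present key: the function is not evaluated at all.
        map.computeIfAbsent("k", k -> { calls.incrementAndGet(); return 2; });
        System.out.println(map.get("k") + " " + calls.get());

        // A function returning null records no mapping and returns null --
        // the gap the Scala wrapper covers with its get/putIfAbsent fallback.
        Integer r = map.computeIfAbsent("n", k -> null);
        System.out.println(r + " " + map.containsKey("n"));
    }
}
```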

@ijuma (Member) commented Oct 1, 2024:

The Java implementation of computeIfAbsent acquires locks, so the behavior is different.

Member:

> The Java implementation of computeIfAbsent acquires locks, so the behavior is different.

Pardon me: either computeIfAbsent or putIfAbsent acquires a lock (in the ConcurrentHashMap implementation) when it needs to mutate the inner table. Additionally, the main behavioral difference is that atomicGetOrUpdate has a side effect: it may create an unnecessary value under a race condition, while getOrElseUpdate (computeIfAbsent) does not. The main concern in this use case is optimized reads (according to the comment). Maybe we can run JMH to compare the performance of computeIfAbsent and get in the read-heavy case.

@frankvicky Could you please add a test case to ConcurrentMapBenchmark?

Contributor Author:

Tested on macOS

Benchmark                                                            (mapSize)  (writePercentage)  Mode  Cnt  Score   Error  Units
ConcurrentMapBenchmark.testConcurrentHashMapComputeIfAbsentReadOnly        100                0.1  avgt    5  4.706 ± 0.074  ns/op
ConcurrentMapBenchmark.testConcurrentHashMapGetReadOnly                    100                0.1  avgt    5  3.557 ± 0.032  ns/op
JMH benchmarks done

Contributor Author:

Tested on Ubuntu

Benchmark                                                            (mapSize)  (writePercentage)  Mode  Cnt  Score   Error  Units
ConcurrentMapBenchmark.testConcurrentHashMapComputeIfAbsentReadOnly        100                0.1  avgt    5  4.099 ± 0.080  ns/op
ConcurrentMapBenchmark.testConcurrentHashMapGetReadOnly                    100                0.1  avgt    5  3.439 ± 0.084  ns/op
JMH benchmarks done

Member:

@chia7712 Right, one implementation always acquires a lock to ensure the value is only computed once while the other only acquires locks if it needs to mutate the underlying map. The latter is better for read-heavy while the former is better for write-heavy cases. I think it would be best to have changes that may make things worse submitted separately. This PR should only have the changes that are neutral or always better.
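One common mitigation for the read-heavy case (a general pattern, not something this PR adopts) is a plain get fast path that avoids computeIfAbsent entirely on a hit:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class Main {
    // Read-heavy friendly: plain get first (no locking on a hit), falling
    // back to computeIfAbsent (which may lock a bin while computing) on a miss.
    static <K, V> V getOrCompute(ConcurrentHashMap<K, V> map, K key, Function<K, V> f) {
        V v = map.get(key);
        return v != null ? v : map.computeIfAbsent(key, f);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        AtomicInteger computes = new AtomicInteger();
        for (int i = 0; i < 1000; i++)
            getOrCompute(map, "hot", k -> { computes.incrementAndGet(); return 42; });
        // 1000 lookups of the hot key, but the value was computed only once.
        System.out.println(map.get("hot") + " " + computes.get());
    }
}
```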

Member:

> I think it would be best to have changes that may make things worse submitted separately. This PR should only have the changes that are neutral or always better.

Totally agree. Opened https://issues.apache.org/jira/browse/KAFKA-17677 as a follow-up. @frankvicky, could you please keep atomicGetOrUpdate?

@chia7712 (Member):

> These are not just cosmetic, they're fixing unnecessary perf overhead as well. I don't think we should have any code remaining that has a "Scala 2.12" comment. So, the change was fine, in my opinion. If we want to do it via a separate PR, let's make sure we have a JIRA and we don't consider the work done until this is also completed.

It seems to me that both "cosmetic" changes and "unnecessary performance overhead" should be handled in a follow-up to avoid prolonged discussions. I believe there are several improvements, such as KAFKA-14615, that can be addressed after dropping Scala 2.12.

@chia7712 (Member):

> If we want to do it via a separate PR, let's make sure we have a JIRA and we don't consider the work done until this is also completed.

https://issues.apache.org/jira/browse/KAFKA-17667

@ijuma (Member) commented Sep 30, 2024

My point is a bit different. It's one thing to have optimizations, but it's another to have leftover code like:

// Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
// scala 2.13 while filterInPlace is not available in scala 2.12.

Doing what it takes to clean up such comments should be part of removing 2.12 support.

@chia7712 (Member):

> Doing what it takes to clean up such comments should be part of removing 2.12 support.

I agree with cleaning up the comments and straightforwardly rewriting the code using Scala 2.13 APIs. However, any tasks requiring further discussion or additional testing can be deferred to a follow-up. This approach helps prevent endless conflicts in this large patch (54 files changed).

@frankvicky frankvicky closed this Oct 3, 2024
@frankvicky frankvicky deleted the KAFKA-12895 branch October 3, 2024 06:05
@frankvicky frankvicky restored the KAFKA-12895 branch October 3, 2024 06:07
@frankvicky frankvicky reopened this Oct 3, 2024
@frankvicky (Contributor Author):

I accidentally deleted the branch due to an operational mistake, but I’ve restored it now.
Apologies for the confusion. 🙇🏼

@chia7712 (Member) left a comment:

@@ -26,6 +26,10 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
<ul>
<li><b>Common</b>
<ul>
<li>
Member:

Could you please move this to the 'Other Changes' section? This section seems to be focused on configuration changes.

@@ -17,10 +17,8 @@

package kafka.utils.json

-import scala.collection.{Map, Seq}
-import scala.collection.compat._
+import scala.collection.{Factory, Map, Seq}
Member:

Please revert this unrelated change.

Contributor Author:

We are getting rid of scala.collection.compat._ since it was a workaround package for cross-building with Scala 2.12. Now, we can import Factory directly from scala.collection.

@mumrah mumrah requested a review from ijuma October 4, 2024 12:38
@@ -1013,7 +1013,7 @@ public void shouldAlterTopicConfig(boolean file) {
addedConfigs.put("delete.retention.ms", "1000000");
addedConfigs.put("min.insync.replicas", "2");
if (file) {
-File f = kafka.utils.TestUtils.tempPropertiesFile(JavaConverters.mapAsScalaMap(addedConfigs));
+File f = kafka.utils.TestUtils.tempPropertiesFile(CollectionConverters.MapHasAsScala(addedConfigs).asScala());
Member:

Could you please import scala.jdk.javaapi.CollectionConverters instead, and then rewrite this as CollectionConverters.asScala(addedConfigs)?

@@ -2294,6 +2294,6 @@ public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteratio

@SuppressWarnings({"deprecation"})
private <T> Seq<T> seq(Collection<T> seq) {
-return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+return IteratorHasAsScala(seq.iterator()).asScala().toSeq();
Member:

return CollectionConverters.asScala(seq).toSeq();

@@ -2294,6 +2294,6 @@ public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteratio

@SuppressWarnings({"deprecation"})
Member:

Please remove the unnecessary annotation.

@chia7712 (Member) left a comment:

LGTM

I'm going to merge this PR. Please feel free to leave any comments and follow KAFKA-17667 for further optimizations and cleanup.

@chia7712 chia7712 merged commit 0e4eebe into apache:trunk Oct 6, 2024
6 checks passed
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
Labels
build Gradle build or GitHub Actions core Kafka Broker dependencies Pull requests that update a dependency file performance
6 participants