Skip to content

A future created inside of a parallel collection map / foreach sometimes blocks the fork join pool with JDK 11 #12106

Open
@OndrejSpanel

Description

@OndrejSpanel
Member

reproduction steps

(using Scala 2.12.12 or 2.13.3)

problem

Observe the message "Suspicious background duration ..." being printed multiple times. This messages shows that a problem with a parallel foreach scheduling has been detected, namely that a long-duration future created from the parallel foreach has blocked the foreach executtion. This happens both with Scala 2.12.12 and Scala 2.13.3 running on JDK 11.0.4. It does not happen when running on JDK / JRE 8. It seems to happen intermittently, probably depending on exact timing of the Future call relative the to parallel collection tasks execution.

The foreach should normally terminate in much less than 200 ms, as 50 tasks are executed, each of them taking 2 ms. Once it takes more than 200 ms, it shows there is a scheduling problem with the slow future blocking the parallel collection tasks.

Following JFR screenshot shows the thread scheduling once the issue is seen (you can also see normals iterations without the issue):

image

My configuration is Windows 10 x64 build 2004, Intel i7 / 4x core.

The code demonstrating the issue is https://github.com/OndrejSpanel/ParLoopAndFuture/blob/c2f4d1b6b155217d8f8111f58e6a7c9337f36619/src/main/scala/com/github/ondrejspanel/parFuture/Main.scala#L1-L45

package com.github.ondrejspanel.parFuture

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main {


  def hotSleep(ms: Int): Unit = {
    val now = System.currentTimeMillis()
    while (System.currentTimeMillis() < now + ms) {}

  }

  def main(args: Array[String]): Unit = {
    val stateCount = 50
    val state = (0 until stateCount).par

    val triggerIssue = true

    (0 until 1000).foreach { i =>
      hotSleep(25)
      val innerScopeBeg = System.currentTimeMillis()
      if (!triggerIssue) {
        Future {
          hotSleep(205)
        }
      }
      state.foreach { x =>
        if (triggerIssue && x == 0) {
          Future {
            hotSleep(205)
          }
        }
        hotSleep(2)
      }
      val innerScopeEnd = System.currentTimeMillis()
      val duration = innerScopeEnd - innerScopeBeg
      if (duration >= 200) {
        println(s"Suspicious background duration $duration")
      }
    }
  }

}

Activity

SethTisue

SethTisue commented on Aug 4, 2020

@SethTisue
Member

does using blocking help? see #12089

cc @viktorklang

viktorklang

viktorklang commented on Aug 4, 2020

@viktorklang

@SethTisue blocking could help, but ultimately, if it manifests on 2.12 it is likely a FJP thing.

OndrejSpanel

OndrejSpanel commented on Aug 4, 2020

@OndrejSpanel
MemberAuthor

@SethTisue Where should I use blocking? There is no blocking or sleeping anywhere in the code. The hotSleep function is just used to simulate a CPU intensive code, no real sleeping there.

OndrejSpanel

OndrejSpanel commented on Aug 4, 2020

@OndrejSpanel
MemberAuthor

it is likely a FJP thing

I think this is very likely, as the thread which is waiting is inside of the java.util.concurrent.ForkJoinTask#internalWait function.

viktorklang

viktorklang commented on Aug 4, 2020

@viktorklang

@OndrejSpanel blocking means "blocking progress for other tasks"—whish seems to be what hotSleep does.

OndrejSpanel

OndrejSpanel commented on Aug 5, 2020

@OndrejSpanel
MemberAuthor

I have tested it and it does not help, at least when used as this:

            Future {
              blocking {
                hotSleep(205)
              }
            }
OndrejSpanel

OndrejSpanel commented on Aug 5, 2020

@OndrejSpanel
MemberAuthor

blocking means "blocking progress for other tasks"

@viktorklang I find such definition unusual. Compare with https://en.wikipedia.org/wiki/Blocking_(computing):

A process that is blocked is one that is waiting for some event, such as a resource becoming available or the completion of an I/O operation.

viktorklang

viktorklang commented on Aug 5, 2020

@viktorklang

@OndrejSpanel Remember that when you execute on a thread pool you have cooperative scheduling for the tasks, not preemptive, so in this case, no matter if you are busy-spinning or sleeping you are preventing progress for other tasks.

OndrejSpanel

OndrejSpanel commented on Aug 5, 2020

@OndrejSpanel
MemberAuthor

"busy-spinning" is just a simple replacement of a CPU intensive computation found in the real application where I see the issue. I could use a recursive Fibonacci computation instead, or something like that, but hotSleep is much eaiser to implement and understand.

When I execute one long task on a pool of 8 threads, I do not expect it to be blocking the whole process. The same code works on Java 8 without any issues and I do not see any reason why it should not.

There is some problem with Java 11 scheduling (most likely caused by some FJP implementation changes), which perhaps causes some short task to be scheduled on the thread which is busy with the long task, or something like that. Spawning more threads (which is the result of using blocking) would not help.

By posting the issue here I expect to achieve two things:

  • the issue to be acknowledged and documented
  • some reasonable workaround to be found

Fixing the issue either in Scala or in Java would be nice, but I do not expect for this to happen soon enough to help me in any way.

lrytz

lrytz commented on Aug 12, 2020

@lrytz
Member

Putting the blocking discussion aside, it would be good to keep investigating.

Since the same thread pool is used, maybe the surface area of the reproduction can be reduced? Avoid using parallel collections, or maybe even write a java-only reproducer? @OndrejSpanel do you have time to give this a try? Or even in the current state, it might be worth reporting it to Java concurrency experts. http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest for example be active.

For the record, I can reproduce the issue as described in the ticket on my macOS.

OndrejSpanel

OndrejSpanel commented on Aug 12, 2020

@OndrejSpanel
MemberAuthor

Avoid using parallel collections

I did not see the issue when not using parallel collections. It is possible it could be reproduced by expanding the parallel collections to the plain Java executor calls, I am not sure how much work this would be.

@OndrejSpanel do you have time to give this a try?

I am not sure I want to dedicate much time to this. Given the issue shows with Java 11 which is available to general public, I guess a priority for me will be finding a suitable workaround, as I cannot expect the users of my application will avoid this particular runtime.

lrytz

lrytz commented on Aug 12, 2020

@lrytz
Member

Understood, thanks.

I also looked at it in JMC. Here's the stack trace of the waiting executor thread:

image

void java.lang.Object.wait(long)	1	100 %
void java.util.concurrent.ForkJoinTask.internalWait(long)	1	100 %
int java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool$WorkQueue, ForkJoinTask, long)	1	100 %
int java.util.concurrent.ForkJoinTask.doJoin()	1	100 %
Object java.util.concurrent.ForkJoinTask.join()	1	100 %
void scala.collection.parallel.ForkJoinTasks$WrappedTask.sync()	1	100 %
void scala.collection.parallel.ForkJoinTasks$WrappedTask.sync$(ForkJoinTasks$WrappedTask)	1	100 %
void scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync()	1	100 %
void scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.internal()	1	100 %
...

So the wait starts here: https://github.com/scala/scala/blob/v2.12.12/src/library/scala/collection/parallel/Tasks.scala#L174

It seems that the last subtask within the parallel collections task happens to be the spawned future, if I understand correctly?

I'll try to dig a little further in the next few days.

OndrejSpanel

OndrejSpanel commented on Aug 12, 2020

@OndrejSpanel
MemberAuthor

the last subtask within the parallel collections task happens to be the spawned future

I am not sure I understand you correctly. If you check the source code of the repro, you will see the first iteration of the parallel collection spawns a future, but this does not mean the future becomes the subtask of the collection - the subtasks finishes normally without any further interaction with the spawned future. I repeat the corresponding source lines:

      state.foreach { x =>
        if (triggerIssue && x == 0) {
          Future {
            hotSleep(205)
          }
        }
        hotSleep(2)
      }
lrytz

lrytz commented on Aug 13, 2020

@lrytz
Member

Yes, the future task should be independent of the parallel collections task, but what seems to happen is that the parallel collections task waits for the future to complete at the code location I pointed to. I'll try to figure out if that's really it, and why.

14 remaining items

som-snytt

som-snytt commented on Dec 13, 2023

@som-snytt

So it's just trying to be helpful by assuming you used the wrong API for a subtask. This is like the optimization I mentioned for Future. I never quite understood how Future was intended to interact meaningfully with the API, so at least forking under the hood was a gesture.

OndrejSpanel

OndrejSpanel commented on Dec 15, 2023

@OndrejSpanel
MemberAuthor

@som-snytt I have re-read your last comment several times and I am afraid it structure is too complex for me to understand what it means. Can you rewrite it in simpler words, please?

a priority for me will be finding a suitable workaround

Back then I have implemented a hacky workaround of an execution context which is not starting and futures while parallel loop is running. Now I am revisiting this and checking if perhaps I would be able to implement a custom Execution Context and Task Support to have a simpler and more robust solution.

I have tried using Executors.newFixedThreadPool, and while it did not show the issue, the general performance was real bad, there was a lot of thread parking between the individual tasks.

As another basic experiment I have tried what @lihaoyi wrote in scala/toolkit#31 (comment) - and voila, it does not show the issue even with Java 11 - 21 and the default Fork Join Pool execution context. See https://github.com/OndrejSpanel/ParLoopAndFuture/tree/use-futures

I guess this perhaps answer the part of the question "why is nobody interested in the issue". It does not show without parallel collections and maybe nobody is using parallel collections for any serious work?

som-snytt

som-snytt commented on Dec 15, 2023

@som-snytt

@OndrejSpanel I was trying to say that Future doesn't let you do fork/join, and its default pool is "async" (event-driven), so it really caters to a certain kind of program.

Future used to have a line of code to say that if you try to run something on the "current" fork/join pool, it will try to fork instead of execute (submit from "outside" client). That is similar to what the pool is doing with the Future in your example. But I haven't looked yet at the internals. Parallel collections still has that behavior.

Thanks for the update, I'll try out your branch.

I don't know the answer to your last question, but the lesson from the issue is clearly not to mix future and par. By "clearly", I mean, "I assume so."

OndrejSpanel

OndrejSpanel commented on Dec 15, 2023

@OndrejSpanel
MemberAuthor

but the lesson from the issue is clearly not to mix future and par

I am afraid this means avoiding par completely - I can hardly imagine writing any async Scala code avoiding futures. I like the par simplicity, but issues like this make me think the sentiment expressed in scala/toolkit#31 (comment) may be substantiated.

Ichoran

Ichoran commented on Dec 15, 2023

@Ichoran

If you write the equivalent code with Java streams, do you have the same problem?

One quick way to get into Java Stream land is to convert any Scala collections to arrays with toArray and then into streams with java.util.Arrays.stream.

Another option to get get quickly into Java-stream-land is by using stepper off of your favorite Scala collection, because a Stepper is a Spliterator (so can be passed to StreamSupport.stream).

I don't really see why this should be different, since Java's streams use FJP too. But if for some reason it does work, it might be a not-excruciating workaround for .par, depending on what operations you want to perform. (Java's Stream API isn't nearly as rich as Scala collections', and also the functionality is kind of fragmented between Stream itself and the various Collector types.)

OndrejSpanel

OndrejSpanel commented on Dec 15, 2023

@OndrejSpanel
MemberAuthor

I think futures use FJP too, as this is what ExecutionContext.global uses, and as I wrote, I am convinced I do not see the issue when using futures only.

At the moment I am experimenting with a simplistic future based replacement of par (I need only foreach, map and flatMap for Iterable and Map) and the results seem promising so far. It is not hit by the issue and by keeping things simple I can get even better throughput for my workloads than with par. The core method is:

    val source = Array.ofDim[Any](coll.size)
    val result = Array.ofDim[Any](coll.size)
    val taskCount = coll.size
    val taskIndex = new AtomicInteger(0)
    val tasksDone = new AtomicInteger(0)
    coll.copyToArray(source)

    @tailrec
    def worker(): Boolean = {
      val index = taskIndex.getAndIncrement()
      if (index < taskCount) {
        result(index) = f(source(index).asInstanceOf[T])
        tasksDone.getAndIncrement()
        worker()
      } else false
    }

    val maxConcurrency = numberOfProcessors - 1 // calling thread is doing work as well
    val numberOfWorkers = taskCount - 1 min maxConcurrency
    var i = 0
    while (i < numberOfWorkers) {
      Future(worker())
      i += 1
    }
    // perform tasks on the main thread while waiting for futures to complete
    worker()
    // busy-wait for the rest of the futures
    while (tasksDone.get() < taskCount) {
      Thread.onSpinWait()
    }
    result.asInstanceOf[Array[S]]

See also https://github.com/OndrejSpanel/ParLoopAndFuture/tree/custom-par

Ichoran

Ichoran commented on Dec 15, 2023

@Ichoran

I use that kind of thing quite often, and it's handy as long as you use it with keen awareness of when and what you're doing, and I think you also need a barrier there to ensure that the array updates are all made before the array is passed off to the caller--honestly, I'd just stuff the arrays into a class which accesses them with synchronized blocks to be sure you don't mess up somewhere. (It's that kind of annoying possible-but-intermittent maybe-only-on-some-JVMs-and-not-others bug that really should discourage one from rolling one's own, though. Or roll pedantically, if you know how to be sufficiently pedantic.)

If you use Java streams, or another battle-tested library, this stuff has been thought through already; getting around one flaw in one of those libraries is good, but you might stumble into others.

OndrejSpanel

OndrejSpanel commented on Dec 15, 2023

@OndrejSpanel
MemberAuthor

Barrier should be done by getAndIncrement, which is done before and after processing each item in the worker. Last thing the function does before returning is .get(), which should be the barrier necessary. I think there IS a barrier missing after
copyToArray - taskIndex should be written so that reading it in a worker acts as a barrier.

As for "battle-tested libraries" I am afraid my trust in them is seriously shattered after this experience. I will give Java stream a try, though - I would prefer something standard before self-made.

Ichoran

Ichoran commented on Dec 16, 2023

@Ichoran

I thought getAndIncrement is CAS-based, and get just pulls the value from memory, which has no guarantees for anything save the value it's updating. Specifically, I think you need a storeFence() before you return result so that anything you put in there is actually there. But fences are hard to get right.

OndrejSpanel

OndrejSpanel commented on Dec 16, 2023

@OndrejSpanel
MemberAuthor

so that anything you put in there is actually there

Let me check:

Atomic variable access function have memory fence guarantees, this is what JavaDoc in JDK 8 sources says about get:

with memory effects as specified by {@link VarHandle#getVolatile}.

Note: this is what sources contain and IDE shows it, but web docs stay surprisingly silent on the topic.

The link says:

with memory semantics of reading as if the variable was declared volatile.

JLS 17.4.4. Synchronization Order

A write to a volatile variable v (§8.3.1.4) synchronizes-with all subsequent reads of v by any thread

If an action x synchronizes-with a following action y, then we also have hb(x, y)
If x and y are actions of the same thread and x comes before y in program order, then hb(x, y)

Where hb is happens-before

som-snytt

som-snytt commented on Dec 16, 2023

@som-snytt

The latest commit has a familiar face
image

The package had the doc on memory effect.
The change was dropping unsafe openjdk/jdk@14d4754

I wonder if they'd accept a PR changing as if the variable was declared volatile to either as if it had been declared volatile or as though it were declared volatile.

Thanks for the interesting thread.

OndrejSpanel

OndrejSpanel commented on Dec 16, 2023

@OndrejSpanel
MemberAuthor

I see now. The information is a bit more hidden than in the JDK 11 sources, but it is there. The relevant docs are referenced in the common header as

See the java.util.concurrent.atomic package specification for description of the properties of atomic variables.

That page says later:

The memory effects for accesses and updates of atomics generally follow the rules for volatiles ...

Newer versions are more verbose in this respect: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/atomic/AtomicInteger.html - it is just that Google is showing JDK 8 docs as a most prominent hit.

Ichoran

Ichoran commented on Dec 17, 2023

@Ichoran

But that's happens-before only for the variable. You need happens-before for the array, which might or might not be a side-effect of how the happens-before for the variable is implemented.

OndrejSpanel

OndrejSpanel commented on Dec 18, 2023

@OndrejSpanel
MemberAuthor

But that's happens-before only for the variable.

The h-b is not defined for variables, rather for actions. It is also defined to be transitive. That should be enough.

Ichoran

Ichoran commented on Dec 18, 2023

@Ichoran

Yes, you're right, looks like that's enough!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

      Development

      No branches or pull requests

        Participants

        @viktorklang@lrytz@SethTisue@som-snytt@Ichoran

        Issue actions

          A future created inside of a parallel collection map / foreach sometimes blocks the fork join pool with JDK 11 · Issue #12106 · scala/bug