Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Proper test virtual thread dispatcher and update doc. #1728

Merged
merged 1 commit into from
Jan 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object VirtualThreadPoolDispatcherSpec {

override def receive = {
case "ping" =>
sender() ! "All fine"
sender() ! Thread.currentThread().getName
}
}

Expand All @@ -43,14 +43,16 @@ object VirtualThreadPoolDispatcherSpec {
class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender {
import VirtualThreadPoolDispatcherSpec._

val Iterations = 1000

"VirtualThreadPool support" must {

"handle simple dispatch" in {
val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher"))
innocentActor ! "ping"
expectMsg("All fine")
for (_ <- 1 to 1000) {
innocentActor ! "ping"
expectMsgPF() { case name: String =>
name should include("VirtualThreadPoolDispatcherSpec-virtual-thread-dispatcher-virtual-thread-")
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis
val tf: ThreadFactory = threadFactory match {
case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) =>
new ThreadFactory {
private val vtFactory = newVirtualThreadFactory(name)
private val vtFactory = newVirtualThreadFactory(name + "-" + id)

override def newThread(r: Runnable): Thread = {
val vt = vtFactory.newThread(r)
Expand All @@ -426,7 +426,7 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis
vt
}
}
case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name);
case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name + "-" + id);
}
new ExecutorServiceFactory {
import VirtualThreadSupport._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private[dispatch] object VirtualThreadSupport {
newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService]
} catch {
case NonFatal(e) =>
// --add-opens java.base/java.lang=ALL-UNNAMED
throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e)
}
}
Expand Down
2 changes: 2 additions & 0 deletions docs/src/main/paradox/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ allocated by the ForkJoinPool. It is a setting specifically talking about the nu
threads the pool keep running in order to reduce the latency of handling a new incoming task.
You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html).

When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool.

@@@

Another example that uses the "thread-pool-executor":
Expand Down
2 changes: 2 additions & 0 deletions docs/src/main/paradox/typed/dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ allocated by the ForkJoinPool. It is a setting specifically talking about the nu
threads the pool will keep running in order to reduce the latency of handling a new incoming task.
You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html).

When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool.

@@@

@@@ note
Expand Down
1 change: 1 addition & 0 deletions project/PekkoBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ object PekkoBuild {
UsefulTask("testQuick",
"Runs all the tests. When run multiple times will only run previously failing tests (shell mode only)"),
UsefulTask("testOnly *.AnySpec", "Only run a selected test"),
UsefulTask("TestJdk9 / testOnly *.AnySpec", "Only run a Jdk9+ selected test"),
UsefulTask("testQuick *.AnySpec",
"Only run a selected test. When run multiple times will only run previously failing tests (shell mode only)"),
UsefulTask("testQuickUntilPassed", "Runs all tests in a continuous loop until all tests pass"),
Expand Down
Loading