
Commit ca02481

craiuconstantintiberiu authored and ueshin committed
[SPARK-52971][PYTHON] Limit idle Python worker queue size
### What changes were proposed in this pull request?

Makes the number of idle workers in the `PythonWorkerFactory` pool configurable.

### Why are the changes needed?

Without a limit on the maximum queue size, the idle worker pool can grow unbounded. A configurable cap gives better control over the number of workers kept alive.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new optional configuration entry, `spark.python.factory.idleWorkerMaxPoolSize`, available from Spark 4.1.0.

### How was this patch tested?

This patch adds two new tests to verify behavior with and without the worker limit configuration.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51684 from craiuconstantintiberiu/SPARK-52971-idle-pool.

Authored-by: Tibi Craiu <[email protected]>
Signed-off-by: Takuya Ueshin <[email protected]>
1 parent 3f0c450 commit ca02481
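For reference, a minimal sketch of how the new option could be set from a driver program (the config key comes from this patch; the app name and the value `4` are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Cap the idle Python worker pool at 4 workers; leaving the key unset
// preserves the previous unbounded behavior.
val conf = new SparkConf()
  .setAppName("idle-pool-demo") // illustrative name
  .set("spark.python.factory.idleWorkerMaxPoolSize", "4")
val sc = new SparkContext(conf)
```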

File tree

3 files changed: +85 -9 lines changed


core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 21 additions & 9 deletions
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.config.Python.{PYTHON_UNIX_DOMAIN_SOCKET_DIR, PYTHON_UNIX_DOMAIN_SOCKET_ENABLED}
+import org.apache.spark.internal.config.Python.PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}

@@ -98,8 +98,9 @@ private[spark] class PythonWorkerFactory(
     !Utils.isWindows && useDaemonEnabled
   }

-  private val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
-  private val isUnixDomainSock = authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_ENABLED)
+  private val conf = SparkEnv.get.conf
+  private val authHelper = new SocketAuthHelper(conf)
+  private val isUnixDomainSock = authHelper.isUnixDomainSock

   @GuardedBy("self")
   private var daemon: Process = null
@@ -111,7 +112,11 @@ private[spark] class PythonWorkerFactory(
   @GuardedBy("self")
   private var daemonSockPath: String = _
   @GuardedBy("self")
-  private val idleWorkers = new mutable.Queue[PythonWorker]()
+  // Visible for testing
+  private[spark] val idleWorkers = new mutable.Queue[PythonWorker]()
+  @GuardedBy("self")
+  private val maxIdleWorkerPoolSize =
+    conf.get(PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE)
   @GuardedBy("self")
   private var lastActivityNs = 0L
   new MonitorThread().start()
@@ -127,7 +132,7 @@ private[spark] class PythonWorkerFactory(
   def create(): (PythonWorker, Option[ProcessHandle]) = {
     if (useDaemon) {
       self.synchronized {
-        // Pull from idle workers until we one that is alive, otherwise create a new one.
+        // Pull from idle workers until we get one that is alive, otherwise create a new one.
         while (idleWorkers.nonEmpty) {
           val worker = idleWorkers.dequeue()
           daemonWorkers.get(worker).foreach { workerHandle =>
@@ -203,8 +208,7 @@ private[spark] class PythonWorkerFactory(
       blockingMode: Boolean): (PythonWorker, Option[ProcessHandle]) = {
     var serverSocketChannel: ServerSocketChannel = null
     lazy val sockPath = new File(
-      authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_DIR)
-        .getOrElse(System.getProperty("java.io.tmpdir")),
+      authHelper.sockDir,
       s".${UUID.randomUUID()}.sock")
     try {
       if (isUnixDomainSock) {
@@ -307,8 +311,7 @@ private[spark] class PythonWorkerFactory(
         if (isUnixDomainSock) {
           workerEnv.put(
             "PYTHON_WORKER_FACTORY_SOCK_DIR",
-            authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_DIR)
-              .getOrElse(System.getProperty("java.io.tmpdir")))
+            authHelper.sockDir)
           workerEnv.put("PYTHON_UNIX_DOMAIN_ENABLED", "True")
         } else {
           workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
@@ -483,6 +486,15 @@ private[spark] class PythonWorkerFactory(
     if (useDaemon) {
       self.synchronized {
         lastActivityNs = System.nanoTime()
+        if (maxIdleWorkerPoolSize.exists(idleWorkers.size >= _)) {
+          val oldestWorker = idleWorkers.dequeue()
+          try {
+            stopWorker(oldestWorker)
+          } catch {
+            case e: Exception =>
+              logWarning("Failed to stop evicted worker", e)
+          }
+        }
         idleWorkers.enqueue(worker)
       }
     } else {
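The eviction added to `releaseWorker` above is a bounded FIFO: once the pool is at capacity, the oldest idle worker is stopped before the returning worker is enqueued. A standalone sketch of the same pattern under that reading (class and parameter names here are illustrative, not Spark APIs):

```scala
import scala.collection.mutable

// Minimal sketch of the bounded idle pool: maxSize = None keeps the
// unbounded behavior; Some(n) evicts the oldest idle entry once n is reached.
class BoundedIdlePool[W](maxSize: Option[Int], stop: W => Unit) {
  private val idle = mutable.Queue.empty[W]

  def release(worker: W): Unit = synchronized {
    if (maxSize.exists(idle.size >= _)) {
      // The queue head is the worker that has been idle the longest.
      val oldest = idle.dequeue()
      try stop(oldest) catch {
        case e: Exception => Console.err.println(s"Failed to stop evicted worker: $e")
      }
    }
    idle.enqueue(worker)
  }

  def size: Int = synchronized(idle.size)
}
```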

core/src/main/scala/org/apache/spark/internal/config/Python.scala

Lines changed: 11 additions & 0 deletions
@@ -127,4 +127,15 @@ private[spark] object Python {
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ >= 0, "The interval should be 0 or positive.")
       .createWithDefault(0)
+
+  val PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE =
+    ConfigBuilder("spark.python.factory.idleWorkerMaxPoolSize")
+      .doc("Maximum number of idle Python workers to keep. " +
+        "If unset, the number is unbounded. " +
+        "If set to a positive integer N, at most N idle workers are retained; " +
+        "least-recently used workers are evicted first.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(_ > 0, "If set, the idle worker max size must be > 0.")
+      .createOptional
 }
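Because the entry is built with `createOptional`, reading it yields an `Option[Int]`, which is why the factory's capacity check can be written with `Option.exists`. A quick standalone illustration of those semantics (function and parameter names are hypothetical):

```scala
// None    -> no limit configured; the check is always false (unbounded pool)
// Some(n) -> the pool counts as full once it holds n or more idle workers
def poolIsFull(maxIdleWorkerPoolSize: Option[Int], idleCount: Int): Boolean =
  maxIdleWorkerPoolSize.exists(idleCount >= _)

assert(!poolIsFull(None, 100))  // unbounded when unset
assert(poolIsFull(Some(2), 2))  // at capacity: evict before enqueueing
assert(!poolIsFull(Some(2), 1)) // below the limit
```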

core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala

Lines changed: 53 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.api.python

 import java.net.SocketTimeoutException

+import scala.collection.mutable
 // scalastyle:off executioncontextglobal
 import scala.concurrent.ExecutionContext.Implicits.global
 // scalastyle:on executioncontextglobal
@@ -56,4 +57,56 @@ class PythonWorkerFactorySuite extends SparkFunSuite with SharedSparkContext {
     // Timeout ensures that the test fails in 5 minutes if createSimplerWorker() doesn't return.
     ThreadUtils.awaitReady(createFuture, 5.minutes)
   }
+
+  test("idle worker pool is unbounded when idleWorkerMaxPoolSize is not set") {
+    sc.conf.remove("spark.python.factory.idleWorkerMaxPoolSize")
+
+    val factory = new PythonWorkerFactory("python3", "pyspark.worker", Map.empty, true)
+
+    assert(factory.idleWorkers.size === 0)
+
+    val mockWorkers: mutable.Queue[PythonWorker] = mutable.Queue.empty
+    try {
+      (1 to 3).foreach { _ =>
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        mockWorkers.enqueue(PythonWorker(mockChannel))
+      }
+      mockWorkers.foreach(factory.releaseWorker)
+      assert(factory.idleWorkers.size === 3)
+    } finally {
+      mockWorkers.foreach(factory.stopWorker)
+    }
+  }
+
+  test("idle worker pool is bounded when idleWorkerMaxPoolSize is set") {
+    sc.conf.set("spark.python.factory.idleWorkerMaxPoolSize", "2")
+
+    val factory = new PythonWorkerFactory("python3", "pyspark.worker", Map.empty, true)
+
+    assert(factory.idleWorkers.size === 0)
+    val mockWorkers: mutable.Queue[PythonWorker] = mutable.Queue.empty
+    try {
+      (1 to 2).foreach { _ =>
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        mockWorkers.enqueue(PythonWorker(mockChannel))
+      }
+      mockWorkers.foreach(factory.releaseWorker)
+      assert(factory.idleWorkers.size === 2)
+
+      val worker3 = {
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        PythonWorker(mockChannel)
+      }
+      mockWorkers.enqueue(worker3)
+      factory.releaseWorker(worker3)
+      assert(factory.idleWorkers.size === 2)
+    } finally {
+      mockWorkers.foreach(factory.stopWorker)
+    }
+  }
 }
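Both tests build mock workers from open, non-blocking, unconnected `SocketChannel`s in three nearly identical blocks; a small helper like this (hypothetical, not part of the patch) would capture the repeated pattern:

```scala
import java.nio.channels.SocketChannel

// Hypothetical test helper: an open, non-blocking, unconnected channel is
// enough to stand in for a live worker in these pool-size tests.
def newMockWorker(): PythonWorker = {
  val channel = SocketChannel.open()
  channel.configureBlocking(false)
  PythonWorker(channel)
}
```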
