Conversation

craiuconstantintiberiu

What changes were proposed in this pull request?

Makes the number of idle workers in the PythonWorkerFactory pool configurable.

Why are the changes needed?

Without a limit on the queue size, the idle worker pool can grow without bound. A configurable maximum gives better control over how many idle Python workers are kept around.

Does this PR introduce any user-facing change?

Yes. It adds a new optional configuration entry, spark.python.factory.idleWorkerMaxPoolSize, available from Spark 4.1.0.
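
A minimal usage sketch of the new entry from application code (the app name and master below are illustrative, and the key is only honored from Spark 4.1.0 on):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: cap the idle Python worker pool at 4 workers.
val conf = new SparkConf()
  .setAppName("idle-worker-pool-demo")
  .setMaster("local[*]")
  .set("spark.python.factory.idleWorkerMaxPoolSize", "4")
val sc = new SparkContext(conf)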

How was this patch tested?

This patch adds two new tests that verify the behavior with and without the worker limit configuration.

Was this patch authored or co-authored using generative AI tooling?

No

Comment on lines 107 to 108
mockWorkers.foreach(_.stop())
worker3.stop()

Shall we surround the above with a try block and put these in the finally clause, to make sure all the workers are cleaned up?

Thanks, made this change.
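
A minimal sketch of the suggested pattern, assuming the quoted test body owns mockWorkers and worker3:

try {
  // ... assertions that exercise the idle worker pool ...
} finally {
  // Stop the workers in finally so they are cleaned up even if an assertion fails.
  mockWorkers.foreach(_.stop())
  worker3.stop()
}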

@@ -33,6 +33,12 @@ import org.apache.spark.util.ThreadUtils
// Tests for PythonWorkerFactory.
class PythonWorkerFactorySuite extends SparkFunSuite with SharedSparkContext {

private def getIdleWorkerCount(factory: PythonWorkerFactory): Int = {
val field = factory.getClass.getDeclaredField("idleWorkers")

I guess it's okay to make idleWorkers private[spark]? cc @HyukjinKwon

yes

but let's add a comment that this is exposed for testing purposes.

Thank you, changed to private[spark] and added comment.
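
With idleWorkers widened to private[spark], the suite-side helper quoted above no longer needs reflection; a hedged sketch of what it could look like instead:

// Sketch: read the pool size directly, synchronizing on the factory since the
// production code guards idleWorkers with the factory's own lock.
private def getIdleWorkerCount(factory: PythonWorkerFactory): Int =
  factory.synchronized { factory.idleWorkers.size }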

@benrobby benrobby left a comment

Thanks for making this change, just a few nits

// Visible for testing
private[spark] val idleWorkers = new mutable.Queue[PythonWorker]()
@GuardedBy("self")
private val idleWorkerPoolSize = authHelper.conf.get(PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE)

why do we access the spark conf via authHelper?

Thanks, changed everywhere to use conf instead of authHelper.conf when needed.

// Visible for testing
private[spark] val idleWorkers = new mutable.Queue[PythonWorker]()
@GuardedBy("self")
private val idleWorkerPoolSize = authHelper.conf.get(PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE)

could you adjust the variable name to reflect that it's the maximum size?

Thanks, done.
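
A sketch of both nits applied inside PythonWorkerFactory, reading through conf rather than authHelper.conf and using a name that signals it is a maximum (the exact name in the merged code may differ):

// Optional cap on the idle worker pool; None (unset) leaves the pool unbounded.
private val maxIdleWorkerPoolSize: Option[Int] =
  conf.get(PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE)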

@@ -484,6 +487,15 @@ private[spark] class PythonWorkerFactory(
self.synchronized {
lastActivityNs = System.nanoTime()
idleWorkers.enqueue(worker)
if (idleWorkerPoolSize.exists(idleWorkers.size > _)) {

should this be >=? Otherwise you'll have idleWorkerPoolSize + 1 worker in the pool

Yes, but it should not be a problem at the moment.
If we are at maxIdleWorkerPoolSize workers in the queue and we add one more, we go over the limit and then remove a worker.

It's not a problem now, but it could become an issue in the future if the queue implementation is changed to one that actually enforces the limit and throws an exception.

Changed.

@benrobby benrobby Aug 4, 2025

Makes sense. I see that the worker was enqueued anyway before this check ran, so in either case the pool ends up with at most the configured maximum number of workers.
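
A self-contained sketch of the enqueue-then-evict pattern this thread discusses; the class and names below are illustrative, not the PR's code:

import scala.collection.mutable

final class Worker(val id: Int) { def stop(): Unit = println(s"stopping worker $id") }

final class IdlePool(maxPoolSize: Option[Int]) {
  private val idleWorkers = mutable.Queue.empty[Worker]

  def release(worker: Worker): Unit = synchronized {
    idleWorkers.enqueue(worker)
    // The pool can briefly hold maxPoolSize + 1 workers right after the enqueue;
    // the check below immediately evicts the oldest worker back down to the cap.
    // In this sketch, a >= condition would instead evict one worker earlier, so the
    // steady-state pool would top out one below the configured value.
    if (maxPoolSize.exists(idleWorkers.size > _)) {
      idleWorkers.dequeue().stop()
    }
  }

  def size: Int = synchronized(idleWorkers.size)
}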

@ueshin ueshin left a comment

LGTM, pending @benrobby's comments.

@benrobby benrobby left a comment

LGTM

@ueshin ueshin commented Aug 5, 2025

Thanks! merging to master.

@ueshin ueshin closed this in ca02481 Aug 5, 2025