diff --git a/modin/config/__init__.py b/modin/config/__init__.py index 26d759324fd..5a2aa1a68cb 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -21,6 +21,7 @@ CIAWSAccessKeyID, CIAWSSecretAccessKey, CpuCount, + DaskThreadsPerWorker, DoUseCalcite, Engine, EnvironmentVariable, @@ -73,6 +74,8 @@ "RayRedisPassword", "TestRayClient", "LazyExecution", + # Dask specific + "DaskThreadsPerWorker", # Partitioning "NPartitions", "MinPartitionSize", diff --git a/modin/config/envvars.py b/modin/config/envvars.py index c865e07a358..6ae033d9ba2 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -830,6 +830,13 @@ class LazyExecution(EnvironmentVariable, type=bool): default = False +class DaskThreadsPerWorker(EnvironmentVariable, type=int): + """Number of threads per Dask worker.""" + + varname = "MODIN_DASK_THREADS_PER_WORKER" + default = 1 + + def _check_vars() -> None: """ Check validity of environment variables. diff --git a/modin/core/execution/dask/common/utils.py b/modin/core/execution/dask/common/utils.py index 3eda2a50375..a2909e2d320 100644 --- a/modin/core/execution/dask/common/utils.py +++ b/modin/core/execution/dask/common/utils.py @@ -22,6 +22,7 @@ GithubCI, Memory, NPartitions, + DaskThreadsPerWorker, ) from modin.core.execution.utils import set_env from modin.error_message import ErrorMessage @@ -54,12 +55,17 @@ def _disable_warnings(): """, ) num_cpus = CpuCount.get() + threads_per_worker = DaskThreadsPerWorker.get() memory_limit = Memory.get() worker_memory_limit = memory_limit // num_cpus if memory_limit else "auto" # when the client is initialized, environment variables are inherited with set_env(PYTHONWARNINGS="ignore::FutureWarning"): - client = Client(n_workers=num_cpus, memory_limit=worker_memory_limit) + client = Client( + n_workers=num_cpus, + threads_per_worker=threads_per_worker, + memory_limit=worker_memory_limit, + ) if GithubCI.get(): # set these keys to run tests that write to the mock s3 service. this seems