Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Iaroslav <[email protected]>
  • Loading branch information
YarShev committed Apr 22, 2024
1 parent c3dc911 commit 3b9e665
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 21 deletions.
6 changes: 4 additions & 2 deletions modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
ProgressBar,
RangePartitioning,
RangePartitioningGroupby,
RayCustomResources,
RayInitCustomResources,
RayTaskCustomResources,
RayRedisAddress,
RayRedisPassword,
ReadSqlEngine,
Expand Down Expand Up @@ -76,7 +77,8 @@
"IsRayCluster",
"RayRedisAddress",
"RayRedisPassword",
"RayCustomResources",
"RayInitCustomResources",
"RayTaskCustomResources",
"LazyExecution",
# Dask specific
"DaskThreadsPerWorker",
Expand Down
35 changes: 33 additions & 2 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,46 @@ class RayRedisPassword(EnvironmentVariable, type=ExactStr):
default = secrets.token_hex(32)


class RayCustomResources(EnvironmentVariable, type=dict):
class RayInitCustomResources(EnvironmentVariable, type=dict):
"""
Ray node's custom resources to initialize with.
Visit Ray documentation for more details:
https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources
Notes
-----
If you rely on Modin to initialize Ray, set this config
so that Ray is properly initialized with the custom resources.
"""

varname = "MODIN_RAY_INIT_CUSTOM_RESOURCES"
default = None


class RayTaskCustomResources(EnvironmentVariable, type=dict):
"""
Ray node's custom resources to request in tasks or actors.
Visit Ray documentation for more details:
https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#custom-resources
Notes
-----
You can use this config to limit the parallelism for the entire workflow
by setting the config at the very beginning.
>>> import modin.config as cfg
>>> cfg.RayTaskCustomResources.put({"special_hardware": 0.001})
This way each single remote task or actor will require 0.001 of "special_hardware" to run.
You can also use this config to limit the parallelism for a certain operation
by setting the config temporarily with the ``context`` manager.
>>> with context(RayTaskCustomResources={"special_hardware": 1.0}):
... df.<op>
This way each single remote task or actor will require 1.0 of "special_hardware" to run
within the context only.
"""

varname = "MODIN_RAY_CUSTOM_RESOURCES"
varname = "MODIN_RAY_TASK_CUSTOM_RESOURCES"
default = None


Expand Down
8 changes: 4 additions & 4 deletions modin/core/execution/ray/common/deferred_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from ray._private.services import get_node_ip_address
from ray.util.client.common import ClientObjectRef

from modin.config import RayCustomResources
from modin.config import RayTaskCustomResources
from modin.core.execution.ray.common import MaterializationHook, RayWrapper
from modin.logging import get_logger

Expand Down Expand Up @@ -157,7 +157,7 @@ def exec(
and self.num_returns == 1
):
result, length, width, ip = remote_exec_func.options(
resources=RayCustomResources.get()
resources=RayTaskCustomResources.get()
).remote(self.func, self.data, *self.args, **self.kwargs)
meta = MetaList([length, width, ip])
self._set_result(result, meta, 0)
Expand Down Expand Up @@ -437,11 +437,11 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]:
# does not require the num_returns to be specified in options.
if num_returns == 2:
return _remote_exec_single_chain.options(
resources=RayCustomResources.get()
resources=RayTaskCustomResources.get()
).remote(*args)
else:
return _remote_exec_multi_chain.options(
num_returns=num_returns, resources=RayCustomResources.get()
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(num_returns, *args)

def _set_result(
Expand Down
4 changes: 2 additions & 2 deletions modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import ray
from ray.util.client.common import ClientObjectRef

from modin.config import RayCustomResources
from modin.config import RayTaskCustomResources
from modin.error_message import ErrorMessage


Expand Down Expand Up @@ -80,7 +80,7 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_ray_func.options(
num_returns=num_returns, resources=RayCustomResources.get()
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(func, *args, **kwargs)

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
IsRayCluster,
Memory,
NPartitions,
RayCustomResources,
RayInitCustomResources,
RayRedisAddress,
RayRedisPassword,
StorageFormat,
Expand Down Expand Up @@ -127,7 +127,7 @@ def initialize_ray(
"object_store_memory": object_store_memory,
"_redis_password": redis_password,
"_memory": object_store_memory,
"resources": RayCustomResources.get(),
"resources": RayInitCustomResources.get(),
**extra_init_kw,
}
# It should be enough to simply set the required variables for the main process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs

from modin.config import RayCustomResources
from modin.config import RayTaskCustomResources
from modin.core.execution.ray.common import RayWrapper, SignalActor
from modin.core.execution.ray.generic.io import RayIO
from modin.core.io import (
Expand Down Expand Up @@ -189,7 +189,7 @@ def to_csv(cls, qc, **kwargs):
if not cls._to_csv_check_support(kwargs):
return RayIO.to_csv(qc, **kwargs)

signals = SignalActor.options(resources=RayCustomResources.get()).remote(
signals = SignalActor.options(resources=RayTaskCustomResources.get()).remote(
len(qc._modin_frame._partitions) + 1
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
if TYPE_CHECKING:
from ray.util.client.common import ClientObjectRef

from modin.config import LazyExecution, RayCustomResources
from modin.config import LazyExecution, RayTaskCustomResources
from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.core.execution.ray.common import MaterializationHook, RayWrapper
from modin.core.execution.ray.common.deferred_execution import (
Expand Down Expand Up @@ -271,7 +271,7 @@ def length(self, materialize=True):
self.drain_call_queue()
if (length := self._length_cache) is None:
length, self._width_cache = _get_index_and_columns.options(
resources=RayCustomResources.get()
resources=RayTaskCustomResources.get()
).remote(self._data_ref)
self._length_cache = length
if materialize and isinstance(length, ObjectIDType):
Expand All @@ -298,7 +298,7 @@ def width(self, materialize=True):
self.drain_call_queue()
if (width := self._width_cache) is None:
self._length_cache, width = _get_index_and_columns.options(
resources=RayCustomResources.get()
resources=RayTaskCustomResources.get()
).remote(self._data_ref)
self._width_cache = width
if materialize and isinstance(width, ObjectIDType):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import ray
from ray.util import get_node_ip_address

from modin.config import RayCustomResources
from modin.config import RayTaskCustomResources
from modin.core.dataframe.pandas.partitioning.axis_partition import (
PandasDataframeAxisPartition,
)
Expand Down Expand Up @@ -116,7 +116,7 @@ def deploy_splitting_func(
if extract_metadata
else num_splits
),
resources=RayCustomResources.get(),
resources=RayTaskCustomResources.get(),
).remote(
cls._get_deploy_split_func(),
*f_args,
Expand Down Expand Up @@ -182,7 +182,7 @@ def deploy_axis_func(
num_returns=(num_splits if lengths is None else len(lengths))
* (1 + cls._PARTITIONS_METADATA_LEN),
**({"max_retries": max_retries} if max_retries is not None else {}),
resources=RayCustomResources.get(),
resources=RayTaskCustomResources.get(),
).remote(
cls._get_deploy_axis_func(),
*f_args,
Expand Down Expand Up @@ -244,7 +244,7 @@ def deploy_func_between_two_axis_partitions(
"""
return _deploy_ray_func.options(
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN),
resources=RayCustomResources.get(),
resources=RayTaskCustomResources.get(),
).remote(
PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions,
*f_args,
Expand Down

0 comments on commit 3b9e665

Please sign in to comment.