
Commit c72e8e3

feat: add engine/memory_format os env variables and delay engine initialization (#2285)
1 parent ca3d50b

File tree

7 files changed: +77 -18 lines
New file (+22 lines)

@@ -0,0 +1,22 @@
+# 9. Engine selection and lazy initialization
+
+Date: 2023-05-17
+
+## Status
+
+Accepted
+
+## Context
+
+In distributed mode, three approaches are possible when it comes to selecting and initializing a Ray engine:
+1. Initialize the Ray runtime at import (current default). This option causes the least friction for the user, but assumes that installing Ray as an optional dependency is enough to enable distributed mode. Moreover, the user cannot prevent or delay Ray initialization, since it happens at import.
+2. Initialize the Ray runtime on the first distributed API call. The user can prevent Ray initialization by switching the engine/memory format with environment variables, or between import and the first awswrangler distributed API call. However, by default this approach still assumes that installing Ray is equivalent to enabling distributed mode.
+3. Wait for the user to enable distributed mode, via environment variables and/or via `wr.engine.set`. This option makes no assumption about which mode to use (distributed vs non-distributed). Non-distributed would be the default, and it would be up to the user to switch the engine/memory format.
+
+## Decision
+
+Option #1 is inflexible and gives the user little control, while option #3 introduces too much friction and puts the burden on the user. Option #2, on the other hand, gives the user full flexibility while providing a sane default.
+
+## Consequences
+
+The only difference between the current default and the suggested approach is the delayed engine initialization, which is not a breaking change. However, it means that in certain situations more than one Ray instance is initialized. For instance, when running tests across multiple threads, each thread runs its own Ray runtime.
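To make option #2 concrete, here is a minimal, hypothetical sketch of the pattern (simplified names, not the library's actual implementation): the engine is resolved from an environment variable at import, but nothing heavy runs until the first decorated API call.

    import os
    from enum import Enum, unique
    from functools import wraps
    from typing import Any, Callable, Optional


    @unique
    class EngineEnum(Enum):
        RAY = "ray"
        PYTHON = "python"


    class LazyEngine:
        """Sketch only: resolve the engine from WR_ENGINE, initialize it on first use."""

        _engine: Optional[EngineEnum] = (
            EngineEnum[os.environ["WR_ENGINE"].upper()] if os.getenv("WR_ENGINE") else None
        )
        _initialized: bool = False

        @classmethod
        def get(cls) -> EngineEnum:
            # Default to the non-distributed engine when nothing was configured
            return cls._engine or EngineEnum.PYTHON

        @classmethod
        def initialize(cls) -> None:
            if not cls._initialized:
                if cls.get() == EngineEnum.RAY:
                    print("ray.init() would run here")  # placeholder for the real Ray start-up
                cls._initialized = True

        @classmethod
        def dispatch_on_engine(cls, func: Callable[..., Any]) -> Callable[..., Any]:
            @wraps(func)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                cls.initialize()  # lazy: runs on the first wrapped call, not at import
                return func(*args, **kwargs)

            return wrapper


    @LazyEngine.dispatch_on_engine
    def read_parquet(path: str) -> str:  # stand-in for a distributed API
        return f"reading {path} with the {LazyEngine.get().value} engine"


    print(read_parquet("s3://bucket/key"))  # engine initialization is triggered here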

awswrangler/__init__.py  (+1 -1)

@@ -36,7 +36,7 @@
 from awswrangler._config import config  # noqa
 from awswrangler._distributed import EngineEnum, MemoryFormatEnum, engine, memory_format  # noqa

-engine.initialize()
+engine.register()

 __all__ = [
     "athena",

awswrangler/_distributed.py  (+10 -8)

@@ -3,13 +3,17 @@
 # pylint: disable=import-outside-toplevel

 import importlib.util
+import os
 import threading
 from collections import defaultdict
 from enum import Enum, unique
 from functools import wraps
 from importlib import reload
 from typing import Any, Callable, Dict, Literal, Optional, TypeVar, cast

+WR_ENGINE = os.getenv("WR_ENGINE")
+WR_MEMORY_FORMAT = os.getenv("WR_MEMORY_FORMAT")
+

 @unique
 class EngineEnum(Enum):

@@ -35,7 +39,7 @@ class MemoryFormatEnum(Enum):
 class Engine:
     """Execution engine configuration class."""

-    _engine: Optional[EngineEnum] = None
+    _engine: Optional[EngineEnum] = EngineEnum[WR_ENGINE.upper()] if WR_ENGINE else None
     _initialized_engine: Optional[EngineEnum] = None
     _registry: Dict[EngineLiteral, Dict[str, Callable[..., Any]]] = defaultdict(dict)
     _lock: threading.RLock = threading.RLock()

@@ -73,9 +77,7 @@ def get(cls) -> EngineEnum:
     def set(cls, name: EngineLiteral) -> None:
         """Set the distribution engine."""
         with cls._lock:
-            cls._engine = EngineEnum._member_map_[  # type: ignore[assignment]  # pylint: disable=protected-access,no-member
-                name.upper()
-            ]
+            cls._engine = EngineEnum[name.upper()]

     @classmethod
     def dispatch_func(cls, source_func: FunctionType, value: Optional[EngineLiteral] = None) -> FunctionType:

@@ -99,6 +101,7 @@ def dispatch_on_engine(cls, func: FunctionType) -> FunctionType:

         @wraps(func)
         def wrapper(*args: Any, **kw: Dict[str, Any]) -> Any:
+            cls.initialize(name=cls.get().value)
             return cls.dispatch_func(func)(*args, **kw)

         # Save the original function

@@ -127,8 +130,7 @@ def initialize(cls, name: Optional[EngineLiteral] = None) -> None:
                 from awswrangler.distributed.ray import initialize_ray

                 initialize_ray()
-            cls.register(engine_name)
-            cls._initialized_engine = cls.get()
+            cls._initialized_engine = EngineEnum[engine_name.upper()]

     @classmethod
     def is_initialized(cls, name: Optional[EngineLiteral] = None) -> bool:

@@ -142,7 +144,7 @@ def is_initialized(cls, name: Optional[EngineLiteral] = None) -> bool:
 class MemoryFormat:
     """Memory format configuration class."""

-    _enum: Optional[MemoryFormatEnum] = None
+    _enum: Optional[MemoryFormatEnum] = MemoryFormatEnum[WR_MEMORY_FORMAT.upper()] if WR_MEMORY_FORMAT else None
     _lock: threading.RLock = threading.RLock()

     @classmethod

@@ -178,7 +180,7 @@ def get(cls) -> MemoryFormatEnum:
     def set(cls, name: EngineLiteral) -> None:
         """Set the memory format."""
         with cls._lock:
-            cls._enum = MemoryFormatEnum._member_map_[name.upper()]  # type: ignore[assignment]  # pylint: disable=protected-access,no-member
+            cls._enum = MemoryFormatEnum[name.upper()]

            _reload()
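As a usage illustration (not part of the diff), the new module-level variables mean the engine and memory format can be chosen through the environment before the library is imported; the values shown are the ones used elsewhere in this commit, and the expected results are indicative:

    import os

    # Choose the engine/memory format before importing awswrangler, e.g. to stay
    # in non-distributed mode even when ray and modin are installed.
    os.environ["WR_ENGINE"] = "python"
    os.environ["WR_MEMORY_FORMAT"] = "pandas"

    import awswrangler as wr  # noqa: E402

    print(wr.engine.get())         # expected: EngineEnum.PYTHON
    print(wr.memory_format.get())  # expected: MemoryFormatEnum.PANDAS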

awswrangler/distributed/ray/_core.py  (+2 -2)

@@ -164,7 +164,7 @@ def initialize_ray(
     address = ray_address

     if address:
-        _logger.info("Connecting to a Ray cluster at: %s", address)
+        _logger.info("Connecting to a Ray instance at: %s", address)
         ray.init(
             address=address,
             include_dashboard=include_dashboard,

@@ -193,5 +193,5 @@ def initialize_ray(
                 "env_vars": {var: os.environ.get(var) for var in ray_runtime_env_vars if os.environ.get(var)}
             },
         }
-        _logger.info("Starting a Ray cluster")
+        _logger.info("Initializing a Ray instance")
         ray.init(**ray_init_kwargs)

docs/source/api.rst  (+12)

@@ -25,6 +25,7 @@ API Reference
 * `Amazon Chime`_
 * `Typing`_
 * `Global Configurations`_
+* `Engine and Memory Format`_
 * `Distributed - Ray`_

 Amazon S3

@@ -482,6 +483,17 @@ Global Configurations
     reset
     to_pandas

+Engine and Memory Format
+-------------------------
+
+.. currentmodule:: awswrangler._distributed
+
+.. autosummary::
+    :toctree: stubs
+
+    Engine
+    MemoryFormat
+
 Distributed - Ray
 ---------------------

docs/source/scale.rst  (+28 -5)

@@ -16,24 +16,30 @@ Once installed, you can use the library in your code as usual:

 >>> import awswrangler as wr

-At import, SDK for pandas looks for an environmental variable called ``WR_ADDRESS``.
-If found, it is used to send commands to a remote cluster.
-If not found, a local Ray runtime is initialized on your machine instead.
-
+At import, SDK for pandas checks if ``ray`` and ``modin`` are in the installation path and enables distributed mode.
 To confirm that you are in distributed mode, run:

 >>> print(f"Execution Engine: {wr.engine.get()}")
 >>> print(f"Memory Format: {wr.memory_format.get()}")

 which show that both Ray and Modin are enabled as an execution engine and memory format, respectively.
+You can switch back to non-distributed mode at any point (See `Switching modes <scale.rst#switching-modes>`__ below).
+
+Initialization of the Ray cluster is lazy and only triggered when the first distributed API is executed.
+At that point, SDK for pandas looks for an environment variable called ``WR_ADDRESS``.
+If found, it is used to send commands to a remote cluster.
+If not found, a local Ray runtime is initialized on your machine instead.
+Alternatively, you can trigger Ray initialization with:
+
+>>> wr.engine.initialize()

 In distributed mode, the same ``awswrangler`` APIs can now handle much larger datasets:

 .. code-block:: python

     # Read Parquet data (1.2 Gb Parquet compressed)
     df = wr.s3.read_parquet(
-        path=f"s3://amazon-reviews-pds/parquet/product_category={category.title()}/",
+        path=f"s3://amazon-reviews-pds/parquet/product_category=Toys/",
     )

     # Drop the customer_id column

@@ -135,6 +141,23 @@ This table lists the ``awswrangler`` APIs available in distributed mode (i.e. th
 |                   | ``unload``                   |                  |
 +-------------------+------------------------------+------------------+

+Switching modes
+----------------
+The following commands showcase how to switch between distributed and non-distributed modes:
+
+.. code-block:: python
+
+    # Switch to non-distributed
+    wr.engine.set("python")
+    wr.memory_format.set("pandas")
+
+    # Switch to distributed
+    wr.engine.set("ray")
+    wr.memory_format.set("modin")
+
+Similarly, you can set the ``WR_ENGINE`` and ``WR_MEMORY_FORMAT`` environment variables
+to the desired engine and memory format, respectively.
+
 Caveats
 --------
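A usage note on the behaviour documented above (sketch only; the cluster address below is a placeholder, not something defined in this commit):

    import os

    # Point the lazy Ray initialization at an existing cluster. The address is a
    # placeholder and must match your actual Ray head node / Ray Client endpoint.
    os.environ["WR_ADDRESS"] = "ray://10.0.0.1:10001"

    import awswrangler as wr  # noqa: E402

    # Either call any distributed API (initialization happens on the first call) ...
    # df = wr.s3.read_parquet(path="s3://my-bucket/my-prefix/")
    # ... or trigger initialization explicitly:
    wr.engine.initialize()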

tests/unit/test_distributed.py  (+2 -2)

@@ -23,7 +23,7 @@ def wr() -> Iterator[ModuleType]:
     yield reload(awswrangler)

     # Reset for future tests
-    awswrangler.engine.initialize()
+    awswrangler.engine.register()


 @pytest.mark.skipif(condition=not is_ray_modin, reason="ray not available")

@@ -39,7 +39,7 @@ def test_engine_python(wr: ModuleType) -> None:
     assert wr.engine.get_installed() == EngineEnum.RAY
     assert wr.engine.get() == EngineEnum.RAY

-    wr.engine.initialize(EngineEnum.PYTHON.value)
+    wr.engine.set(EngineEnum.PYTHON.value)

     assert wr.engine.get() == EngineEnum.PYTHON