Skip to content

Commit

Permalink
feat: make ray optional (#878)
Browse files Browse the repository at this point in the history
Disable `ray` by default. If `ray` is enabled in the config and installed, the system uses it automatically.

---------

Co-authored-by: Gaurav <[email protected]>
  • Loading branch information
jarulraj and gaurav274 authored Jun 23, 2023
1 parent 654546d commit 671894f
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 60 deletions.
23 changes: 12 additions & 11 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ workflows:
- Linux:
name: "Test | v3.8 | Linux"
v: "3.8"
mode: "TEST"
mode: "COV"
- Linux:
name: "Test | v3.9 | Linux"
v: "3.9"
mode: "TEST"
mode: "COV"
- Linux:
name: "Test | v3.10 | Linux"
v: "3.10"
mode: "TEST"
mode: "COV"
# Ray does not work on 3.11
# https://github.com/ray-project/ray/issues/33232
# - Linux:
Expand Down Expand Up @@ -54,13 +54,13 @@ workflows:
mode: "LINTER"
### RAY
- Linux:
name: "Test | Coverage | v3.10 | Linux"
name: "Test | Ray | v3.10 | Linux"
v: "3.10"
mode: "COV"
mode: "RAY"
- Windows:
name: "Windows | v3.10"
#- MacOS:
# name: "MacOS | v3.10"
- MacOS:
name: "MacOS | v3.10"
# missing Torchvision
#- Linux:
# name: "Linux - v3.11"
Expand Down Expand Up @@ -95,16 +95,16 @@ jobs:
source test_evadb/bin/activate
pip install ".[dev]"
# Disable Ray for Coverage
# Enable Ray
- when:
condition:
equal: [ COV, << parameters.mode >> ]
equal: [ RAY, << parameters.mode >> ]
steps:
- run:
name: Disable Ray setting in the config.yml file
name: Enable Ray setting in the config.yml file
command: |
source test_evadb/bin/activate
python -c "import yaml;f = open('evadb/evadb.yml', 'r+');config_obj = yaml.load(f, Loader=yaml.FullLoader);config_obj['experimental']['ray'] = False;f.seek(0);f.write(yaml.dump(config_obj));f.truncate();"
python -c "import yaml;f = open('evadb/evadb.yml', 'r+');config_obj = yaml.load(f, Loader=yaml.FullLoader);config_obj['experimental']['ray'] = True;f.seek(0);f.write(yaml.dump(config_obj));f.truncate();"
- run:
name: Test and upload coverage report to coveralls
Expand Down Expand Up @@ -158,6 +158,7 @@ jobs:
- checkout
- run:
name: Install EVA package from GitHub repo and run tests
no_output_timeout: 30m # 30 minute timeout
command: |
python -m venv test_evadb
source test_evadb/bin/activate
Expand Down
2 changes: 1 addition & 1 deletion evadb/evadb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ server:
socket_timeout: 60

experimental:
ray: True
ray: False

third_party:
OPENAI_KEY: ""
12 changes: 6 additions & 6 deletions evadb/executor/exchange_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# limitations under the License.
from typing import Iterator

from ray.util.queue import Queue

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import ExecutorError
Expand Down Expand Up @@ -65,14 +63,16 @@ def build_inner_executor(self, inner_executor):
self.inner_executor.children = [QueueReaderExecutor()]

def exec(self) -> Iterator[Batch]:
from ray.util.queue import Queue

input_queue = Queue(maxsize=100)
output_queue = Queue(maxsize=100)

# Pull data from child executor
assert (
len(self.children) == 1
), "Exchange currently only supports parallelization of node with only one child"
ray_pull_task = ray_pull.remote(
ray_pull_task = ray_pull().remote(
self.ray_pull_env_conf_dict,
self.children[0],
input_queue,
Expand All @@ -82,16 +82,16 @@ def exec(self) -> Iterator[Batch]:
ray_parallel_task_list = []
for i in range(self.parallelism):
ray_parallel_task_list.append(
ray_parallel.remote(
ray_parallel().remote(
self.ray_parallel_env_conf_dict[i],
self.inner_executor,
input_queue,
output_queue,
)
)

ray_wait_and_alert.remote([ray_pull_task], input_queue)
ray_wait_and_alert.remote(ray_parallel_task_list, output_queue)
ray_wait_and_alert().remote([ray_pull_task], input_queue)
ray_wait_and_alert().remote(ray_parallel_task_list, output_queue)

while True:
res = output_queue.get(block=True)
Expand Down
83 changes: 50 additions & 33 deletions evadb/executor/ray_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@
import os
from typing import Callable, Dict, List

import ray
from ray.exceptions import RayTaskError
from ray.util.queue import Queue

from evadb.executor.executor_utils import ExecutorError


class StageCompleteSignal:
pass


@ray.remote(num_cpus=0)
def ray_wait_and_alert(tasks: List[ray.ObjectRef], queue: Queue):
try:
ray.get(tasks)
queue.put(StageCompleteSignal)
except RayTaskError as e:
queue.put(ExecutorError(e.cause))
def ray_wait_and_alert():
    """Build and return the Ray remote task that watches a group of tasks.

    All ``ray`` imports happen inside this factory, so importing this module
    does not require ``ray`` to be installed; ``ray`` is only needed once the
    factory is actually called.

    Returns:
        The ``@ray.remote(num_cpus=0)`` function ``_ray_wait_and_alert``;
        callers launch it with ``ray_wait_and_alert().remote(tasks, queue)``.
    """
    import ray
    from ray.exceptions import RayTaskError
    from ray.util.queue import Queue

    @ray.remote(num_cpus=0)
    def _ray_wait_and_alert(tasks: List[ray.ObjectRef], queue: Queue):
        """Wait for ``tasks``; report completion or failure through ``queue``.

        On success the ``StageCompleteSignal`` class itself is put on the
        queue. If a ``RayTaskError`` is raised, its ``cause`` is wrapped in an
        ``ExecutorError`` and put on the queue instead.
        """
        try:
            ray.get(tasks)
            queue.put(StageCompleteSignal)
        except RayTaskError as task_error:
            queue.put(ExecutorError(task_error.cause))

    return _ray_wait_and_alert


# Max calls set to 1 to forcefully release GPU resource when the job is
Expand All @@ -42,25 +45,39 @@ def ray_wait_and_alert(tasks: List[ray.ObjectRef], queue: Queue):
# cleanly done on the Ray side, we need to set this to prevent memory leak.
# More detailed explanation can be found in
# https://github.com/georgia-tech-db/eva/pull/731
@ray.remote(max_calls=1)
def ray_parallel(
conf_dict: Dict[str, str],
executor: Callable,
input_queue: Queue,
output_queue: Queue,
):
for k, v in conf_dict.items():
os.environ[k] = v

gen = executor(input_queue=input_queue)
for next_item in gen:
output_queue.put(next_item)


@ray.remote(max_calls=1)
def ray_pull(conf_dict: Dict[str, str], executor: Callable, input_queue: Queue):
for k, v in conf_dict.items():
os.environ[k] = v

for next_item in executor():
input_queue.put(next_item)


def ray_parallel():
    """Build and return the Ray remote task that runs one parallel executor.

    ``ray`` is imported inside this factory rather than at module level, so
    the module can be imported when ``ray`` is not installed.

    Returns:
        The ``@ray.remote(max_calls=1)`` function ``_ray_parallel``; callers
        launch it with ``ray_parallel().remote(conf_dict, executor,
        input_queue, output_queue)``.
    """
    import ray
    from ray.util.queue import Queue

    @ray.remote(max_calls=1)
    def _ray_parallel(
        conf_dict: Dict[str, str],
        executor: Callable,
        input_queue: Queue,
        output_queue: Queue,
    ):
        """Apply ``conf_dict`` to the environment, then pipe results out.

        Every key/value in ``conf_dict`` is written to ``os.environ`` of the
        process running the task. ``executor`` is then called with
        ``input_queue=input_queue`` and each item it yields is put on
        ``output_queue``.
        """
        # Same per-key assignment as looping over conf_dict.items().
        os.environ.update(conf_dict)

        for produced in executor(input_queue=input_queue):
            output_queue.put(produced)

    return _ray_parallel


def ray_pull():
    """Build and return the Ray remote task that feeds the input queue.

    ``ray`` is imported inside this factory rather than at module level, so
    the module can be imported when ``ray`` is not installed.

    Returns:
        The ``@ray.remote(max_calls=1)`` function ``_ray_pull``; callers
        launch it with ``ray_pull().remote(conf_dict, executor, input_queue)``.
    """
    import ray
    from ray.util.queue import Queue

    @ray.remote(max_calls=1)
    def _ray_pull(conf_dict: Dict[str, str], executor: Callable, input_queue: Queue):
        """Apply ``conf_dict`` to the environment, then fill ``input_queue``.

        Every key/value in ``conf_dict`` is written to ``os.environ`` of the
        process running the task. ``executor`` is then called with no
        arguments and each item it yields is put on ``input_queue``.
        """
        # Same per-key assignment as looping over conf_dict.items().
        os.environ.update(conf_dict)

        for produced in executor():
            input_queue.put(produced)

    return _ray_pull
2 changes: 1 addition & 1 deletion evadb/udfs/emotion_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def forward(self, frames) -> pd.DataFrame:

import torch
import torch.nn.functional as F
import transforms
from torchvision import transforms

# convert to 3 channels, ten crop and stack
frames = frames.repeat(3, 1, 1)
Expand Down
4 changes: 2 additions & 2 deletions evadb/udfs/udf_bootstrap_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ def init_builtin_udfs(db: EvaDBDatabase, mode: str = "debug") -> None:
mnistcnn_udf_query,
chatgpt_udf_query,
face_detection_udf_query,
ocr_udf_query,
Mvit_udf_query,
# ocr_udf_query,
# Mvit_udf_query,
Sift_udf_query,
Yolo_udf_query,
]
Expand Down
13 changes: 7 additions & 6 deletions script/test/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ fi
if [[ "$OSTYPE" != "msys" ]];
# Non-Windows
then
if [[ "$MODE" = "TEST" || "$MODE" = "ALL" ]];
if [[ "$MODE" = "COV" || "$MODE" = "ALL" ]];
then
PYTHONPATH=./ pytest --durations=20 --capture=sys --tb=short -v --log-level=WARNING -rsf -p no:cov test -m "not benchmark"
elif [[ "$MODE" = "COV" ]];
then
# As a workaround, ray needs to be disabled for COV.
PYTHONPATH=./ pytest --durations=20 --cov-report term-missing:skip-covered --cov-config=.coveragerc --cov-context=test --cov=evadb/ --capture=sys --tb=short -v -rsf --log-level=WARNING -m "not benchmark"
elif [[ "$MODE" = "RAY" ]];
then
# RAY mode: the CircleCI job enables ray in evadb/evadb.yml before this runs; tests run with the coverage plugin off (-p no:cov).

PYTHONPATH=./ pytest --durations=20 --capture=sys --tb=short -v --log-level=WARNING -rsf -p no:cov test -m "not benchmark"
fi

test_code=$?
Expand Down Expand Up @@ -110,7 +111,7 @@ fi

if [[ ( "$OSTYPE" != "msys" ) && ( "$MODE" = "NOTEBOOK" || "$MODE" = "ALL" ) ]];
then
PYTHONPATH=./ python -m pytest --durations=5 --nbmake --overwrite "./tutorials" --capture=sys --tb=short -v --log-level=WARNING --nbmake-timeout=3000
PYTHONPATH=./ python -m pytest --durations=5 --nbmake --overwrite "./tutorials" --capture=sys --tb=short -v --log-level=WARNING --nbmake-timeout=3000
notebook_test_code=$?
if [ "$notebook_test_code" != "0" ];
then
Expand Down
2 changes: 2 additions & 0 deletions test/integration_tests/test_ocr_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from test.markers import ray_skip_marker
from test.util import (
load_udfs_for_testing,
shutdown_ray,
Expand Down Expand Up @@ -43,6 +44,7 @@ def tearDown(self):
shutdown_ray()
# todo: move these to relational apis as well

@ray_skip_marker
def test_ocr_donut_huggingface(self):
conn = connect()
cursor = conn.cursor()
Expand Down

0 comments on commit 671894f

Please sign in to comment.