Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
arunjose696 authored and YarShev committed Nov 13, 2023
1 parent 8a332c1 commit 885c704
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 75 deletions.
28 changes: 14 additions & 14 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ jobs:
run: |
MODIN_ENGINE=dask python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
MODIN_ENGINE=ray python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
MODIN_ENGINE=unidist UNIDIST_BACKEND=mpi mpiexec -n 1 python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
MODIN_ENGINE=unidist UNIDIST_BACKEND=mpi mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
test-internals:
needs: [lint-flake8, lint-black-isort]
Expand Down Expand Up @@ -360,26 +360,26 @@ jobs:
run: |
sudo docker pull postgres
sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres
- run: mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_benchmark_mode.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_repartition.py
- run: mpiexec -n 1 python -m pytest modin/test/test_partition_api.py
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/pandas/test/internals/test_benchmark_mode.py
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/pandas/test/internals/test_repartition.py
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/test/test_partition_api.py
- uses: ./.github/actions/run-core-tests
with:
runner: mpiexec -n 1 python -m pytest
runner: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest
parallel: ""
- run: mpiexec -n 1 python -m pytest modin/numpy/test
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/numpy/test
- run: chmod +x ./.github/workflows/sql_server/set_up_sql_server.sh
- run: ./.github/workflows/sql_server/set_up_sql_server.sh
# need an extra argument "genv" to set environment variables for mpiexec. We need
# need an extra argument "genv" to set environment variables for mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist. We need
# these variables to test writing to the mock s3 filesystem.
- run: mpiexec -n 1 -genv AWS_ACCESS_KEY_ID foobar_key -genv AWS_SECRET_ACCESS_KEY foobar_secret python -m pytest modin/pandas/test/test_io.py --verbose
- run: mpiexec -n 1 python -m pytest modin/experimental/pandas/test/test_io_exp.py
- run: mpiexec -n 1 python -m pytest modin/experimental/sql/test/test_sql.py
- run: mpiexec -n 1 python -m pytest modin/test/interchange/dataframe_protocol/test_general.py
- run: mpiexec -n 1 python -m pytest modin/test/interchange/dataframe_protocol/pandas/test_protocol.py
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe -x AWS_ACCESS_KEY_ID foobar_key -x AWS_SECRET_ACCESS_KEY foobar_secret python -m pytest modin/pandas/test/test_io.py --verbose
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/experimental/pandas/test/test_io_exp.py
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/experimental/sql/test/test_sql.py
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/test/interchange/dataframe_protocol/test_general.py
- run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/test/interchange/dataframe_protocol/pandas/test_protocol.py
- run: |
python -m pip install lazy_import
mpiexec -n 1 python -m pytest modin/pandas/test/integrations/
mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/pandas/test/integrations/
- uses: ./.github/actions/upload-coverage

test-all:
Expand Down Expand Up @@ -533,7 +533,7 @@ jobs:
shell-ex: "python -m pytest"
if: needs.execution-filter.dask != 'true'
- name: unidist
shell-ex: "mpiexec -n 1 -genv AWS_ACCESS_KEY_ID foobar_key -genv AWS_SECRET_ACCESS_KEY foobar_secret python -m pytest"
shell-ex: "mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe -x AWS_ACCESS_KEY_ID foobar_key -x AWS_SECRET_ACCESS_KEY foobar_secret python -m pytest"
if: needs.execution-filter.unidist != 'true'
runs-on: ${{ matrix.os }}-latest
defaults:
Expand Down
117 changes: 58 additions & 59 deletions modin/pandas/test/internals/test_benchmark_mode.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,61 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import unittest.mock as mock

import pytest

import modin.pandas as pd
from modin.config import Engine

engine = Engine.get()

# We have to explicitly mock subclass implementations of wait_partitions.
if engine == "Ray":
wait_method = (
"modin.core.execution.ray.implementations."
+ "pandas_on_ray.partitioning."
+ "PandasOnRayDataframePartitionManager.wait_partitions"
)
elif engine == "Dask":
wait_method = (
"modin.core.execution.dask.implementations."
+ "pandas_on_dask.partitioning."
+ "PandasOnDaskDataframePartitionManager.wait_partitions"
)
elif engine == "Unidist":
wait_method = (
"modin.core.execution.unidist.implementations."
+ "pandas_on_unidist.partitioning."
+ "PandasOnUnidistDataframePartitionManager.wait_partitions"
)
else:
wait_method = (
"modin.core.dataframe.pandas.partitioning."
+ "partition_manager.PandasDataframePartitionManager.wait_partitions"
import sys
import mpi4py

mpi4py.rc(recv_mprobe=False, initialize=False)
from mpi4py import MPI # noqa: E402

########################
# Threads initializing #
########################

MPI.Init_thread()

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

parent_comm = MPI.Comm.Get_parent()

if rank == 0 and parent_comm == MPI.COMM_NULL and size == 1:
nprocs_to_spawn = 4
args = ["reproducer.py"]
info = MPI.Info.Create()
intercomm = MPI.COMM_SELF.Spawn(
sys.executable,
args,
maxprocs=nprocs_to_spawn,
info=info,
root=rank,
)

#######################
# Merge communicators #
#######################

@pytest.mark.parametrize("set_benchmark_mode", [False], indirect=True)
def test_turn_off(set_benchmark_mode):
df = pd.DataFrame([0])
with mock.patch(wait_method) as wait:
df.dropna()
wait.assert_not_called()


@pytest.mark.parametrize("set_benchmark_mode", [True], indirect=True)
def test_turn_on(set_benchmark_mode):
df = pd.DataFrame([0])
with mock.patch(wait_method) as wait:
df.dropna()
wait.assert_called()
# Just comment the foollowing block if you want to run without communicators merging.
if parent_comm != MPI.COMM_NULL:
comm = parent_comm.Merge(high=True)
else:
comm = intercomm.Merge(high=False)

# rank = comm.Get_rank()
# size = comm.Get_size()

# if rank == 0:
# print(f"size = {size}")
# data = {"a": 1}
# for r in range(2, size):
# comm.send(data, dest=r, tag=11)
# comm.send(data, dest=r, tag=12)
# data2 = comm.recv(source=r, tag=11)
# data2 = comm.recv(source=r, tag=12)
# elif rank == 1:

# else:
# data = comm.recv(source=0, tag=11)
# data = comm.recv(source=0, tag=12)
# comm.send(data, dest=0, tag=11)
# comm.send(data, dest=0, tag=12)

if not MPI.Is_finalized():
MPI.Finalize()
7 changes: 5 additions & 2 deletions requirements/env_unidist_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ dependencies:
# required dependencies
- pandas>=2.1,<2.2
- numpy>=1.22.4
- unidist-mpi>=0.2.1
- mpich
- cython
- openmpi
- mpi4py
- msgpack-python
- fsspec>=2022.05.0
- packaging>=21.0
- psutil>=5.8.0
Expand Down Expand Up @@ -59,3 +61,4 @@ dependencies:
- connectorx>=0.2.6a4
# The `numpydoc` version should match the version installed in the `lint-pydocstyle` job of the CI.
- numpydoc==1.1.0
- git+https://github.com/YarShev/unidist.git@dev/yigoshev-thr

0 comments on commit 885c704

Please sign in to comment.