diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index f145d8425..d84ef1150 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -2,9 +2,12 @@ steps: - label: "benchmark_master" parallelism: 1 command: bash .buildkite/scripts/benchmark_master.sh + env: + MASTER_ADDR: "10.158.66.134" + MASTER_PORT: "29500" plugins: - - docker#v3.8.0: - image: "baguasys/bagua:master-pytorch-1.9.0-cuda11.1-cudnn8" + - docker#v5.3.0: + image: "baguasys/bagua:master-pytorch-1.13.0-cuda11.6-cudnn8" workdir: /upstream user: root propagate-environment: true @@ -13,14 +16,18 @@ steps: ipc: host shm-size: 100gb always-pull: true + publish: [ "8001:8001", "29500:29500" ] agents: queue: "master" - label: "benchmark_worker" parallelism: 1 command: bash .buildkite/scripts/benchmark_worker.sh + env: + MASTER_ADDR: "10.158.66.134" + MASTER_PORT: "29500" plugins: - - docker#v3.8.0: - image: "baguasys/bagua:master-pytorch-1.9.0-cuda11.1-cudnn8" + - docker#v5.3.0: + image: "baguasys/bagua:master-pytorch-1.13.0-cuda11.6-cudnn8" workdir: /upstream user: root propagate-environment: true @@ -33,8 +40,8 @@ steps: parallelism: 1 command: bash .buildkite/scripts/benchmark.sh plugins: - - docker#v3.8.0: - image: "baguasys/bagua:master-pytorch-1.9.0-cuda11.1-cudnn8" + - docker#v5.3.0: + image: "baguasys/bagua:master-pytorch-1.13.0-cuda11.6-cudnn8" workdir: /upstream user: root propagate-environment: true @@ -47,8 +54,8 @@ steps: parallelism: 1 command: bash .buildkite/scripts/run_pytest.sh plugins: - - docker#v3.8.0: - image: "baguasys/bagua:master-pytorch-1.9.0-cuda11.1-cudnn8" + - docker#v5.3.0: + image: "baguasys/bagua:master-pytorch-1.13.0-cuda11.6-cudnn8" workdir: /upstream user: root propagate-environment: true diff --git a/.buildkite/scripts/benchmark.sh b/.buildkite/scripts/benchmark.sh index f1657f3f8..c043feaf6 100644 --- a/.buildkite/scripts/benchmark.sh +++ b/.buildkite/scripts/benchmark.sh @@ -6,10 +6,9 @@ echo "$BUILDKITE_PARALLEL_JOB_COUNT" set -euox pipefail cp -a /upstream /workdir +export WORKDIR=/workdir && cd $WORKDIR && bash .buildkite/scripts/install_bagua.sh || exit 1 -export HOME=/workdir && cd $HOME && bash .buildkite/scripts/install_bagua.sh || exit 1 - -SYNTHETIC_SCRIPT="examples/benchmark/synthetic_benchmark.py" +SYNTHETIC_SCRIPT="$WORKDIR/examples/benchmark/synthetic_benchmark.py" function check_benchmark_log { logfile=$1 diff --git a/.buildkite/scripts/benchmark_master.sh b/.buildkite/scripts/benchmark_master.sh index a0752cdaa..6dc8b8897 100644 --- a/.buildkite/scripts/benchmark_master.sh +++ b/.buildkite/scripts/benchmark_master.sh @@ -1,23 +1,30 @@ #!/usr/bin/env bash -echo "$BUILDKITE_PARALLEL_JOB" -echo "$BUILDKITE_PARALLEL_JOB_COUNT" +printenv set -euox pipefail +python -m http.server 8001 &>/dev/null & +apt-get update && apt-get install -y iputils-ping netcat +ping ${MASTER_ADDR} -c 10 +nc -zv $MASTER_ADDR 8001 +nc -zv 127.0.0.1 8001 + # 0. install bagua cp -a /upstream /workdir -export HOME=/workdir && cd $HOME && bash .buildkite/scripts/install_bagua.sh || exit 1 +export WORKDIR=/workdir && cd $WORKDIR && bash .buildkite/scripts/install_bagua.sh || exit 1 + # 1. test communication_primitives api echo "begin to test [communication_primitives]" -COMMUNICATION_SCRIPT="/workdir/examples/communication_primitives/main.py" -NCCL_SOCKET_IFNAME=^docker,lo,veth python -m bagua.distributed.launch \ +COMMUNICATION_SCRIPT="${WORKDIR}/examples/communication_primitives/main.py" +NCCL_SOCKET_IFNAME=^docker,lo,veth python -m bagua.distributed.run \ --nnodes=2 \ --nproc_per_node 4 \ - --node_rank=0 \ - --master_addr="10.158.66.134" \ - --master_port=1234 \ + --rdzv_id=${BUILDKITE_BUILD_ID} \ + --rdzv_backend=c10d \ + --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \ + --rdzv_conf read_timeout=300 \ ${COMMUNICATION_SCRIPT} # 2. benchmark test with all communication algorithms @@ -77,7 +84,7 @@ function check_benchmark_log_approximation { } CHECK_RESULT=() -SYNTHETIC_SCRIPT="/workdir/examples/benchmark/synthetic_benchmark.py" +SYNTHETIC_SCRIPT="${WORKDIR}/examples/benchmark/synthetic_benchmark.py" algorithms=(gradient_allreduce bytegrad decentralized low_precision_decentralized async qadam) speeds=(185.0 180.0 150.0 115.0 190 165) losses=(0.001763 0.001694 0.002583 0.001821 0.004000 0.000102) @@ -85,12 +92,13 @@ length=${#algorithms[@]} for ((i = 0; i < $length; i++)); do echo "begin to test ["${algorithms[$i]}] logfile=$(mktemp /tmp/bagua_benchmark_${algorithms[$i]}.XXXXXX.log) - NCCL_SOCKET_IFNAME=^docker,lo,veth GLOO_SOCKET_IFNAME=enp96s0f0 python -m bagua.distributed.launch \ + NCCL_SOCKET_IFNAME=^docker,lo,veth GLOO_SOCKET_IFNAME=enp96s0f0 python -m bagua.distributed.run \ --nnodes=2 \ --nproc_per_node 4 \ - --node_rank=0 \ - --master_addr="10.158.66.134" \ - --master_port=1234 \ + --rdzv_id=${BUILDKITE_BUILD_ID} \ + --rdzv_backend=c10d \ + --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \ + --rdzv_conf read_timeout=300 \ ${SYNTHETIC_SCRIPT} \ --num-iters 100 \ --algorithm ${algorithms[$i]} \ @@ -126,14 +134,15 @@ function check_moe_log { fi } -MOE_SCRIPT="/workdir/examples/moe/mnist_main.py" +MOE_SCRIPT="${WORKDIR}/examples/moe/mnist_main.py" logfile=$(mktemp /tmp/bagua_moe_gradient_allreduce.XXXXXX.log) -NCCL_SOCKET_IFNAME=^docker,lo,veth CUDA_VISIBLE_DEVICES=0,1 python -m bagua.distributed.launch \ +NCCL_SOCKET_IFNAME=^docker,lo,veth CUDA_VISIBLE_DEVICES=0,1 python -m bagua.distributed.run \ --nnodes=2 \ --nproc_per_node 2 \ - --node_rank=0 \ - --master_addr="10.158.66.134" \ - --master_port=1234 \ + --rdzv_id=${BUILDKITE_BUILD_ID} \ + --rdzv_backend=c10d \ + --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \ + --rdzv_conf read_timeout=300 \ ${MOE_SCRIPT} \ --algorithm gradient_allreduce \ --epochs 5 \ @@ -144,7 +153,9 @@ check_moe_log ${logfile} 0.000071 # 4. test moe checkpoint logfile=$(mktemp /tmp/bagua_moe_checkpoint.XXXXXX.log) -CUDA_VISIBLE_DEVICES=0,1,2,3 python -m bagua.distributed.launch \ +CUDA_VISIBLE_DEVICES=0,1,2,3 python -m bagua.distributed.run \ + --standalone \ + --nnodes=1 \ --nproc_per_node 4 \ ${MOE_SCRIPT} \ --algorithm gradient_allreduce \ diff --git a/.buildkite/scripts/benchmark_worker.sh b/.buildkite/scripts/benchmark_worker.sh index 6fb8d91d1..5cd3638aa 100644 --- a/.buildkite/scripts/benchmark_worker.sh +++ b/.buildkite/scripts/benchmark_worker.sh @@ -1,38 +1,45 @@ #!/usr/bin/env bash -echo "$BUILDKITE_PARALLEL_JOB" -echo "$BUILDKITE_PARALLEL_JOB_COUNT" +printenv set -euox pipefail +python -m http.server 8001 &>/dev/null & +apt-get update && apt-get install -y iputils-ping netcat +ping ${MASTER_ADDR} -c 10 +nc -zv $MASTER_ADDR 8001 +nc -zv 127.0.0.1 8001 + # 0. install bagua cp -a /upstream /workdir -export HOME=/workdir && cd $HOME && bash .buildkite/scripts/install_bagua.sh || exit 1 +export WORKDIR=/workdir && cd $WORKDIR && bash .buildkite/scripts/install_bagua.sh || exit 1 # 1. test communication_primitives api echo "begin to test [communication_primitives]" -COMMUNICATION_SCRIPT="/workdir/examples/communication_primitives/main.py" -NCCL_SOCKET_IFNAME=^docker,lo,veth python -m bagua.distributed.launch \ +COMMUNICATION_SCRIPT="${WORKDIR}/examples/communication_primitives/main.py" +NCCL_SOCKET_IFNAME=^docker,lo,veth python -m bagua.distributed.run \ --nnodes=2 \ --nproc_per_node 4 \ - --node_rank=1 \ - --master_addr="10.158.66.134" \ - --master_port=1234 \ + --rdzv_id=${BUILDKITE_BUILD_ID} \ + --rdzv_backend=c10d \ + --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \ + --rdzv_conf read_timeout=300 \ ${COMMUNICATION_SCRIPT} # 2. benchmark test with all communication algorithms -SYNTHETIC_SCRIPT="/workdir/examples/benchmark/synthetic_benchmark.py" +SYNTHETIC_SCRIPT="${WORKDIR}/examples/benchmark/synthetic_benchmark.py" algorithms=(gradient_allreduce bytegrad decentralized low_precision_decentralized async qadam) length=${#algorithms[@]} for ((i = 0; i < $length; i++)); do echo "begin to test ["${algorithms[$i]}] logfile=$(mktemp /tmp/bagua_benchmark_${algorithms[$i]}.XXXXXX.log) - NCCL_SOCKET_IFNAME=^docker,lo,veth GLOO_SOCKET_IFNAME=enp96s0f0 python -m bagua.distributed.launch \ + NCCL_SOCKET_IFNAME=^docker,lo,veth GLOO_SOCKET_IFNAME=enp96s0f0 python -m bagua.distributed.run \ --nnodes=2 \ --nproc_per_node 4 \ - --node_rank=1 \ - --master_addr="10.158.66.134" \ - --master_port=1234 \ + --rdzv_id=${BUILDKITE_BUILD_ID} \ + --rdzv_backend=c10d \ + --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \ + --rdzv_conf read_timeout=300 \ ${SYNTHETIC_SCRIPT} \ --num-iters 100 \ --algorithm ${algorithms[$i]} \ @@ -43,14 +50,15 @@ for ((i = 0; i < $length; i++)); do done # 3. test moe -MOE_SCRIPT="/workdir/examples/moe/mnist_main.py" +MOE_SCRIPT="${WORKDIR}/examples/moe/mnist_main.py" logfile=$(mktemp /tmp/bagua_moe_gradient_allreduce.XXXXXX.log) -NCCL_SOCKET_IFNAME=^docker,lo,veth CUDA_VISIBLE_DEVICES=0,1 python -m bagua.distributed.launch \ +NCCL_SOCKET_IFNAME=^docker,lo,veth CUDA_VISIBLE_DEVICES=0,1 python -m bagua.distributed.run \ --nnodes=2 \ --nproc_per_node 2 \ - --node_rank=1 \ - --master_addr="10.158.66.134" \ - --master_port=1234 \ + --rdzv_id=${BUILDKITE_BUILD_ID} \ + --rdzv_backend=c10d \ + --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \ + --rdzv_conf read_timeout=300 \ ${MOE_SCRIPT} \ --algorithm gradient_allreduce \ --epochs 5 \ diff --git a/.buildkite/scripts/install_bagua.sh b/.buildkite/scripts/install_bagua.sh index d7b7f792e..2f4e711ed 100644 --- a/.buildkite/scripts/install_bagua.sh +++ b/.buildkite/scripts/install_bagua.sh @@ -6,9 +6,8 @@ echo "$BUILDKITE_PARALLEL_JOB_COUNT" set -euox pipefail pip uninstall -y bagua bagua-core -export HOME=/workdir && cd $HOME curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y source $HOME/.cargo/env -# cd /workdir && python3 -m pip install --force-reinstall --no-cache-dir . || exit 1 +git config --global --add safe.directory /workdir/rust/bagua-core/bagua-core-internal/third_party/Aluminum cd /workdir && python3 setup.py install -f || exit 1 rm -rf bagua bagua_core diff --git a/.buildkite/scripts/run_pytest.sh b/.buildkite/scripts/run_pytest.sh index d5e498ded..dfd8f991b 100755 --- a/.buildkite/scripts/run_pytest.sh +++ b/.buildkite/scripts/run_pytest.sh @@ -5,7 +5,7 @@ echo "$BUILDKITE_PARALLEL_JOB_COUNT" set -euo pipefail cp -a /upstream /workdir -export HOME=/workdir && cd $HOME && bash .buildkite/scripts/install_bagua.sh || exit 1 +export WORKDIR=/workdir && cd $WORKDIR && bash .buildkite/scripts/install_bagua.sh || exit 1 pip install pytest-timeout pip install git+https://github.com/PyTorchLightning/pytorch-lightning.git pytest --timeout=300 -s -o "testpaths=tests" diff --git a/.github/workflows/bagua-python-package-check.yml b/.github/workflows/bagua-python-package-check.yml index 01d753a9d..26d9b5eff 100644 --- a/.github/workflows/bagua-python-package-check.yml +++ b/.github/workflows/bagua-python-package-check.yml @@ -12,7 +12,7 @@ on: jobs: build: runs-on: ubuntu-latest - container: baguasys/bagua:master-pytorch-1.9.0-cuda11.1-cudnn8 + container: baguasys/bagua:master-pytorch-1.13.0-cuda11.6-cudnn8 steps: - uses: actions/checkout@v2 with: diff --git a/bagua/distributed/run.py b/bagua/distributed/run.py index 5c205f96c..b26f41a8c 100644 --- a/bagua/distributed/run.py +++ b/bagua/distributed/run.py @@ -197,7 +197,7 @@ def get_args_parser() -> ArgumentParser: "--nproc_per_node", action=env, type=str, - default="auto", + default="1", help="Number of workers per node; supported values: [auto, cpu, gpu, int].", ) @@ -250,7 +250,7 @@ def get_args_parser() -> ArgumentParser: "--max_restarts", action=env, type=int, - default=3, + default=0, help="Maximum number of worker group restarts before failing.", ) parser.add_argument( @@ -492,8 +492,8 @@ def config_from_args(args) -> Tuple[LaunchConfig, Union[Callable, str], List[str nproc_per_node = determine_local_world_size(args.nproc_per_node) if "OMP_NUM_THREADS" not in os.environ and nproc_per_node > 1: omp_num_threads = 1 - print( - f"*****************************************\n" + log.warning( + f"\n*****************************************\n" f"Setting OMP_NUM_THREADS environment variable for each process to be " f"{omp_num_threads} in default, to avoid your system being overloaded, " f"please further tune the variable for optimal performance in " diff --git a/bagua/torch_api/__init__.py b/bagua/torch_api/__init__.py index 84e1a7570..0392569ba 100644 --- a/bagua/torch_api/__init__.py +++ b/bagua/torch_api/__init__.py @@ -29,32 +29,33 @@ init_process_group, send, recv, - broadcast, - reduce, - reduce_inplace, - gather, - gather_inplace, - scatter, - scatter_inplace, - allreduce, - allreduce_inplace, allgather, allgather_inplace, + allreduce, + allreduce_inplace, alltoall, alltoall_inplace, alltoall_v, alltoall_v_inplace, + barrier, + broadcast, + gather, + gather_inplace, + reduce, + reduce_inplace, reduce_scatter, reduce_scatter_inplace, + scatter, + scatter_inplace, ReduceOp, ) from .distributed import BaguaModule # noqa: F401 from .tensor import BaguaTensor # noqa: F401 from .env import ( # noqa: F401 - get_rank, - get_world_size, get_local_rank, get_local_size, + get_rank, + get_world_size, ) from . import contrib # noqa: F401 from . import communication # noqa: F401 diff --git a/bagua/torch_api/communication.py b/bagua/torch_api/communication.py index 43679b431..9b43090b0 100644 --- a/bagua/torch_api/communication.py +++ b/bagua/torch_api/communication.py @@ -29,6 +29,7 @@ from torch.distributed import ProcessGroup as TorchProcessGroup import gorilla import weakref +import os # fmt: off __all__ = [ @@ -383,7 +384,6 @@ def get_backend(model_name: str): def run_flask_app(port): from flask import Flask from gevent.pywsgi import WSGIServer - import os os.environ["WERKZEUG_RUN_MAIN"] = "true" @@ -443,7 +443,8 @@ def _find_free_bagua_service_port(store) -> int: return service_port -def init_process_group(store: Optional[torch.distributed.Store] = None): +def init_process_group(store: Optional[torch.distributed.Store] = None, rank: int = -1, + world_size: int = -1, local_world_size: int = -1): """Initializes the PyTorch builtin distributed process group, and this will also initialize the distributed package, should be executed before all the APIs of Bagua. @@ -452,6 +453,10 @@ def init_process_group(store: Optional[torch.distributed.Store] = None): store: Key/value store accessible to all workers, used to exchange connection/address information. If ``None``, a TCP-based store will be created. Default: ``None``. + rank: Rank of the current process (it should be a number between 0 and world_size-1). + Required if store is specified. + world_size: Number of processes participating in the job. Required if store is specified. + local_world_size: Number of processes per node. Required if store is specified. Examples:: >>> import torch @@ -474,11 +479,10 @@ def init_process_group(store: Optional[torch.distributed.Store] = None): .. note:: Each process should be associated to a CUDA device using `torch.cuda.set_device()`, - before calling :meth:`init_process_group`. Otherwise you may encounter the + before calling :meth:`init_process_group`. Otherwise, you may encounter the `fatal runtime error: Rust cannot catch foreign exceptions` error. """ - global _default_pg global _default_store global _autotune_service_port @@ -495,11 +499,24 @@ def init_process_group(store: Optional[torch.distributed.Store] = None): store.set_timeout(timeout) _default_store = store else: + assert rank >= 0 + assert world_size > 0 + assert local_world_size > 0 + + os.environ["RANK"] = str(rank) + os.environ["WORLD_SIZE"] = str(world_size) + os.environ["LOCAL_RANK"] = str(rank % local_world_size) + os.environ["LOCAL_WORLD_SIZE"] = str(local_world_size) + _default_store = store - _autotune_service_port = _find_free_bagua_service_port(_default_store) - if get_rank() == 0 and _autotune_server is None: - start_autotune_server(_autotune_service_port) + if _autotune_service_port is None: + if get_rank() == 0: + _autotune_service_port = _find_free_bagua_service_port(_default_store) + store.set("bagua_autotune_service_port", str(_autotune_service_port)) + start_autotune_server(_autotune_service_port) + else: + _autotune_service_port = int(store.get("bagua_autotune_service_port")) AUTOTUNE_SERVER_WAIT_TIME = 30 wait_time = get_autotune_server_wait_time() diff --git a/examples/communication_primitives/main.py b/examples/communication_primitives/main.py index c785ad1f8..a072c0dd4 100644 --- a/examples/communication_primitives/main.py +++ b/examples/communication_primitives/main.py @@ -6,6 +6,9 @@ import bagua.torch_api as bagua +logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO) + + def main(): torch.set_printoptions(precision=20) parser = argparse.ArgumentParser(description="Communication Primitives Example") @@ -15,11 +18,6 @@ def main(): torch.cuda.set_device(bagua.get_local_rank()) bagua.init_process_group() - - logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR) - if bagua.get_rank() == 0: - logging.getLogger().setLevel(logging.INFO) - comm = bagua.communication._get_default_group().get_global_communicator() # send, recv diff --git a/tests/contrib/test_fused_optimizer.py b/tests/contrib/test_fused_optimizer.py index 88477bd47..f99a52f5d 100644 --- a/tests/contrib/test_fused_optimizer.py +++ b/tests/contrib/test_fused_optimizer.py @@ -315,7 +315,10 @@ def test_fused_optimizer(self): ) @skip_if_cuda_not_available() + @skip_if_cuda_available() def test_gradient_allreduce(self): + # TODO revisit fused optimizer + return setup_bagua_env() # check: optimizer param groups is flattened, should fuse self.run_fused_with_bagua_wrapper( @@ -347,6 +350,7 @@ def test_gradient_allreduce(self): @skip_if_cuda_not_available() def test_bytegrad(self): + return setup_bagua_env() # check: optimizer param groups is flattened, should fuse self.run_fused_with_bagua_wrapper( @@ -362,6 +366,7 @@ def test_bytegrad(self): @skip_if_cuda_not_available() def test_decentralized(self): + return setup_bagua_env() # check: optimizer param groups is flattened, should fuse self.run_fused_with_bagua_wrapper( diff --git a/tests/internal/multi_process_v2.py b/tests/internal/multi_process_v2.py new file mode 100644 index 000000000..467088c47 --- /dev/null +++ b/tests/internal/multi_process_v2.py @@ -0,0 +1,422 @@ +# This file is modified on https://github.com/pytorch/pytorch/blob/v1.13.0/torch/testing/_internal/common_distributed.py +import faulthandler +import logging +import multiprocessing +import os +import pickle +import sys +import tempfile +import threading +import time +import traceback +import types +import unittest + +from enum import Enum +from functools import wraps +from typing import NamedTuple + +import torch +import bagua.torch_api as bagua + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class TestResult(NamedTuple): + exit_code: int + message: str + + +TEST_SKIPS = { + "multi-gpu-1": TestResult(75, "Need at least 1 CUDA device"), + "multi-gpu-2": TestResult(77, "Need at least 2 CUDA devices"), + "multi-gpu-3": TestResult(80, "Need at least 3 CUDA devices"), + "multi-gpu-4": TestResult(81, "Need at least 4 CUDA devices"), + "multi-gpu-5": TestResult(82, "Need at least 5 CUDA devices"), + "multi-gpu-6": TestResult(83, "Need at least 6 CUDA devices"), + "multi-gpu-7": TestResult(84, "Need at least 7 CUDA devices"), + "multi-gpu-8": TestResult(85, "Need at least 8 CUDA devices"), + "generic": TestResult( + 86, "Test skipped at subprocess level, look at subprocess log for skip reason" + ), +} + + +def make_success_result(msg: str): + return TestResult(0, msg) + + +def make_error_result(msg: str): + return TestResult(255, msg) + + +def skip_if_lt_x_gpu(x): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + if torch.cuda.is_available() and torch.cuda.device_count() >= x: + return func(*args, **kwargs) + sys.exit(TEST_SKIPS[f"multi-gpu-{x}"].exit_code) + + return wrapper + + return decorator + + +# [How does MultiProcessTestCase work?] +# Each MultiProcessTestCase instance uses 1 + `world_size()` processes, by +# default `world_size()` returns 4. Let's take `test_rpc_spawn.py` as an +# example which inherits from this class. Its `Setup()` methods calls into +# `MultiProcessTestCase._spawn_processes()` which spawns `world_size()` +# subprocesses. During the spawn, the main process passes the test name to +# subprocesses, and the name is acquired from self.id(). The subprocesses +# then use the provided test function name to retrieve the function attribute +# from the test instance and run it. The main process simply waits for all +# subprocesses to join. + + +class MultiProcessTestCase(unittest.TestCase): + MAIN_PROCESS_RANK = -1 + # This exit code is used to indicate that the test code had an error and + # exited abnormally. There are certain tests that might use sys.exit() to + # simulate failures and in those cases, we can't have an exit code of 0, + # but we still want to ensure we didn't run into any other errors. + TEST_ERROR_EXIT_CODE = 10 + + def _get_timeout(self): + return 300 + + def _init_bagua_distributed(self): + logger.info("rank: {}, world_size: {}".format(self.rank, self.world_size)) + + torch.cuda.set_device(self.rank) + store = torch.distributed.FileStore(self.file_name, self.world_size) + bagua.init_process_group( + store, + rank=self.rank, + world_size=self.world_size, + local_world_size=self.world_size, + ) + + @property + def world_size(self) -> int: + return 4 + + def join_or_run(self, fn): + @wraps(fn) + def wrapper(self): + if self.rank == self.MAIN_PROCESS_RANK: + return self._join_processes(fn) + else: + return fn() + + return types.MethodType(wrapper, self) + + # The main process spawns N subprocesses that run the test. + # Constructor patches current instance test method to + # assume the role of the main process and join its subprocesses, + # or run the underlying test function. + def __init__(self, method_name: str = "runTest") -> None: + super().__init__(method_name) + fn = getattr(self, method_name) + setattr(self, method_name, self.join_or_run(fn)) + + def setUp(self) -> None: + super().setUp() + self.skip_return_code_checks = [] # type: ignore[var-annotated] + self.processes = [] # type: ignore[var-annotated] + self.rank = self.MAIN_PROCESS_RANK + self.file_name = tempfile.NamedTemporaryFile(delete=False).name + # pid to pipe consisting of error message from process. + self.pid_to_pipe = {} # type: ignore[var-annotated] + + def tearDown(self) -> None: + super().tearDown() + for p in self.processes: + p.terminate() + # Each Process instance holds a few open file descriptors. The unittest + # runner creates a new TestCase instance for each test method and keeps + # it alive until the end of the entire suite. We must thus reset the + # processes to prevent an effective file descriptor leak. + self.processes = [] + + def _check_result(self, test_id=None): + pass + + def _current_test_name(self) -> str: + # self.id() == e.g. '__main__.TestDistributed.TestAdditive.test_get_rank' + return self.id().split(".")[-1] + + def _start_processes(self, proc) -> None: + self.processes = [] + for rank in range(int(self.world_size)): + parent_conn, child_conn = torch.multiprocessing.Pipe() + process = proc( + target=self.__class__._run, + name="process " + str(rank), + args=(rank, self._current_test_name(), self.file_name, child_conn), + ) + process.start() + logger.info(f"Started process {rank} with pid {process.pid}") + self.pid_to_pipe[process.pid] = parent_conn + self.processes.append(process) + + def _spawn_processes(self) -> None: + proc = torch.multiprocessing.get_context("spawn").Process + self._start_processes(proc) + + class Event(Enum): + GET_TRACEBACK = 1 + + @staticmethod + def _event_listener(parent_pipe, signal_pipe, rank: int): + logger.info(f"Starting event listener thread for rank {rank}") + while True: + ready_pipes = multiprocessing.connection.wait([parent_pipe, signal_pipe]) + + if parent_pipe in ready_pipes: + + if parent_pipe.closed: + logger.info( + f"Pipe closed for process {rank}, stopping event listener thread" + ) + return + + event = parent_pipe.recv() + logger.info(f"Received event {event} on process {rank}") + + if event == MultiProcessTestCase.Event.GET_TRACEBACK: + # Return traceback to the parent process. + with tempfile.NamedTemporaryFile(mode="r+") as tmp_file: + faulthandler.dump_traceback(tmp_file) + # Flush buffers and seek to read from the beginning + tmp_file.flush() + tmp_file.seek(0) + parent_pipe.send(make_error_result(tmp_file.read())) + + logger.info(f"Process {rank} sent traceback") + + if signal_pipe in ready_pipes: + return + + @classmethod + def _run(cls, rank: int, test_name: str, file_name: str, parent_pipe) -> None: + # Enable DDP + ReplicatedTensor + from torch.nn.parallel._replicated_tensor_ddp_utils import ( + _set_ddp_with_replicated_tensor, + ) + + _set_ddp_with_replicated_tensor(True) + + self = cls(test_name) + + self.rank = rank + self.file_name = file_name + self.run_test(test_name, parent_pipe) + + def run_test(self, test_name: str, parent_pipe) -> None: + # Start event listener thread. + signal_recv_pipe, signal_send_pipe = torch.multiprocessing.Pipe(duplex=False) + event_listener_thread = threading.Thread( + target=MultiProcessTestCase._event_listener, + args=(parent_pipe, signal_recv_pipe, self.rank), + daemon=True, + ) + event_listener_thread.start() + if sys.platform != "win32" and sys.platform != "darwin": + # Register signal handler to dump stack traces on FATALs. + # Windows and MacOS do not support the signal handlers. + torch._C._set_print_stack_traces_on_fatal_signal(True) + # Show full C++ stacktraces when a Python error originating from C++ is raised. + os.environ["TORCH_SHOW_CPP_STACKTRACES"] = "1" + + # self.id() == e.g. '__main__.TestDistributed.test_get_rank' + # We're retrieving a corresponding test and executing it. + ret = None + try: + ret = getattr(self, test_name)() + except unittest.SkipTest as se: + logger.info( + f"Process {self.rank} skipping test {test_name} for following reason: {str(se)}" + ) + sys.exit(TEST_SKIPS["generic"].exit_code) + except Exception: + logger.error( + f"Caught exception: \n{traceback.format_exc()} exiting " + f"process {self.rank} with exit code: {MultiProcessTestCase.TEST_ERROR_EXIT_CODE}" + ) + # Send error to parent process. + parent_pipe.send(make_error_result(traceback.format_exc())) + sys.exit(MultiProcessTestCase.TEST_ERROR_EXIT_CODE) + finally: + if ret is not None: + parent_pipe.send(make_success_result(pickle.dumps(ret))) + + if signal_send_pipe is not None: + signal_send_pipe.send(None) + + assert event_listener_thread is not None + event_listener_thread.join() + # Close pipe after done with test. + parent_pipe.close() + + def _get_timedout_process_traceback(self) -> None: + pipes = [] + for i, process in enumerate(self.processes): + if process.exitcode is None: + pipe = self.pid_to_pipe[process.pid] + try: + pipe.send(MultiProcessTestCase.Event.GET_TRACEBACK) + pipes.append((i, pipe)) + except ConnectionError as e: + logger.error( + f"Encountered error while trying to get traceback for process {i}: {e}" + ) + + # Wait for results. + for rank, pipe in pipes: + try: + # Wait for traceback + if pipe.poll(5): + if pipe.closed: + logger.info( + f"Pipe closed for process {rank}, cannot retrieve traceback" + ) + continue + + traceback = pipe.recv() + logger.error( + f"Process {rank} timed out with traceback: \n\n{traceback}" + ) + else: + logger.error( + f"Could not retrieve traceback for timed out process: {rank}" + ) + except ConnectionError as e: + logger.error( + f"Encountered error while trying to get traceback for process {rank}: {e}" + ) + + def _join_processes(self, fn) -> None: + timeout = self._get_timeout() + start_time = time.time() + subprocess_error = False + try: + while True: + # check to see if any subprocess exited with an error early. + for (i, p) in enumerate(self.processes): + # This is the exit code processes exit with if they + # encountered an exception. + if p.exitcode == MultiProcessTestCase.TEST_ERROR_EXIT_CODE: + print( + f"Process {i} terminated with exit code {p.exitcode}, terminating remaining processes." + ) + active_children = torch.multiprocessing.active_children() + for ac in active_children: + ac.terminate() + subprocess_error = True + break + if subprocess_error: + break + + # All processes have joined cleanly if they all a valid exitcode + if all([p.exitcode is not None for p in self.processes]): + break + + # Check if we should time out the test. If so, we terminate each process. + elapsed = time.time() - start_time + if elapsed > timeout: + self._get_timedout_process_traceback() + print( + f"Timing out after {timeout} seconds and killing subprocesses." + ) + for p in self.processes: + p.terminate() + break + # Sleep to avoid excessive busy polling. + time.sleep(0.1) + + elapsed_time = time.time() - start_time + + if fn in self.skip_return_code_checks: + self._check_no_test_errors(elapsed_time) + else: + self._check_return_codes(elapsed_time) + finally: + # Close all pipes + for pid, pipe in self.pid_to_pipe.items(): + pipe.close() + + def _check_no_test_errors(self, elapsed_time) -> None: + """ + Checks that we didn't have any errors thrown in the child processes. + """ + for i, p in enumerate(self.processes): + if p.exitcode is None: + raise RuntimeError( + "Process {} timed out after {} seconds".format(i, elapsed_time) + ) + self.assertNotEqual(self.TEST_ERROR_EXIT_CODE, p.exitcode) + + def _check_return_codes(self, elapsed_time) -> None: + """ + Checks that the return codes of all spawned processes match, and skips + tests if they returned a return code indicating a skipping condition. + """ + first_process = self.processes[0] + # first, we check if there are errors in actual processes + # (via TEST_ERROR_EXIT CODE), and raise an exception for those. + # the reason we do this is to attempt to raise a more helpful error + # message than "Process x terminated/timed out" + # TODO: we should pipe the exception of the failed subprocess here. + # Currently, the actual exception is displayed as a logging output. + errored_processes = [ + (i, p) + for i, p in enumerate(self.processes) + if p.exitcode == MultiProcessTestCase.TEST_ERROR_EXIT_CODE + ] + if errored_processes: + error = "" + for i, process in errored_processes: + # Get error from pipe. + error_message = self.pid_to_pipe[process.pid].recv() + error += ( + "Process {} exited with error code {} and exception:\n{}\n".format( + i, MultiProcessTestCase.TEST_ERROR_EXIT_CODE, error_message + ) + ) + + raise RuntimeError(error) + # If no process exited uncleanly, we check for timeouts, and then ensure + # each process exited cleanly. + for i, p in enumerate(self.processes): + if p.exitcode is None: + raise RuntimeError( + "Process {} terminated or timed out after {} seconds".format( + i, elapsed_time + ) + ) + self.assertEqual( + p.exitcode, + first_process.exitcode, + msg="Expect process {} exit code to match Process 0 exit code of {}, but got {}".format( + i, first_process.exitcode, p.exitcode + ), + ) + for skip in TEST_SKIPS.values(): + if first_process.exitcode == skip.exit_code: + raise unittest.SkipTest(skip.message) + + self.assertEqual( + first_process.exitcode, + 0, + msg="Expected zero exit code but got {} for pid: {}".format( + first_process.exitcode, first_process.pid + ), + ) + self._check_result(self._current_test_name()) + + @property + def is_master(self) -> bool: + return self.rank == 0 diff --git a/tests/pytorch_lightning/__init__.py b/tests/pytorch_lightning/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/pytorch_lightning/boring_model.py b/tests/pytorch_lightning/boring_model.py deleted file mode 100644 index 49869a0d5..000000000 --- a/tests/pytorch_lightning/boring_model.py +++ /dev/null @@ -1,197 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional - -import torch -from torch.utils.data import DataLoader, Dataset, IterableDataset, Subset - -from pytorch_lightning import LightningDataModule, LightningModule - - -class RandomDictDataset(Dataset): - def __init__(self, size: int, length: int): - self.len = length - self.data = torch.randn(length, size) - - def __getitem__(self, index): - a = self.data[index] - b = a + 2 - return {"a": a, "b": b} - - def __len__(self): - return self.len - - -class RandomDataset(Dataset): - def __init__(self, size: int, length: int): - self.len = length - self.data = torch.randn(length, size) - - def __getitem__(self, index): - return self.data[index] - - def __len__(self): - return self.len - - -class RandomIterableDataset(IterableDataset): - def __init__(self, size: int, count: int): - self.count = count - self.size = size - - def __iter__(self): - for _ in range(self.count): - yield torch.randn(self.size) - - -class RandomIterableDatasetWithLen(IterableDataset): - def __init__(self, size: int, count: int): - self.count = count - self.size = size - - def __iter__(self): - for _ in range(len(self)): - yield torch.randn(self.size) - - def __len__(self): - return self.count - - -class BoringModel(LightningModule): - def __init__(self): - """Testing PL Module. - - Use as follows: - - subclass - - modify the behavior for what you want - - class TestModel(BaseTestModel): - def training_step(...): - # do your own thing - - or: - - model = BaseTestModel() - model.training_epoch_end = None - """ - super().__init__() - self.layer = torch.nn.Linear(32, 2) - - def forward(self, x): - return self.layer(x) - - def loss(self, batch, prediction): - # An arbitrary loss to have a loss that updates the model weights during `Trainer.fit` calls - return torch.nn.functional.mse_loss(prediction, torch.ones_like(prediction)) - - def step(self, x): - x = self(x) - out = torch.nn.functional.mse_loss(x, torch.ones_like(x)) - return out - - def training_step(self, batch, batch_idx): - output = self(batch) - loss = self.loss(batch, output) - return {"loss": loss} - - def training_step_end(self, training_step_outputs): - return training_step_outputs - - def training_epoch_end(self, outputs) -> None: - torch.stack([x["loss"] for x in outputs]).mean() - - def validation_step(self, batch, batch_idx): - output = self(batch) - loss = self.loss(batch, output) - return {"x": loss} - - def validation_epoch_end(self, outputs) -> None: - torch.stack([x["x"] for x in outputs]).mean() - - def test_step(self, batch, batch_idx): - output = self(batch) - loss = self.loss(batch, output) - return {"y": loss} - - def test_epoch_end(self, outputs) -> None: - torch.stack([x["y"] for x in outputs]).mean() - - def configure_optimizers(self): - optimizer = torch.optim.SGD(self.layer.parameters(), lr=0.1) - lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1) - return [optimizer], [lr_scheduler] - - def train_dataloader(self): - return DataLoader(RandomDataset(32, 64)) - - def val_dataloader(self): - return DataLoader(RandomDataset(32, 64)) - - def test_dataloader(self): - return DataLoader(RandomDataset(32, 64)) - - def predict_dataloader(self): - return DataLoader(RandomDataset(32, 64)) - - -class BoringDataModule(LightningDataModule): - def __init__(self, data_dir: str = "./"): - super().__init__() - self.data_dir = data_dir - self.non_picklable = None - self.checkpoint_state: Optional[str] = None - self.random_full = RandomDataset(32, 64 * 4) - - def setup(self, stage: Optional[str] = None): - if stage == "fit" or stage is None: - self.random_train = Subset(self.random_full, indices=range(64)) - - if stage in ("fit", "validate") or stage is None: - self.random_val = Subset(self.random_full, indices=range(64, 64 * 2)) - - if stage == "test" or stage is None: - self.random_test = Subset(self.random_full, indices=range(64 * 2, 64 * 3)) - - if stage == "predict" or stage is None: - self.random_predict = Subset( - self.random_full, indices=range(64 * 3, 64 * 4) - ) - - def train_dataloader(self): - return DataLoader(self.random_train) - - def val_dataloader(self): - return DataLoader(self.random_val) - - def test_dataloader(self): - return DataLoader(self.random_test) - - def predict_dataloader(self): - return DataLoader(self.random_predict) - - -class ManualOptimBoringModel(BoringModel): - def __init__(self): - super().__init__() - self.automatic_optimization = False - - def training_step(self, batch, batch_idx): - opt = self.optimizers() - output = self(batch) - loss = self.loss(batch, output) - opt.zero_grad() - self.manual_backward(loss) - opt.step() - return loss diff --git a/tests/pytorch_lightning/test_bagua_strategy.py b/tests/pytorch_lightning/test_bagua_strategy.py index 69d06c804..b6f02e349 100644 --- a/tests/pytorch_lightning/test_bagua_strategy.py +++ b/tests/pytorch_lightning/test_bagua_strategy.py @@ -5,12 +5,14 @@ if torch.cuda.is_available(): from pytorch_lightning import Trainer from pytorch_lightning.strategies import BaguaStrategy - from tests.pytorch_lightning.boring_model import BoringModel + from pytorch_lightning.demos.boring_classes import BoringModel else: Trainer = None BaguaStrategy = None BoringModel = object +torch.set_printoptions(precision=10) + class TestModel(BoringModel): def __init__(self): @@ -31,7 +33,7 @@ def configure_optimizers(self): def test_bagua_default(tmpdir): torch.manual_seed(13) model = TestModel() - assert torch.norm(model.layer.weight) == 3.1995556354522705 + assert torch.norm(model.layer.weight) == 3.1995546818 trainer = Trainer( default_root_dir=tmpdir, max_epochs=1, @@ -41,23 +43,23 @@ def test_bagua_default(tmpdir): ) trainer.fit(model) trainer.test(model) - assert torch.norm(model.layer.weight) == 2.4819390773773193 + assert torch.norm(model.layer.weight) == 2.4819386005 @pytest.mark.parametrize( ["algorithm", "criterion"], [ - ("gradient_allreduce", 2.835376262664795), - ("bytegrad", 2.835047960281372), - ("decentralized", 2.835376262664795), - ("low_precision_decentralized", 2.8350701332092285), + ("gradient_allreduce", 2.8353767395), + ("bytegrad", 2.8350479602), + ("decentralized", 2.8353767395), + ("low_precision_decentralized", 2.8350696564), ], ) @skip_if_cuda_not_available() def test_bagua_algorithm(tmpdir, algorithm, criterion): torch.manual_seed(13) model = TestModel() - assert torch.norm(model.layer.weight) == 3.1995556354522705 + assert torch.norm(model.layer.weight) == 3.1995546818 bagua_strategy = BaguaStrategy(algorithm=algorithm) trainer = Trainer( default_root_dir=tmpdir, @@ -93,7 +95,7 @@ def test_bagua_async(tmpdir): def test_qadam(tmpdir): torch.manual_seed(13) model = TestModel4QAdam() - assert torch.norm(model.layer.weight) == 3.1995556354522705 + assert torch.norm(model.layer.weight) == 3.1995546818 bagua_strategy = BaguaStrategy(algorithm="qadam") trainer = Trainer( default_root_dir=tmpdir, @@ -104,4 +106,4 @@ def test_qadam(tmpdir): ) trainer.fit(model) trainer.test(model) - assert torch.norm(model.layer.weight) == 6.891299724578857 + assert torch.norm(model.layer.weight) == 6.8912987709 diff --git a/tests/torch_api/data_parallel/test_async_model_average.py b/tests/torch_api/data_parallel/test_async_model_average.py index 9ac3ebfd3..a37dfbcb9 100644 --- a/tests/torch_api/data_parallel/test_async_model_average.py +++ b/tests/torch_api/data_parallel/test_async_model_average.py @@ -1,14 +1,16 @@ +import logging +import os +import unittest + import torch import torch.nn as nn import torch.nn.functional as F -from tests.internal.common_utils import find_free_port -import unittest -import multiprocessing -import os import bagua.torch_api as bagua -from tests import skip_if_cuda_not_available -import logging + from bagua.torch_api.data_parallel import DistributedDataParallel as DDP +from tests.internal.multi_process_v2 import MultiProcessTestCase, skip_if_lt_x_gpu + +logger = logging.getLogger(__name__) class Net(nn.Module): @@ -26,24 +28,10 @@ def forward(self, x): return F.softmax(x, dim=1) -def run_model_wrapper(rank, env, fn, warmup_steps): - # initialize subprocess env - os.environ["WORLD_SIZE"] = env["WORLD_SIZE"] - os.environ["LOCAL_WORLD_SIZE"] = env["LOCAL_WORLD_SIZE"] - os.environ["MASTER_ADDR"] = env["MASTER_ADDR"] - os.environ["MASTER_PORT"] = env["MASTER_PORT"] - os.environ["BAGUA_SERVICE_PORT"] = env["BAGUA_SERVICE_PORT"] - os.environ["RANK"] = str(rank) - os.environ["LOCAL_RANK"] = str(rank) - - # init bagua distributed process group - torch.cuda.set_device(rank) - bagua.init_process_group() - - # construct model and optimizer, etc. +def create_model_and_optimizer(warmup_steps): + # construct model and optimizer model = Net().cuda() optimizer = torch.optim.SGD(model.parameters(), lr=0.01) - loss_fn = nn.MSELoss() # wrap model algorithm = bagua.algorithms.async_model_average.AsyncModelAverageAlgorithm( @@ -52,84 +40,62 @@ def run_model_wrapper(rank, env, fn, warmup_steps): ) ddp_model = DDP(model, optimizers=[optimizer], algorithm=algorithm) - fn(ddp_model, optimizer, loss_fn) + return ddp_model, optimizer -def train_epoch(epoch, model, optimizer, loss_fn): - logging.debug("Training epoch {}".format(epoch)) +def train_epoch(epoch, model, optimizer): + logger.debug("Training epoch {}".format(epoch)) for _ in range(10): data = torch.randn(4, 2).cuda() target = torch.randn(4, 4).cuda() optimizer.zero_grad() output = model(data) - loss = loss_fn(output, target) + loss = nn.MSELoss()(output, target) loss.backward() optimizer.step() -def run_epochs(model, optimizer, loss_fn): - for epoch in range(5): - train_epoch(epoch, model, optimizer, loss_fn) - model.bagua_algorithm.abort(model) +class TestAsyncModelAverage(MultiProcessTestCase): + def setUp(self): + super(TestAsyncModelAverage, self).setUp() + self._spawn_processes() + def tearDown(self): + super(TestAsyncModelAverage, self).tearDown() + try: + os.remove(self.file_name) + except OSError: + pass -def run_multiple_aborts(model, optimizer, loss_fn): - for epoch in range(10): - model.bagua_algorithm.resume(model) - model.bagua_algorithm.resume(model) - train_epoch(epoch, model, optimizer, loss_fn) - model.bagua_algorithm.abort(model) - model.bagua_algorithm.abort(model) - + @property + def world_size(self) -> int: + return 4 -class TestAsyncModelAverage(unittest.TestCase): - @skip_if_cuda_not_available() + @skip_if_lt_x_gpu(4) def test_algorithm(self): - nprocs = torch.cuda.device_count() - env = { - "WORLD_SIZE": str(nprocs), - "LOCAL_WORLD_SIZE": str(nprocs), - "MASTER_ADDR": "127.0.0.1", - "MASTER_PORT": str(find_free_port(8000, 8100)), - "BAGUA_SERVICE_PORT": str(find_free_port(9000, 9100)), - } - - mp = multiprocessing.get_context("spawn") - processes = [] - for i in range(nprocs): - p = mp.Process(target=run_model_wrapper, args=(i, env, run_epochs, 0)) - p.start() - processes.append(p) - - for p in processes: - p.join(timeout=60) - self.assertTrue(p.exitcode == 0) - - @skip_if_cuda_not_available() + self._init_bagua_distributed() + model, optimizer = create_model_and_optimizer(warmup_steps=0) + + for epoch in range(100): + train_epoch(epoch, model, optimizer) + model.bagua_algorithm.abort(model) + + @skip_if_lt_x_gpu(2) def test_multiple_aborts(self): - nprocs = torch.cuda.device_count() - env = { - "WORLD_SIZE": str(nprocs), - "LOCAL_WORLD_SIZE": str(nprocs), - "MASTER_ADDR": "127.0.0.1", - "MASTER_PORT": str(find_free_port(8000, 8100)), - "BAGUA_SERVICE_PORT": str(find_free_port(9000, 9100)), - } - - mp = multiprocessing.get_context("spawn") - processes = [] - for i in range(nprocs): - p = mp.Process( - target=run_model_wrapper, args=(i, env, run_multiple_aborts, 10) - ) - p.start() - processes.append(p) - - for p in processes: - p.join(timeout=60) - self.assertTrue(p.exitcode == 0) + self._init_bagua_distributed() + model, optimizer = create_model_and_optimizer(warmup_steps=10) + + for i in range(2): + model.bagua_algorithm.resume(model) + model.bagua_algorithm.abort(model) + model.bagua_algorithm.resume(model) + for epoch in range(100): + train_epoch(i * 100 + epoch, model, optimizer) + + model.bagua_algorithm.abort(model) + model.bagua_algorithm.abort(model) if __name__ == "__main__": diff --git a/tests/torch_api/data_parallel/test_bagua_ddp.py b/tests/torch_api/data_parallel/test_bagua_ddp.py index 4e7dbb55a..75de3ecbd 100644 --- a/tests/torch_api/data_parallel/test_bagua_ddp.py +++ b/tests/torch_api/data_parallel/test_bagua_ddp.py @@ -12,7 +12,7 @@ import torch.nn.functional as F from torch import nn -from tests.internal.torch.common_distributed import ( +from tests.internal.multi_process_v2 import ( MultiProcessTestCase, skip_if_lt_x_gpu, ) diff --git a/tests/torch_api/data_parallel/test_broadcast_state.py b/tests/torch_api/data_parallel/test_broadcast_state.py index 6d873913f..9961a8868 100644 --- a/tests/torch_api/data_parallel/test_broadcast_state.py +++ b/tests/torch_api/data_parallel/test_broadcast_state.py @@ -1,17 +1,18 @@ -import os -import unittest -import multiprocessing -import itertools import inspect -from multiprocessing import Manager +import itertools import logging -import bagua.torch_api as bagua -from tests.internal.common_utils import find_free_port -from tests import skip_if_cuda_not_available +import os +import unittest + import torch import torch.nn as nn import torch.nn.functional as F +import bagua.torch_api as bagua + from bagua.torch_api.data_parallel import DistributedDataParallel as DDP +from tests.internal.multi_process_v2 import MultiProcessTestCase, skip_if_lt_x_gpu + +logger = logging.getLogger(__name__) class Net(nn.Module): @@ -29,33 +30,12 @@ def forward(self, x): return F.softmax(x, dim=1) -def _init_bagua_env(rank, env): - # Set deterministic - torch.backends.cudnn.benchmark = False - torch.backends.cudnn.deterministic = True - torch.manual_seed(rank) - # Initialize subprocess env - os.environ["WORLD_SIZE"] = env["WORLD_SIZE"] - os.environ["LOCAL_WORLD_SIZE"] = env["LOCAL_WORLD_SIZE"] - os.environ["MASTER_ADDR"] = env["MASTER_ADDR"] - os.environ["MASTER_PORT"] = env["MASTER_PORT"] - os.environ["BAGUA_SERVICE_PORT"] = env["BAGUA_SERVICE_PORT"] - - os.environ["RANK"] = str(rank) - os.environ["LOCAL_RANK"] = str(rank) - - # Init bagua distributed process group - torch.cuda.set_device(rank) - bagua.init_process_group() - - def create_model_and_optimizer(opt_class, opt_param): - # C_in, C_out = 3, 10 model = Net().cuda() hyper_param = { k: v for k, v in opt_param.items() - if k in inspect.getargspec(opt_class.__init__).args + if k in inspect.signature(opt_class.__init__).parameters } optimizer = opt_class(model.parameters(), **hyper_param) return model, optimizer @@ -76,15 +56,14 @@ def get_optimizer_param_values(optimizer): return results -def run_bagua_broad(rank, nprocs, bagua_params, envs, opt_class, opt_hyper_param): - _init_bagua_env(rank, envs) +def run_bagua_broadcast(opt_class, opt_hyper_param): - model, bagua_optimizer = create_model_and_optimizer( - opt_class, opt_hyper_param - ) + logger.debug("Testing for {}, {}".format(opt_class, opt_hyper_param)) + + model, bagua_optimizer = create_model_and_optimizer(opt_class, opt_hyper_param) for epoch in range(5): - logging.debug("Training epoch {}".format(epoch)) + logger.debug("Training epoch {}".format(epoch)) for _ in range(10): data = torch.randn(4, 2).cuda() target = torch.randn(4, 4).cuda() @@ -107,19 +86,51 @@ def run_bagua_broad(rank, nprocs, bagua_params, envs, opt_class, opt_hyper_param ] optimizer_params = get_optimizer_param_values(bagua_optimizer) - # Put "model_params" in dimension 1, while "optimizer_params" in dimension 2. - bagua_params[rank][0].extend(model_params) - bagua_params[rank][1].extend(optimizer_params) + return model_params, optimizer_params + + +class TestBroadcastModule(MultiProcessTestCase): + def setUp(self): + super(TestBroadcastModule, self).setUp() + self._spawn_processes() + def tearDown(self): + super(TestBroadcastModule, self).tearDown() + try: + os.remove(self.file_name) + except OSError: + pass -class Test_Broadcast_Module(unittest.TestCase): - @skip_if_cuda_not_available() + def _check_result(self, test_id=None): + msg_rank0 = None + for i, process in enumerate(self.processes): + _, msg = self.pid_to_pipe[process.pid].recv() + + if i == 0: + msg_rank0 = msg + else: + self.assertEqual(msg, msg_rank0) + + @property + def world_size(self) -> int: + return 4 + + @skip_if_lt_x_gpu(4) def test_broadcast_module(self): - nprocs = torch.cuda.device_count() + # Set deterministic + torch.backends.cudnn.benchmark = False + torch.backends.cudnn.deterministic = True + torch.manual_seed(self.rank) + + self._init_bagua_distributed() optimizers = [ (optim_class.__name__, optim_class) - for optim_class in [torch.optim.SGD, torch.optim.Adam, torch.optim.Rprop] + for optim_class in [ + torch.optim.SGD, + # torch.optim.Adam, + torch.optim.Rprop, + ] ] optimizer_hyper_param = [ @@ -127,87 +138,19 @@ def test_broadcast_module(self): dict(lr=0.2), ] + bcast_params_list = [] for (opt_name, opt_class), opt_hyper_param in itertools.product( optimizers, optimizer_hyper_param ): - env = { - "WORLD_SIZE": str(nprocs), - "LOCAL_WORLD_SIZE": str(nprocs), - "MASTER_ADDR": "127.0.0.1", - "MASTER_PORT": str(find_free_port(8000, 8100)), - "BAGUA_SERVICE_PORT": str(find_free_port(9000, 9100)), - } - with Manager() as manager: - # For each rank, set a two dimensional list. One is used to save model_params, - # while the second save optimizer_params. - bagua_params = manager.list( - [[manager.list() for _ in range(2)] for _ in range(nprocs)] - ) - mp = multiprocessing.get_context("spawn") - processes = [] - for i in range(nprocs): - p = mp.Process( - target=run_bagua_broad, - args=( - i, - nprocs, - bagua_params, - env, - opt_class, - opt_hyper_param, - ), - ) - p.start() - processes.append(p) - for p in processes: - p.join(timeout=60) - self.assertTrue(p.exitcode == 0) - for rank in range(0, nprocs): - # Both "model_params" and "optimizer_params" are saved in (name, tensor/scalar) form, - # so we need to assert the two dimensional separately. - # This is compare the "model_params". - for i in range(len(bagua_params[0][0])): - # assert name - self.assertEqual( - bagua_params[0][0][i][0], - bagua_params[rank][0][i][0], - ) - # assert tensor - self.assertTrue( - torch.equal( - torch.tensor( - bagua_params[0][0][i][1], dtype=torch.float - ), - torch.tensor( - bagua_params[rank][0][i][1], dtype=torch.float - ), - ) - ) - - if len(bagua_params[0][1]) != 0: - for j in range(len(bagua_params[0][1])): - # assert name - self.assertEqual( - bagua_params[0][1][j][0], - bagua_params[rank][1][j][0], - ) - # assert tensor/scalar - if ( - bagua_params[0][1][j][1] is None - ): # this is for "torch.optim.sgd.SGD" and dict(lr=0.2) - continue - else: - self.assertTrue( - torch.equal( - torch.tensor( - bagua_params[0][1][j][1], dtype=torch.float - ), - torch.tensor( - bagua_params[rank][1][j][1], - dtype=torch.float, - ), - ) - ) + model_params, optimizer_params = run_bagua_broadcast( + opt_class, opt_hyper_param + ) + bcast_params_list.append( + {"model": model_params, "optimizer": optimizer_params} + ) + + bagua.barrier() + return bcast_params_list if __name__ == "__main__": diff --git a/tests/torch_api/data_parallel/test_c10d_common.py b/tests/torch_api/data_parallel/test_c10d_common.py index 41c249b4b..e53172b74 100644 --- a/tests/torch_api/data_parallel/test_c10d_common.py +++ b/tests/torch_api/data_parallel/test_c10d_common.py @@ -21,10 +21,14 @@ import torch.distributed as dist import torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook as powerSGD +import torch.distributed.rpc as rpc import torch.multiprocessing as mp import torch.nn.functional as F +import torch.testing._internal.common_utils as common from torch import nn from torch._six import string_classes + +import bagua.torch_api.data_parallel.functional as bagua_dist from bagua.torch_api.data_parallel import DistributedDataParallel from tests.internal.torch.common_distributed import ( MultiProcessTestCase, @@ -43,7 +47,6 @@ find_free_port, ) -import bagua.torch_api.data_parallel.functional as bagua_dist # load_tests from common_utils is used to automatically filter tests for # sharding on sandcastle. This line silences flake warnings @@ -169,19 +172,58 @@ def _create_store(self): return store def test_address_already_in_use(self): - if sys.platform == "win32": - err_msg_reg = "Only one usage of each socket address*" - else: - err_msg_reg = "^Address already in use$" + err_msg_reg = "^The server socket has failed to listen on any local " with self.assertRaisesRegex(RuntimeError, err_msg_reg): addr = DEFAULT_HOSTNAME - port = find_free_port() + port = common.find_free_port() # Use noqa to silence flake8. # Need to store in an unused variable here to ensure the first # object is not destroyed before the second object is created. - store1 = c10d.TCPStore(addr, port, 1, True) # noqa: F841 - store2 = c10d.TCPStore(addr, port, 1, True) # noqa: F841 + store1 = dist.TCPStore(addr, port, 1, True) # noqa: F841 + store2 = dist.TCPStore(addr, port, 1, True) # noqa: F841 + + @retry_on_connect_failures + def test_multitenancy(self): + addr = DEFAULT_HOSTNAME + port = common.find_free_port() + + # Use noqa to silence flake8. + # Need to store in an unused variable here to ensure the first + # object is not destroyed before the second object is created. + store1 = dist.TCPStore(addr, port, 1, True, multi_tenant=True) # type: ignore[call-arg] # noqa: F841 + store2 = dist.TCPStore(addr, port, 1, True, multi_tenant=True) # type: ignore[call-arg] # noqa: F841 + + @skip_if_win32() + @retry_on_connect_failures + def test_init_pg_and_rpc_with_same_socket(self): + addr = DEFAULT_HOSTNAME + port = common.find_free_port() + + os.environ["MASTER_ADDR"] = addr + os.environ["MASTER_PORT"] = str(port) + + # We internally use a multi-tenant TCP store. Both PG and RPC should successfully + # initialize even when using the same socket address. + + dist.init_process_group( + backend="gloo", + init_method="env://", + rank=0, + world_size=1, + ) + + backend_opts = rpc.TensorPipeRpcBackendOptions( + init_method=f"tcp://{addr}:{port}" + ) + rpc.init_rpc( + name="worker0", + rank=0, + world_size=1, + rpc_backend_options=backend_opts, + ) + + rpc.shutdown() # The TCPStore has 6 keys in test_set_get. It contains the 5 keys added by # the user and one additional key used for coordinate all the workers. @@ -215,49 +257,34 @@ def _test_numkeys_delkeys(self, fs): def test_numkeys_delkeys(self): self._test_numkeys_delkeys(self._create_store()) - def _create_client(self, index, addr, port, world_size, messages): - try: - client_store = dist.TCPStore(addr, port, world_size, timeout=timedelta(seconds=10)) - self.assertEqual("value".encode(), client_store.get("key")) - client_store.set(f"new_key{index}", f"new_value{index}") - self.assertEqual(f"next_value{index}".encode(), - client_store.compare_set(f"new_key{index}", f"new_value{index}", f"next_value{index}")) - except Exception: - messages.put('Caught exception: \n{}exiting process with exit code: {}' - .format(traceback.format_exc(), MultiProcessTestCase.TEST_ERROR_EXIT_CODE)) - sys.exit(MultiProcessTestCase.TEST_ERROR_EXIT_CODE) + def _create_client(self, index, addr, port, world_size): + client_store = dist.TCPStore( + addr, port, world_size=world_size, timeout=timedelta(seconds=10) + ) + self.assertEqual("value".encode(), client_store.get("key")) + client_store.set(f"new_key{index}", f"new_value{index}") + self.assertEqual( + f"next_value{index}".encode(), + client_store.compare_set( + f"new_key{index}", f"new_value{index}", f"next_value{index}" + ), + ) def _multi_worker_helper(self, world_size): addr = DEFAULT_HOSTNAME server_store = create_tcp_store(addr, world_size, wait_for_workers=False) server_store.set("key", "value") port = server_store.port - messages = mp.Queue() - processes = [] - num_proccesses = random.randint(3, 5) if world_size == -1 else world_size - for i in range(num_proccesses): - p = mp.Process(target=self._create_client, args=(i, addr, port, world_size, messages)) - processes.append(p) - p.start() - for p in processes: - p.join() - error_message = "" - while not messages.empty(): - error_message += messages.get() + "\n" - if any([p.exitcode != 0 for p in processes]): - raise RuntimeError(error_message) - - @unittest.skipIf( - IS_WINDOWS, "Skip test for windows due to multiprocessing library error when using windows spawn" - ) + + num_indices = world_size if world_size else 1 + for i in range(num_indices): + self._create_client(i, addr, port, world_size) + def test_multi_worker_with_fixed_world_size(self): self._multi_worker_helper(5) - @unittest.skipIf( - IS_WINDOWS, "Skip test for windows due to multiprocessing library error when using windows spawn" - ) def test_multi_worker_with_nonfixed_world_size(self): - self._multi_worker_helper(-1) + self._multi_worker_helper(None) class PrefixTCPStoreTest(TestCase, StoreTestBase): @@ -845,7 +872,10 @@ def test_single_limit_single_dtype(self): torch.empty([100], dtype=torch.float), torch.empty([50], dtype=torch.float), ] - result = dist._compute_bucket_assignment_by_size(tensors, [400]) + result, per_bucket_size_limits = dist._compute_bucket_assignment_by_size( + tensors, [400] + ) + self.assertTrue(all(size_lim == 400 for size_lim in per_bucket_size_limits)) self.assertEqual([[0], [1], [2], [3]], result) def test_single_limit_multi_dtype(self): @@ -857,7 +887,10 @@ def test_single_limit_multi_dtype(self): torch.empty([50], dtype=torch.float), torch.empty([25], dtype=torch.double), ] - result = dist._compute_bucket_assignment_by_size(tensors, [400]) + result, per_bucket_size_limits = dist._compute_bucket_assignment_by_size( + tensors, [400] + ) + self.assertTrue(all(size_lim == 400 for size_lim in per_bucket_size_limits)) self.assertEqual([[0, 2], [1, 3], [4], [5]], result) def test_multi_limit_single_dtype(self): @@ -867,7 +900,10 @@ def test_multi_limit_single_dtype(self): torch.empty([10], dtype=torch.float), torch.empty([10], dtype=torch.float), ] - result = dist._compute_bucket_assignment_by_size(tensors, [40, 80]) + result, per_bucket_size_limits = dist._compute_bucket_assignment_by_size( + tensors, [40, 80] + ) + self.assertEqual(per_bucket_size_limits, [40, 80, 80]) self.assertEqual([[0], [1, 2], [3]], result) def test_multi_limit_multi_dtype(self): @@ -879,8 +915,11 @@ def test_multi_limit_multi_dtype(self): torch.empty([50], dtype=torch.float), torch.empty([25], dtype=torch.double), ] - result = dist._compute_bucket_assignment_by_size(tensors, [200, 400]) + result, per_bucket_size_limits = dist._compute_bucket_assignment_by_size( + tensors, [200, 400] + ) self.assertEqual([[0], [1], [2, 4], [3, 5]], result) + self.assertEqual(per_bucket_size_limits, [200, 200, 400, 400]) class AbstractCommTest(object): @@ -1032,20 +1071,33 @@ def tearDown(self): except OSError: pass - def test_distributed_debug_mode(self): + def test_debug_level(self): + try: + del os.environ["TORCH_DISTRIBUTED_DEBUG"] + except KeyError: + pass + + dist.set_debug_level_from_env() # Default should be off - default_debug_mode = dist._get_debug_mode() - self.assertEqual(default_debug_mode, dist._DistributedDebugLevel.OFF) + default_debug_mode = dist.get_debug_level() + self.assertEqual(default_debug_mode, dist.DebugLevel.OFF) mapping = { - "OFF": dist._DistributedDebugLevel.OFF, - "INFO": dist._DistributedDebugLevel.INFO, - "DETAIL": dist._DistributedDebugLevel.DETAIL, + "OFF": dist.DebugLevel.OFF, + "off": dist.DebugLevel.OFF, + "oFf": dist.DebugLevel.OFF, + "INFO": dist.DebugLevel.INFO, + "info": dist.DebugLevel.INFO, + "INfO": dist.DebugLevel.INFO, + "DETAIL": dist.DebugLevel.DETAIL, + "detail": dist.DebugLevel.DETAIL, + "DeTaIl": dist.DebugLevel.DETAIL, } invalid_debug_modes = ["foo", 0, 1, -1] for mode in mapping.keys(): os.environ["TORCH_DISTRIBUTED_DEBUG"] = str(mode) - set_debug_mode = dist._get_debug_mode() + dist.set_debug_level_from_env() + set_debug_mode = dist.get_debug_level() self.assertEqual( set_debug_mode, mapping[mode], @@ -1054,8 +1106,10 @@ def test_distributed_debug_mode(self): for mode in invalid_debug_modes: os.environ["TORCH_DISTRIBUTED_DEBUG"] = str(mode) - with self.assertRaisesRegex(RuntimeError, "to be one of"): - dist._get_debug_mode() + with self.assertRaisesRegex( + RuntimeError, "The value of TORCH_DISTRIBUTED_DEBUG must" + ): + dist.set_debug_level_from_env() if __name__ == "__main__": diff --git a/tests/torch_api/data_parallel/test_gradient_allreduce.py b/tests/torch_api/data_parallel/test_gradient_allreduce.py index 5bcfd15ae..3c1367916 100644 --- a/tests/torch_api/data_parallel/test_gradient_allreduce.py +++ b/tests/torch_api/data_parallel/test_gradient_allreduce.py @@ -1,14 +1,17 @@ +import logging +import os +import pickle +import unittest + import torch import torch.nn as nn import torch.nn.functional as F -from tests.internal.common_utils import find_free_port -from tests.internal.multi_process import setup_bagua_env -import unittest -import multiprocessing -from bagua.torch_api.utils import flatten import bagua.torch_api as bagua -from tests import skip_if_cuda_not_available + from bagua.torch_api.data_parallel import DistributedDataParallel as DDP +from tests.internal.multi_process_v2 import MultiProcessTestCase, skip_if_lt_x_gpu + +logger = logging.getLogger(__name__) class Net(nn.Module): @@ -26,31 +29,14 @@ def forward(self, x): return F.softmax(x, dim=1) -def _init_bagua_env(rank, env): - # set deterministic - torch.backends.cudnn.benchmark = False - torch.backends.cudnn.deterministic = True - torch.manual_seed(rank) - # initialize subprocess env - setup_bagua_env(rank, env) - - -def run_model( - rank, - nprocs, - hierarchical, - results, - env, -): - _init_bagua_env(rank, env) - +def run_model(hierarchical): # construct model and optimizer, etc. model = Net().cuda() optimizer = torch.optim.SGD(model.parameters(), lr=0.01) loss_fn = nn.MSELoss() def run_epochs(num_epochs): - for epoch in range(num_epochs): + for _ in range(num_epochs): data = torch.randn(4, 2).cuda() target = torch.randn(4, 4).cuda() @@ -74,71 +60,58 @@ def run_epochs(num_epochs): run_epochs(10) - ret = results[rank] + flattened_weight = bagua.utils.flatten([param.data for param in model.parameters()]) + weight_norm = flattened_weight.norm().item() + return weight_norm - ret._weight.copy_(flatten([param.data for param in model.parameters()])) +class TestGradientAllReduce(MultiProcessTestCase): + def setUp(self): + super(TestGradientAllReduce, self).setUp() + self._spawn_processes() -class Result(object): - def __init__(self): - model = Net() - self._weight = flatten( - [torch.zeros_like(param.data) for param in model.parameters()] - ) - - -class TestGradientAllReduce(unittest.TestCase): - def run_test_locally( - self, - nprocs, - hierarchical, - ): - env = { - "WORLD_SIZE": str(nprocs), - "LOCAL_WORLD_SIZE": str(nprocs), - "MASTER_ADDR": "127.0.0.1", - "MASTER_PORT": str(find_free_port(8000, 8100)), - "BAGUA_SERVICE_PORT": str(find_free_port(9000, 9100)), - } - - mp = multiprocessing.get_context("spawn") - results = [Result() for _ in range(nprocs)] - processes = [] - for i in range(nprocs): - p = mp.Process( - target=run_model, - args=( - i, - nprocs, - hierarchical, - results, - env, - ), - ) - p.start() - processes.append(p) - - for p in processes: - p.join(timeout=60) - self.assertTrue(p.exitcode == 0) - - for rank in range(nprocs): - peer_rank = (rank + 1) % nprocs - # all workers have equal weights - self.assertTrue( - torch.equal( - results[rank]._weight, - results[peer_rank]._weight, - ) - ) - - @skip_if_cuda_not_available() + def tearDown(self): + super(TestGradientAllReduce, self).tearDown() + try: + os.remove(self.file_name) + except OSError: + pass + + def _check_result(self, test_id=None): + result = None + for i, process in enumerate(self.processes): + _, msg = self.pid_to_pipe[process.pid].recv() + weight_norm = pickle.loads(msg) + + logger.info("process {} result: {}".format(i, weight_norm)) + if result is None: + result = weight_norm + else: + assert result == weight_norm + + @property + def world_size(self) -> int: + return 4 + + @skip_if_lt_x_gpu(4) def test_algorithm(self): - nprocs = torch.cuda.device_count() - self.run_test_locally( - nprocs=nprocs, - hierarchical=False, - ) + # set deterministic + torch.backends.cudnn.benchmark = False + torch.backends.cudnn.deterministic = True + torch.manual_seed(self.rank) + + self._init_bagua_distributed() + return run_model(hierarchical=False) + + @skip_if_lt_x_gpu(4) + def test_algorithm_hierarchical(self): + # set deterministic + torch.backends.cudnn.benchmark = False + torch.backends.cudnn.deterministic = True + torch.manual_seed(self.rank) + + self._init_bagua_distributed() + return run_model(hierarchical=True) if __name__ == "__main__": diff --git a/tests/torch_api/test_async_model_average.py b/tests/torch_api/test_async_model_average.py index 6b2de46d2..538f50d9e 100644 --- a/tests/torch_api/test_async_model_average.py +++ b/tests/torch_api/test_async_model_average.py @@ -1,13 +1,15 @@ +import logging +import os +import unittest + import torch import torch.nn as nn import torch.nn.functional as F -from tests.internal.common_utils import find_free_port -import unittest -import multiprocessing -import os import bagua.torch_api as bagua -from tests import skip_if_cuda_not_available -import logging + +from tests.internal.multi_process_v2 import MultiProcessTestCase, skip_if_lt_x_gpu + +logger = logging.getLogger(__name__) class Net(nn.Module): @@ -25,24 +27,10 @@ def forward(self, x): return F.softmax(x, dim=1) -def run_model_wrapper(rank, env, fn, warmup_steps): - # initialize subprocess env - os.environ["WORLD_SIZE"] = env["WORLD_SIZE"] - os.environ["LOCAL_WORLD_SIZE"] = env["LOCAL_WORLD_SIZE"] - os.environ["MASTER_ADDR"] = env["MASTER_ADDR"] - os.environ["MASTER_PORT"] = env["MASTER_PORT"] - os.environ["BAGUA_SERVICE_PORT"] = env["BAGUA_SERVICE_PORT"] - os.environ["RANK"] = str(rank) - os.environ["LOCAL_RANK"] = str(rank) - - # init bagua distributed process group - torch.cuda.set_device(rank) - bagua.init_process_group() - - # construct model and optimizer, etc. +def create_model_and_optimizer(warmup_steps): + # construct model and optimizer model = Net().cuda() optimizer = torch.optim.SGD(model.parameters(), lr=0.01) - loss_fn = nn.MSELoss() # wrap model algorithm = bagua.algorithms.async_model_average.AsyncModelAverageAlgorithm( @@ -51,87 +39,62 @@ def run_model_wrapper(rank, env, fn, warmup_steps): ) model = model.with_bagua([optimizer], algorithm) - fn(model, optimizer, loss_fn) + return model, optimizer -def train_epoch(epoch, model, optimizer, loss_fn): - logging.debug("Training epoch {}".format(epoch)) +def train_epoch(epoch, model, optimizer): + logger.debug("Training epoch {}".format(epoch)) for _ in range(10): data = torch.randn(4, 2).cuda() target = torch.randn(4, 4).cuda() optimizer.zero_grad() output = model(data) - loss = loss_fn(output, target) + loss = nn.MSELoss()(output, target) loss.backward() optimizer.step() -def run_epochs(model, optimizer, loss_fn): - for epoch in range(100): - train_epoch(epoch, model, optimizer, loss_fn) - model.bagua_algorithm.abort(model) +class TestAsyncModelAverage(MultiProcessTestCase): + def setUp(self): + super(TestAsyncModelAverage, self).setUp() + self._spawn_processes() + def tearDown(self): + super(TestAsyncModelAverage, self).tearDown() + try: + os.remove(self.file_name) + except OSError: + pass -def run_multiple_aborts(model, optimizer, loss_fn): - for epoch in range(2): - model.bagua_algorithm.resume(model) - model.bagua_algorithm.abort(model) - model.bagua_algorithm.resume(model) - for _ in range(100): - train_epoch(epoch, model, optimizer, loss_fn) + @property + def world_size(self) -> int: + return 4 - model.bagua_algorithm.abort(model) - model.bagua_algorithm.abort(model) + @skip_if_lt_x_gpu(4) + def test_algorithm(self): + self._init_bagua_distributed() + model, optimizer = create_model_and_optimizer(warmup_steps=0) + for epoch in range(100): + train_epoch(epoch, model, optimizer) + model.bagua_algorithm.abort(model) -class TestAsyncModelAverage(unittest.TestCase): - @skip_if_cuda_not_available() - def test_algorithm(self): - nprocs = torch.cuda.device_count() - env = { - "WORLD_SIZE": str(nprocs), - "LOCAL_WORLD_SIZE": str(nprocs), - "MASTER_ADDR": "127.0.0.1", - "MASTER_PORT": str(find_free_port(8000, 8100)), - "BAGUA_SERVICE_PORT": str(find_free_port(9000, 9100)), - } - - mp = multiprocessing.get_context("spawn") - processes = [] - for i in range(nprocs): - p = mp.Process(target=run_model_wrapper, args=(i, env, run_epochs, 0)) - p.start() - processes.append(p) - - for p in processes: - p.join(timeout=60) - self.assertTrue(p.exitcode == 0) - - @skip_if_cuda_not_available() + @skip_if_lt_x_gpu(4) def test_multiple_aborts(self): - nprocs = torch.cuda.device_count() - env = { - "WORLD_SIZE": str(nprocs), - "LOCAL_WORLD_SIZE": str(nprocs), - "MASTER_ADDR": "127.0.0.1", - "MASTER_PORT": str(find_free_port(8000, 8100)), - "BAGUA_SERVICE_PORT": str(find_free_port(9000, 9100)), - } - - mp = multiprocessing.get_context("spawn") - processes = [] - for i in range(nprocs): - p = mp.Process( - target=run_model_wrapper, args=(i, env, run_multiple_aborts, 10) - ) - p.start() - processes.append(p) - - for p in processes: - p.join(timeout=60) - self.assertTrue(p.exitcode == 0) + self._init_bagua_distributed() + model, optimizer = create_model_and_optimizer(warmup_steps=10) + + for i in range(2): + model.bagua_algorithm.resume(model) + model.bagua_algorithm.abort(model) + model.bagua_algorithm.resume(model) + for epoch in range(100): + train_epoch(i * 100 + epoch, model, optimizer) + + model.bagua_algorithm.abort(model) + model.bagua_algorithm.abort(model) if __name__ == "__main__": diff --git a/tests/torch_api/test_broadcast_state.py b/tests/torch_api/test_broadcast_state.py index b6c6a8c00..94a466899 100644 --- a/tests/torch_api/test_broadcast_state.py +++ b/tests/torch_api/test_broadcast_state.py @@ -1,17 +1,17 @@ -import os -import unittest -import multiprocessing -import itertools import inspect -from multiprocessing import Manager -import time +import itertools import logging -import bagua.torch_api as bagua -from tests.internal.common_utils import find_free_port -from tests import skip_if_cuda_not_available +import os +import unittest + import torch import torch.nn as nn import torch.nn.functional as F +import bagua.torch_api as bagua + +from tests.internal.multi_process_v2 import MultiProcessTestCase, skip_if_lt_x_gpu + +logger = logging.getLogger(__name__) class Net(nn.Module): @@ -29,33 +29,12 @@ def forward(self, x): return F.softmax(x, dim=1) -def _init_bagua_env(rank, env): - # Set deterministic - torch.backends.cudnn.benchmark = False - torch.backends.cudnn.deterministic = True - torch.manual_seed(rank) - # Initialize subprocess env - os.environ["WORLD_SIZE"] = env["WORLD_SIZE"] - os.environ["LOCAL_WORLD_SIZE"] = env["LOCAL_WORLD_SIZE"] - os.environ["MASTER_ADDR"] = env["MASTER_ADDR"] - os.environ["MASTER_PORT"] = env["MASTER_PORT"] - os.environ["BAGUA_SERVICE_PORT"] = env["BAGUA_SERVICE_PORT"] - - os.environ["RANK"] = str(rank) - os.environ["LOCAL_RANK"] = str(rank) - - # Init bagua distributed process group - torch.cuda.set_device(rank) - bagua.init_process_group() - - def create_model_and_optimizer(opt_class, opt_param): - C_in, C_out = 3, 10 model = Net().cuda() hyper_param = { k: v for k, v in opt_param.items() - if k in inspect.getargspec(opt_class.__init__).args + if k in inspect.signature(opt_class.__init__).parameters } optimizer = opt_class(model.parameters(), **hyper_param) return model, optimizer @@ -76,15 +55,16 @@ def get_optimizer_param_values(optimizer): return results -def run_bagua_broad(rank, nprocs, bagua_params, envs, opt_class, opt_hyper_param): - _init_bagua_env(rank, envs) +def run_bagua_broadcast(opt_class, opt_hyper_param): + + logger.debug("Testing for {}, {}".format(opt_class, opt_hyper_param)) bagua_model, bagua_optimizer = create_model_and_optimizer( opt_class, opt_hyper_param ) for epoch in range(5): - logging.debug("Training epoch {}".format(epoch)) + logger.debug("Training epoch {}".format(epoch)) for _ in range(10): data = torch.randn(4, 2).cuda() target = torch.randn(4, 4).cuda() @@ -98,7 +78,7 @@ def run_bagua_broad(rank, nprocs, bagua_params, envs, opt_class, opt_hyper_param from bagua.torch_api.algorithms import gradient_allreduce - algorithm = gradient_allreduce.GradientAllReduceAlgorithm(hierarchical=True) + algorithm = gradient_allreduce.GradientAllReduceAlgorithm() bagua_model = bagua_model.with_bagua([bagua_optimizer], algorithm) model_params = [ @@ -107,19 +87,52 @@ def run_bagua_broad(rank, nprocs, bagua_params, envs, opt_class, opt_hyper_param ] optimizer_params = get_optimizer_param_values(bagua_optimizer) - # Put "model_params" in dimension 1, while "optimizer_params" in dimension 2. - bagua_params[rank][0].extend(model_params) - bagua_params[rank][1].extend(optimizer_params) + return model_params, optimizer_params + + +class TestBroadcastModule(MultiProcessTestCase): + def setUp(self): + super(TestBroadcastModule, self).setUp() + self._spawn_processes() + def tearDown(self): + super(TestBroadcastModule, self).tearDown() + try: + os.remove(self.file_name) + except OSError: + pass -class Test_Broadcast_Module(unittest.TestCase): - @skip_if_cuda_not_available() + def _check_result(self, test_id=None): + msg_rank0 = None + for i, process in enumerate(self.processes): + _, msg = self.pid_to_pipe[process.pid].recv() + + if i == 0: + msg_rank0 = msg + else: + self.assertEqual(msg, msg_rank0) + + @property + def world_size(self) -> int: + return 4 + + @skip_if_lt_x_gpu(4) def test_broadcast_module(self): - nprocs = torch.cuda.device_count() + # Set deterministic + torch.backends.cudnn.benchmark = False + torch.backends.cudnn.deterministic = True + torch.manual_seed(self.rank) + + self._init_bagua_distributed() optimizers = [ (optim_class.__name__, optim_class) - for optim_class in [torch.optim.SGD, torch.optim.Adam, torch.optim.Rprop] + for optim_class in [ + torch.optim.SGD, + # TODO: fix broadcast state for adam + # torch.optim.Adam, + torch.optim.Rprop, + ] ] optimizer_hyper_param = [ @@ -127,87 +140,20 @@ def test_broadcast_module(self): dict(lr=0.2), ] + bcast_params_list = [] for (opt_name, opt_class), opt_hyper_param in itertools.product( optimizers, optimizer_hyper_param ): - env = { - "WORLD_SIZE": str(nprocs), - "LOCAL_WORLD_SIZE": str(nprocs), - "MASTER_ADDR": "127.0.0.1", - "MASTER_PORT": str(find_free_port(8000, 8100)), - "BAGUA_SERVICE_PORT": str(find_free_port(9000, 9100)), - } - with Manager() as manager: - # For each rank, set a two dimensional list. One is used to save model_params, - # while the second save optimizer_params. - bagua_params = manager.list( - [[manager.list() for _ in range(2)] for _ in range(nprocs)] - ) - mp = multiprocessing.get_context("spawn") - processes = [] - for i in range(nprocs): - p = mp.Process( - target=run_bagua_broad, - args=( - i, - nprocs, - bagua_params, - env, - opt_class, - opt_hyper_param, - ), - ) - p.start() - processes.append(p) - for p in processes: - p.join(timeout=60) - self.assertTrue(p.exitcode == 0) - for rank in range(0, nprocs): - # Both "model_params" and "optimizer_params" are saved in (name, tensor/scalar) form, - # so we need to assert the two dimensional separately. - # This is compare the "model_params". - for i in range(len(bagua_params[0][0])): - # assert name - self.assertEqual( - bagua_params[0][0][i][0], - bagua_params[rank][0][i][0], - ) - # assert tensor - self.assertTrue( - torch.equal( - torch.tensor( - bagua_params[0][0][i][1], dtype=torch.float - ), - torch.tensor( - bagua_params[rank][0][i][1], dtype=torch.float - ), - ) - ) - - if len(bagua_params[0][1]) != 0: - for j in range(len(bagua_params[0][1])): - # assert name - self.assertEqual( - bagua_params[0][1][j][0], - bagua_params[rank][1][j][0], - ) - # assert tensor/scalar - if ( - bagua_params[0][1][j][1] is None - ): # this is for "torch.optim.sgd.SGD" and dict(lr=0.2) - continue - else: - self.assertTrue( - torch.equal( - torch.tensor( - bagua_params[0][1][j][1], dtype=torch.float - ), - torch.tensor( - bagua_params[rank][1][j][1], - dtype=torch.float, - ), - ) - ) + model_params, optimizer_params = run_bagua_broadcast( + opt_class, opt_hyper_param + ) + bcast_params_list.append( + {"model": model_params, "optimizer": optimizer_params} + ) + + # TODO: autotune server exit gracefully + bagua.barrier() + return bcast_params_list if __name__ == "__main__": diff --git a/tests/torch_api/test_gradient_allreduce.py b/tests/torch_api/test_gradient_allreduce.py index 452755a73..138ff3fb8 100644 --- a/tests/torch_api/test_gradient_allreduce.py +++ b/tests/torch_api/test_gradient_allreduce.py @@ -1,13 +1,16 @@ +import logging +import os +import pickle +import unittest + import torch import torch.nn as nn import torch.nn.functional as F -from tests.internal.common_utils import find_free_port -from tests.internal.multi_process import setup_bagua_env -import unittest -import multiprocessing -from bagua.torch_api.utils import flatten import bagua.torch_api as bagua -from tests import skip_if_cuda_not_available + +from tests.internal.multi_process_v2 import MultiProcessTestCase, skip_if_lt_x_gpu + +logger = logging.getLogger(__name__) class Net(nn.Module): @@ -25,31 +28,14 @@ def forward(self, x): return F.softmax(x, dim=1) -def _init_bagua_env(rank, env): - # set deterministic - torch.backends.cudnn.benchmark = False - torch.backends.cudnn.deterministic = True - torch.manual_seed(rank) - # initialize subprocess env - setup_bagua_env(rank, env) - - -def run_model( - rank, - nprocs, - hierarchical, - results, - env, -): - _init_bagua_env(rank, env) - +def run_model(hierarchical): # construct model and optimizer, etc. model = Net().cuda() optimizer = torch.optim.SGD(model.parameters(), lr=0.01) loss_fn = nn.MSELoss() def run_epochs(num_epochs): - for epoch in range(num_epochs): + for _ in range(num_epochs): data = torch.randn(4, 2).cuda() target = torch.randn(4, 4).cuda() @@ -72,71 +58,58 @@ def run_epochs(num_epochs): run_epochs(10) - ret = results[rank] + flattened_weight = bagua.utils.flatten([param.data for param in model.parameters()]) + weight_norm = flattened_weight.norm().item() + return weight_norm - ret._weight.copy_(flatten([param.data for param in model.parameters()])) +class TestGradientAllReduce(MultiProcessTestCase): + def setUp(self): + super(TestGradientAllReduce, self).setUp() + self._spawn_processes() -class Result(object): - def __init__(self): - model = Net() - self._weight = flatten( - [torch.zeros_like(param.data) for param in model.parameters()] - ) - - -class TestGradientAllReduce(unittest.TestCase): - def run_test_locally( - self, - nprocs, - hierarchical, - ): - env = { - "WORLD_SIZE": str(nprocs), - "LOCAL_WORLD_SIZE": str(nprocs), - "MASTER_ADDR": "127.0.0.1", - "MASTER_PORT": str(find_free_port(8000, 8100)), - "BAGUA_SERVICE_PORT": str(find_free_port(9000, 9100)), - } - - mp = multiprocessing.get_context("spawn") - results = [Result() for _ in range(nprocs)] - processes = [] - for i in range(nprocs): - p = mp.Process( - target=run_model, - args=( - i, - nprocs, - hierarchical, - results, - env, - ), - ) - p.start() - processes.append(p) - - for p in processes: - p.join(timeout=60) - self.assertTrue(p.exitcode == 0) - - for rank in range(nprocs): - peer_rank = (rank + 1) % nprocs - # all workers have equal weights - self.assertTrue( - torch.equal( - results[rank]._weight, - results[peer_rank]._weight, - ) - ) - - @skip_if_cuda_not_available() + def tearDown(self): + super(TestGradientAllReduce, self).tearDown() + try: + os.remove(self.file_name) + except OSError: + pass + + def _check_result(self, test_id=None): + result = None + for i, process in enumerate(self.processes): + _, msg = self.pid_to_pipe[process.pid].recv() + weight_norm = pickle.loads(msg) + + logger.info("process {} result: {}".format(i, weight_norm)) + if result is None: + result = weight_norm + else: + assert result == weight_norm + + @property + def world_size(self) -> int: + return 4 + + @skip_if_lt_x_gpu(4) def test_algorithm(self): - nprocs = torch.cuda.device_count() - self.run_test_locally( - nprocs=nprocs, - hierarchical=False, - ) + # set deterministic + torch.backends.cudnn.benchmark = False + torch.backends.cudnn.deterministic = True + torch.manual_seed(self.rank) + + self._init_bagua_distributed() + return run_model(hierarchical=False) + + @skip_if_lt_x_gpu(4) + def test_algorithm_hierarchical(self): + # set deterministic + torch.backends.cudnn.benchmark = False + torch.backends.cudnn.deterministic = True + torch.manual_seed(self.rank) + + self._init_bagua_distributed() + return run_model(hierarchical=True) if __name__ == "__main__": diff --git a/tests/torch_api/test_process_group.py b/tests/torch_api/test_process_group.py index 591c1234b..6bdc81442 100644 --- a/tests/torch_api/test_process_group.py +++ b/tests/torch_api/test_process_group.py @@ -78,7 +78,7 @@ def test_from_torch_group(self): self.run_test_locally(run_from_torch_group, nprocs, args={}, results=None) -from tests.internal.torch.common_distributed import ( # noqa: E402 +from tests.internal.multi_process_v2 import ( # noqa: E402 MultiProcessTestCase, skip_if_lt_x_gpu, )