# this version has been derived from @jeffra's gist: https://gist.github.com/jeffra/b5e80466b4c86be00ea3b6f130fb7a36
# which in turn is derived from https://github.com/NVIDIA/nccl-tests
#
# to run on 2 gpus of a single node:
# python -m torch.distributed.run --nproc_per_node=2 all_reduce_bench.py
#
# the printed results are already n_gpu-agnostic (i.e. averaged for the world size)
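#
# (a hedged sketch, not part of the original: to run the same benchmark across 2 nodes with
#  8 gpus each, torchrun's multi-node rendezvous flags are needed; MASTER_ADDR here is an
#  assumed placeholder for the rank-0 host)
# python -m torch.distributed.run --nnodes=2 --nproc_per_node=8 \
#     --rdzv_endpoint=$MASTER_ADDR:6000 --rdzv_backend=c10d all_reduce_bench.py
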
import argparse
import fcntl
import os
import socket
import time
import torch
import torch.distributed as dist

TRIALS = 5

# the all-reduce payload is an N x M fp32 tensor
N = 500000
M = 2000

def printflock(*msgs):
    """ solves the multi-process interleaved print problem """
    with open(__file__, "r") as fh:
        fcntl.flock(fh, fcntl.LOCK_EX)
        try:
            print(*msgs)
        finally:
            fcntl.flock(fh, fcntl.LOCK_UN)

def timed_allreduce(mat, id):
    pre = time.perf_counter()
    dist.all_reduce(mat)
    # cuda kernels are launched asynchronously, so read a value back to force a
    # synchronization point before stopping the timer
    printflock(f"ignore me {int(mat[0][0])}")
    duration = time.perf_counter() - pre

    size = M * N * 4  # 4 bytes per fp32 element
    # algo throughput: *2 is for send + receive, *8 converts bytes/sec to bits/sec
    tput = (size * 2 / duration) * 8
    n = dist.get_world_size()
    # bus bandwidth: all-reduce moves 2*(n-1)/n of the payload over each link (nccl-tests convention)
    busbw = (size / duration) * (2 * (n - 1) / n) * 8
    printflock(f"{id}:\n",
               f"duration: {duration:.4f} sec\n",
               f"algo throughput: {tput:.4f} bps, {tput/1e9:.4f} Gbps\n",
               f"busbw: {busbw / 1e9:.4f} Gbps"
    )

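# rough sanity math (illustrative only, numbers assumed rather than taken from a real run):
# the payload is N*M*4 = 500000*2000*4 bytes = 4GB per rank; for an assumed world size of
# n=8 the busbw correction factor is 2*(8-1)/8 = 1.75, so busbw = (4GB/duration)*8*1.75 bits/sec
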
def run(local_rank):
    hostname = socket.gethostname()
    id = f"{hostname}:{local_rank}"
    global_rank = dist.get_rank()

    printflock(f"{id} data size: {M*N*4/1e9} GB")
    mat = torch.rand(N, M, dtype=torch.float32).cuda(local_rank)

    for i in range(TRIALS):
        # keep all ranks in lock-step so each trial starts together
        dist.barrier()
        if global_rank == 0:
            print(f"\n\n\n-----------trial-{i}----------------")
        timed_allreduce(mat, id)

def init_processes(local_rank, fn, backend='nccl'):
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend)
    fn(local_rank)


if __name__ == "__main__":
    rank = int(os.environ["LOCAL_RANK"])
    printflock(f"local_rank: {rank}")
    init_processes(local_rank=rank, fn=run)