Merge branch 'main' of https://github.com/microsoft/mscclpp into caiorocha/nccl_support_reducescatter

caiomcbr committed Feb 4, 2025
2 parents e9e504e + 7f3b088, commit 1a70c7b
Showing 8 changed files with 91 additions and 4 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/mscclpp-lang.yml
@@ -19,6 +19,10 @@ jobs:

    steps:
      - uses: actions/checkout@v4

      - name: Set environment variable
        run: echo "LD_LIBRARY_PATH=/usr/local/cuda/compat:/usr/local/cuda/lib64" >> $GITHUB_ENV

      - name: Install mscclpp
        run: |
          CMAKE_ARGS="-DMSCCLPP_BYPASS_GPU_CHECK=ON -DMSCCLPP_USE_CUDA=ON" pip3 install .
1 change: 0 additions & 1 deletion docker/build.sh
@@ -14,7 +14,6 @@ baseImageTable=(

declare -A extraLdPathTable
extraLdPathTable=(
["cuda11.8"]="/usr/local/cuda-11.8/lib64"
["cuda12.1"]="/usr/local/cuda-12.1/compat:/usr/local/cuda-12.1/lib64"
["cuda12.2"]="/usr/local/cuda-12.2/compat:/usr/local/cuda-12.2/lib64"
["cuda12.3"]="/usr/local/cuda-12.3/compat:/usr/local/cuda-12.3/lib64"
2 changes: 1 addition & 1 deletion docs/design/mscclpp-dsl.md
@@ -111,4 +111,4 @@ Packet APIs are used when user wants to use LL algorithm. The packet APIs are si


### Examples
We provide several examples demonstrating how to use the MSCCL++ DSL to write communication collective algorithms. For more details, please refer to the [examples](https://github.com/microsoft/mscclpp/tree/main/mscclpp-lang/python/examples) folder.
We provide several examples demonstrating how to use the MSCCL++ DSL to write communication collective algorithms. For more details, please refer to the [examples](https://github.com/microsoft/mscclpp/tree/main/python/examples) folder.
74 changes: 74 additions & 0 deletions python/examples/allgather_allpairs_multinodes_packets.py
@@ -0,0 +1,74 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import argparse
from mscclpp.language import *
from mscclpp.language.collectives import AllGather
from mscclpp.language.buffer import Buffer
from mscclpp.language.types import ChannelType, ReplicationPolicy


def allgather_multinodes_allpair(gpus, gpus_per_node, instances):
    """
    Implements a multi-node allgather collective using an allpairs algorithm with MSCCL++ DSL.
    @param gpus: Total number of GPUs
    @param gpus_per_node: Number of GPUs per node
    Steps:
    1. Each rank sends a chunk to all other ranks' scratch buffers using packet format.
    2. Copy the chunk from the scratch buffer to the output buffer using packet format.
    """
    collective = AllGather(gpus, 1, True)
    with MSCCLPPProgram(
        "allgather_multinodes_allpair",
        collective,
        gpus,
        instances,
        protocol="LL",
        replication_policy=ReplicationPolicy.interleaved,
        num_threads_per_block=1024,
    ):
        # Each rank puts its input chunk into every peer's scratch buffer in packet format
        for g in range(gpus):
            src_rank = g
            c = chunk(src_rank, Buffer.input, 0, 1)
            for peer in range(1, gpus):
                dst_rank = (src_rank + peer) % gpus
                tb = dst_rank if dst_rank < src_rank else dst_rank - 1
                if src_rank // gpus_per_node == dst_rank // gpus_per_node:
                    c.put_packet(dst_rank, Buffer.scratch, index=src_rank, sendtb=tb)
                else:
                    c.put_packet(
                        dst_rank,
                        Buffer.scratch,
                        index=src_rank,
                        sendtb=tb,
                        chan_type=ChannelType.port,
                        temp_buffer=Buffer.scratch,
                        temp_buffer_index=src_rank,
                    )

        # Copying packet from local scratch buffer to local buffer
        for g in range(gpus):
            src_rank = g
            src_offset = src_rank
            for peer in range(1, gpus):
                dst_rank = (g + peer) % gpus
                tb = src_offset if src_offset < dst_rank else src_offset - 1
                c = chunk(dst_rank, Buffer.scratch, src_offset, 1)
                c.copy_packet(dst_rank, Buffer.output, src_offset, sendtb=tb + gpus - 1)

        Json()
        Check()


parser = argparse.ArgumentParser()
parser.add_argument("num_gpus", type=int, help="number of gpus")
parser.add_argument("gpus_per_node", type=int, help="number of gpus per node")
parser.add_argument("instances", type=int, help="number of instances")

args = parser.parse_args()

allgather_multinodes_allpair(
    args.num_gpus,
    args.gpus_per_node,
    args.instances,
)
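
For reference, the test configuration added later in this commit registers this script with the arguments 16 8 1, i.e. 16 GPUs total across two 8-GPU nodes and a single instance; run that way, Json() presumably emits the generated execution plan as JSON and Check() runs the DSL's correctness check (both behaviors inferred from the DSL's usual flow, not from this diff).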
6 changes: 4 additions & 2 deletions python/mscclpp/language/ir.py
@@ -467,7 +467,9 @@ def remove_empty_fields(d):
obj["connectedTo"] = [sorted(list(peers)) for peers in obj["connectedTo"]]
gpu_instance["channels"].append(obj)
gpu_instance["channels"] = list(filter(lambda x: x["type"] != "none", gpu_instance["channels"]))
gpu_instance["channels"] = sorted(gpu_instance["channels"], key=lambda x: (x["srcbuff"], x["dstbuff"]))
gpu_instance["channels"] = sorted(
gpu_instance["channels"], key=lambda x: (x["srcbuff"], x["dstbuff"], x["type"])
)

# render for GPU NVLS channels
for i, chan in enumerate(gpu_instance["channels"]):
@@ -502,7 +504,7 @@ def remove_empty_fields(d):
                tb_channel_dict[(srcBuffer, dstBuffer, type)] = obj
                tb_channels.append(obj)
            tb_channels = filter(lambda x: x["type"] != "none", tb_channels)
            tb_channels = sorted(tb_channels, key=lambda x: (x["srcbuff"], x["dstbuff"], x["type"]))
            for op in tb.ops:
                if op.tb == -1:
                    continue
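The functional change above is the extra "type" entry in the channel sort key: channels that share the same source and destination buffers (for example a memory channel and a port channel to the same peer buffers) are now ordered deterministically instead of being left in insertion order. A minimal standalone sketch of the effect, using hypothetical channel dicts rather than the real IR objects:

# Hypothetical channel dicts, for illustration only (not the actual IR objects).
channels = [
    {"srcbuff": "input", "dstbuff": "scratch", "type": "port"},
    {"srcbuff": "input", "dstbuff": "scratch", "type": "sm"},
]
# Sorting on (srcbuff, dstbuff) alone is stable, so the tie keeps insertion order;
# adding "type" as a tiebreaker yields the same order no matter how the list was built.
channels = sorted(channels, key=lambda x: (x["srcbuff"], x["dstbuff"], x["type"]))
print([c["type"] for c in channels])  # ['port', 'sm']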
4 changes: 4 additions & 0 deletions python/test/configs/mscclpp_lang_test_config.json
@@ -30,5 +30,9 @@
  {
    "filename": "allreduce_nvls.py",
    "args": ["8", "2"]
  },
  {
    "filename": "allgather_allpairs_multinodes_packets.py",
    "args": ["16", "8", "1"]
  }
]
3 changes: 3 additions & 0 deletions src/executor/executor.cc
@@ -7,6 +7,7 @@
#include <mscclpp/port_channel.hpp>
#include <set>

#include "debug.h"
#include "execution_kernel.hpp"
#include "execution_plan.hpp"

@@ -435,6 +436,8 @@ Executor::Executor(std::shared_ptr<Communicator> comm) : impl_(std::make_unique<
void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize,
                       [[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan,
                       cudaStream_t stream, PacketType packetType) {
  INFO(MSCCLPP_EXECUTOR, "Starting execution with plan: %s, collective: %s", plan.name().c_str(),
       plan.collective().c_str());
  size_t sendMemRange, recvMemRange;
  CUdeviceptr sendBasePtr, recvBasePtr;
  MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendMemRange, (CUdeviceptr)sendbuff));
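To surface the new log line at runtime, the EXECUTOR subsystem has to be enabled. A sketch of how that might look from Python, assuming mscclpp follows NCCL-style debug controls (MSCCLPP_DEBUG for the log level, MSCCLPP_DEBUG_SUBSYS for the subsystem selection); the variable names are an assumption, not something this diff confirms:

import os

# Assumed NCCL-style controls; set these before mscclpp initializes its logger.
os.environ.setdefault("MSCCLPP_DEBUG", "INFO")
os.environ.setdefault("MSCCLPP_DEBUG_SUBSYS", "EXECUTOR")

# ... construct the Communicator/Executor as usual and call execute(); with these
# settings the "Starting execution with plan: ..., collective: ..." message added
# above should be printed once per execute() call.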
1 change: 1 addition & 0 deletions src/include/debug.h
@@ -86,6 +86,7 @@ typedef enum {
  MSCCLPP_ENV = 128,
  MSCCLPP_ALLOC = 256,
  MSCCLPP_CALL = 512,
  MSCCLPP_EXECUTOR = 1024,
  MSCCLPP_ALL = ~0
} mscclppDebugLogSubSys;

